You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
165 lines
5.0 KiB
Python
165 lines
5.0 KiB
Python
import datetime as dt
|
|
import re
|
|
|
|
from apistar.exceptions import HTTPException
|
|
|
|
from apistar import http, Route, Include, App
|
|
from apistar_jwt.token import JWT, JWTUser
|
|
from apistar_mail import Message, Mail
|
|
# from sqlalchemy.exc import IntegrityError, InvalidRequestError
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature
|
|
import logbook
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.orm.exc import NoResultFound
|
|
|
|
from cookie_api.models import User
|
|
from cookie_api.schema import UserExportSchema, UserCreateSchema
|
|
from cookie_api.util import ExtJSONResponse
|
|
|
|
# https://realpython.com/handling-email-confirmation-in-flask/
|
|
|
|
# Pattern used to validate e-mail addresses: one or more local-part characters,
# an "@", a domain label, a literal dot, then the remaining domain characters.
EMAIL_REGEX = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)"

# Compiled once at import time so it can be reused without recompiling.
email_regex = re.compile(pattern=EMAIL_REGEX)
|
|
|
|
# Module-level logbook logger, named after this module.
logger = logbook.Logger(__name__)
|
|
|
|
|
|
def generate_confirmation_token(email):
    """Serialize ``email`` into a signed, URL-safe confirmation token.

    :param email: value to embed in the token.
    :return: the result of ``URLSafeTimedSerializer.dumps`` using the
        ``'password-salt'`` salt.
    """
    # NOTE(review): the signing key is a hard-coded literal; presumably it
    # should be loaded from application configuration -- confirm.
    return URLSafeTimedSerializer('secret-key').dumps(email, salt='password-salt')
|
|
|
|
|
|
def confirm_token(token, expiration=3600):
    """Verify a confirmation token and return the value it carries.

    :param token: token string to verify.
    :param expiration: forwarded to the serializer as ``max_age``
        (defaults to 3600).
    :return: the deserialized payload, or ``False`` when the serializer
        raises ``BadSignature``.
    """
    # NOTE(review): the signing key is a hard-coded literal; presumably it
    # should be loaded from application configuration -- confirm.
    signer = URLSafeTimedSerializer('secret-key')
    try:
        return signer.loads(token, salt='password-salt', max_age=expiration)
    except BadSignature:
        return False
|
|
|
|
|
|
def login(json_data: http.RequestData, session: Session, jwt: JWT):
    """Authenticate a user by email and password and issue a JWT.

    Reads ``email`` and ``password`` from the request body, looks the user
    up by email and verifies the password with ``user.check_password``.

    :param json_data: parsed request body; ``email`` and ``password`` are read
        from it with ``.get``.
    :param session: SQLAlchemy session used to look up the user.
    :param jwt: JWT component used to encode the token payload.
    :return: an ``ExtJSONResponse`` --
        400 with a ``status``/``message`` body when no user has that email,
        401 with an ``error`` body when the password check fails,
        200 with ``status``, ``message`` and ``auth_token`` on success.
    """
    user_id = json_data.get('email')
    password = json_data.get('password')

    try:
        user = session.query(User).filter_by(email=user_id).one()
    except NoResultFound:
        error = {
            'status': 'fail',
            'message': 'User does not exist',
        }
        return ExtJSONResponse(error, status=400, headers={'WWW-Authenticate': 'Bearer'})

    if not user.check_password(password):
        # Must be a plain dict: a trailing comma after the literal would wrap
        # it in a one-element tuple and change the shape of the response body.
        error = {'error': 'Password auth failed'}
        return ExtJSONResponse(error, status=401, headers={'WWW-Authenticate': 'Bearer'})

    # Take the timestamp once so 'iat' and 'exp' are exactly 60 minutes apart.
    issued_at = dt.datetime.utcnow()
    payload = {
        'exp': issued_at + dt.timedelta(days=0, minutes=60),  # Expiration date of the token
        'iat': issued_at,  # the time the token was generated
        'sub': user.id,  # the subject of the token
    }

    token = jwt.encode(payload)

    data = {
        'status': 'success',
        'message': 'Successfully logged in.',
        'auth_token': token,
    }

    return ExtJSONResponse(data, 200)
|
|
|
|
|
|
# TODO Add user logout
|
|
def logout():
    """Placeholder for user logout; not implemented yet and returns ``None``."""
|
|
|
|
|
|
def register(user_data: UserCreateSchema, session: Session, mail: Mail, app: App) -> UserExportSchema:
    """Create a user account and send a confirmation email.

    Steps, in order:
    1. reject the request (400) when the email does not match ``email_regex``;
    2. reject the request (400) when a user with that email already exists;
    3. build the ``User``, render ``confirm_email.jinja2`` with a confirmation
       link and send it; any failure is logged and reported as a 500;
    4. add the user to the session and commit.

    :param user_data: validated payload; ``email`` and ``password`` are read.
    :param session: SQLAlchemy session used for the lookup and the insert.
    :param mail: mail component used to send the confirmation message.
    :param app: application object used to render the email template.
    :return: a 201 ``ExtJSONResponse`` with a ``message`` and the exported user
        under ``data``.
    :raises HTTPException: 400 for an invalid or already-used email, 500 when
        building or sending the email fails.
    """
    address = user_data['email']

    if email_regex.match(address) is None:
        raise HTTPException('Please provide a valid email address', status_code=400)

    existing = session.query(User).filter_by(email=address).one_or_none()
    if existing is not None:
        raise HTTPException('user email address is already in use', status_code=400)

    user = User(email=address, password=user_data['password'])

    try:
        tokenized_email = generate_confirmation_token(user.email)
        # NOTE(review): host and port are hard-coded to localhost:5000 here.
        confirm_url = f'http://localhost:5000/auth/confirm?token={tokenized_email}'
        email_body = app.render_template(
            'confirm_email.jinja2',
            confirm_url=confirm_url,
            logo_url='http://localhost:5000/static/cookie_icon.png',
        )
        mail.send(
            Message(
                subject="Confirm your email",
                html=email_body,
                recipients=[address],
            )
        )
    except Exception as e:
        # Any failure while building or sending the mail aborts registration
        # before the user is persisted.
        logger.error(e, exc_info=True)
        raise HTTPException('Error an sending email', status_code=500)

    session.add(user)
    session.commit()

    body = {
        'message': 'Please check your inbox and confirm your email',
        'data': UserExportSchema(user),
    }
    return ExtJSONResponse(body, 201, {})
|
|
|
|
|
|
# TODO Return an HTML response since the user will be clicking on a link in a web browser
|
|
# TODO Since you are returning a user representation, add Location and Content-Location headers
|
|
def confirm(params: http.QueryParams, session: Session):
    """Confirm a user account from the token in the ``token`` query parameter.

    The token is decoded with ``confirm_token``; the decoded email is used to
    look up the user, whose ``confirmed`` flag is then set and committed.

    :param params: query parameters; ``token`` is read from them.
    :param session: SQLAlchemy session used for the lookup and the update.
    :return: an ``ExtJSONResponse`` wrapping the exported user.
    :raises HTTPException: 400 when the token is missing, invalid or expired,
        when no user matches the decoded email, or when the account is
        already confirmed.
    """
    # A missing token is treated like an invalid one instead of surfacing a
    # lookup error from the query-parameter mapping.
    token = params.get('token')
    email = confirm_token(token) if token else None

    if not email:
        raise HTTPException('Email token is expired or invalid', status_code=400)

    try:
        user = session.query(User).filter_by(email=email).one()
    except NoResultFound:
        # The token is valid but the account it refers to no longer exists.
        raise HTTPException('User does not exist', status_code=400)

    if user.confirmed:
        raise HTTPException('Account already confirmed. Please login.', status_code=400)

    user.confirmed = True
    session.add(user)
    session.commit()
    return ExtJSONResponse(UserExportSchema(user))
|
|
|
|
|
|
def user_profile(user: JWTUser, session: Session) -> UserExportSchema:
    """Return the stored profile of the authenticated user.

    :param user: authenticated JWT user; its ``id`` selects the database row.
    :param session: SQLAlchemy session used for the lookup.
    :return: an ``ExtJSONResponse`` wrapping the exported user.
    :raises HTTPException: 400 with a ``message`` body when no row matches.
    """
    try:
        account = session.query(User).filter_by(id=user.id).one()
    except NoResultFound as exc:
        raise HTTPException({'message': str(exc)}, status_code=400)
    return ExtJSONResponse(UserExportSchema(account))
|
|
|
|
|
|
# TODO Add email password reset
|
|
def reset():
    """Placeholder for email password reset; not implemented yet and returns ``None``."""
|
|
|
|
|
|
# Handlers exposed by this module; paths are relative to the include prefix.
routes = [
    Route('/login', 'POST', login),
    Route('/register', 'POST', register),
    Route('/status', 'GET', user_profile),
    Route('/confirm', 'GET', confirm),
]

# Mounts the routes above under the /auth prefix.
auth_routes = [
    Include('/auth', name='dirp', routes=routes),
]
|