You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
5.0 KiB
Python

import datetime as dt
import re
from apistar.exceptions import HTTPException
from apistar import http, Route, Include, App
from apistar_jwt.token import JWT, JWTUser
from apistar_mail import Message, Mail
# from sqlalchemy.exc import IntegrityError, InvalidRequestError
from itsdangerous import URLSafeTimedSerializer, BadSignature
import logbook
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import NoResultFound
from cookie_api.models import User
from cookie_api.schema import UserExportSchema, UserCreateSchema
from cookie_api.util import ExtJSONResponse
# https://realpython.com/handling-email-confirmation-in-flask/
EMAIL_REGEX = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)"
email_regex = re.compile(EMAIL_REGEX)
logger = logbook.Logger(__name__)
def generate_confirmation_token(email):
serializer = URLSafeTimedSerializer('secret-key')
return serializer.dumps(email, salt='password-salt')
def confirm_token(token, expiration=3600):
serializer = URLSafeTimedSerializer('secret-key')
try:
email = serializer.loads(
token,
salt='password-salt',
max_age=expiration
)
except BadSignature:
return False
return email
def login(json_data: http.RequestData, session: Session, jwt: JWT):
user_id = json_data.get('email')
password = json_data.get('password')
try:
user = session.query(User).filter_by(email=user_id).one()
except NoResultFound:
error = {
'status': 'fail',
'message': 'User does not exist'
}
return ExtJSONResponse(error, status=400, headers={'WWW-Authenticate': 'Bearer'})
if not user.check_password(password):
error = {'error': 'Password auth failed'},
return ExtJSONResponse(error, status=401, headers={'WWW-Authenticate': 'Bearer'})
payload = {
'exp': dt.datetime.utcnow() + dt.timedelta(days=0, minutes=60), # Expiration date of the token
'iat': dt.datetime.utcnow(), # the time the token was generated
'sub': user.id # the subject of the token
}
token = jwt.encode(payload)
data = {
'status': 'success',
'message': 'Successfully logged in.',
'auth_token': token
}
return ExtJSONResponse(data, 200)
# TODO Add user logout
def logout():
pass
def register(user_data: UserCreateSchema, session: Session, mail: Mail, app: App) -> UserExportSchema:
if not email_regex.match(user_data['email']):
raise HTTPException('Please provide a valid email address', status_code=400)
email_check = session.query(User).filter_by(email=user_data['email']).one_or_none()
if email_check is not None:
raise HTTPException('user email address is already in use', status_code=400)
user = User(email=user_data['email'], password=user_data['password'])
try:
tokenized_email = generate_confirmation_token(user.email)
email_body = app.render_template('confirm_email.jinja2',
confirm_url=f'http://localhost:5000/auth/confirm?token={tokenized_email}',
logo_url='http://localhost:5000/static/cookie_icon.png'
)
msg = Message(subject="Confirm your email",
html=email_body,
recipients=[user_data['email']])
mail.send(msg)
except Exception as e:
logger.error(e, exc_info=True)
raise HTTPException('Error an sending email', status_code=500)
session.add(user)
session.commit()
headers = {}
message = {
'message': 'Please check your inbox and confirm your email',
'data': UserExportSchema(user)
}
return ExtJSONResponse(message, 201, headers)
# TODO Return an HTML response since the user will be clicking on a link in a web browser
# TODO Since you are returning a user respresentation add a location and Content-Location header
def confirm(params: http.QueryParams, session: Session):
token = params['token']
email = confirm_token(token)
if not email:
raise HTTPException('Email token is expired or invalid', status_code=400)
user = session.query(User).filter_by(email=email).one()
if user.confirmed:
raise HTTPException('Account already confirmed. Please login.', status_code=400)
user.confirmed = True
session.add(user)
session.commit()
return ExtJSONResponse(UserExportSchema(user))
def user_profile(user: JWTUser, session: Session) -> UserExportSchema:
try:
user = session.query(User).filter_by(id=user.id).one()
except NoResultFound as e:
error = {'message': str(e)}
raise HTTPException(error, status_code=400)
return ExtJSONResponse(UserExportSchema(user))
# TODO Add email password reset
def reset():
pass
routes = [
Route('/login', 'POST', login),
Route('/register', 'POST', register),
Route('/status', 'GET', user_profile),
Route('/confirm', 'GET', confirm)
]
auth_routes = [Include('/auth', name='dirp', routes=routes)]