"""Authentication endpoints mounted under ``/auth``.

Provides login (JWT issuance), registration with an email confirmation
link, confirmation of that link, and a profile lookup for the
authenticated user.
"""
import datetime as dt
import re

import logbook
from apistar import http, Route, Include, App
from apistar.exceptions import HTTPException
from apistar_jwt.token import JWT, JWTUser
from apistar_mail import Message, Mail
# from sqlalchemy.exc import IntegrityError, InvalidRequestError
from itsdangerous import URLSafeTimedSerializer, BadSignature
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import NoResultFound

from cookie_api.models import User
from cookie_api.schema import UserExportSchema, UserCreateSchema
from cookie_api.util import ExtJSONResponse

# https://realpython.com/handling-email-confirmation-in-flask/

EMAIL_REGEX = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)"
email_regex = re.compile(EMAIL_REGEX)

logger = logbook.Logger(__name__)

# NOTE(review): the signing key and salt are hard-coded, so anyone who can
# read this source can forge confirmation tokens. They should be loaded from
# deployment configuration. Values are unchanged so existing tokens stay valid.
_SECRET_KEY = 'secret-key'
_TOKEN_SALT = 'password-salt'

# NOTE(review): the public base URL is hard-coded to a local dev server.
_BASE_URL = 'http://localhost:5000'

# Sent to the client whenever a confirmation link cannot be honoured.
_INVALID_TOKEN_MESSAGE = 'Email token is expired or invalid'


def _serializer():
    """Build the serializer shared by token generation and verification."""
    return URLSafeTimedSerializer(_SECRET_KEY)


def generate_confirmation_token(email):
    """Return a signed, timestamped, URL-safe token that embeds *email*."""
    return _serializer().dumps(email, salt=_TOKEN_SALT)


def confirm_token(token, expiration=3600):
    """Verify *token* and return the email it carries.

    :param token: value produced by :func:`generate_confirmation_token`.
    :param expiration: maximum accepted token age, passed to the
        serializer as ``max_age``.
    :return: the embedded email, or ``False`` when the serializer raises
        ``BadSignature`` (in itsdangerous the expired-token error is a
        subclass of it).
    """
    try:
        return _serializer().loads(
            token,
            salt=_TOKEN_SALT,
            max_age=expiration,
        )
    except BadSignature:
        return False


def login(json_data: http.RequestData, session: Session, jwt: JWT):
    """Authenticate by email and password and issue a JWT.

    Reads ``email`` and ``password`` from the request body.

    :return: 400 response when no user has that email, 401 response when
        the password check fails, otherwise a 200 response whose
        ``auth_token`` is the encoded JWT.
    """
    email = json_data.get('email')
    password = json_data.get('password')

    try:
        user = session.query(User).filter_by(email=email).one()
    except NoResultFound:
        error = {
            'status': 'fail',
            'message': 'User does not exist'
        }
        return ExtJSONResponse(error, status=400, headers={'WWW-Authenticate': 'Bearer'})

    if not user.check_password(password):
        # A stray trailing comma used to turn this body into a 1-tuple
        # wrapping the dict; it must be the dict itself.
        error = {'error': 'Password auth failed'}
        return ExtJSONResponse(error, status=401, headers={'WWW-Authenticate': 'Bearer'})

    # Take the timestamp once so 'exp' is exactly 60 minutes after 'iat'.
    issued_at = dt.datetime.utcnow()
    payload = {
        'exp': issued_at + dt.timedelta(days=0, minutes=60),  # Expiration date of the token
        'iat': issued_at,  # the time the token was generated
        'sub': user.id  # the subject of the token
    }
    token = jwt.encode(payload)

    data = {
        'status': 'success',
        'message': 'Successfully logged in.',
        'auth_token': token
    }
    return ExtJSONResponse(data, 200)


# TODO Add user logout
def logout():
    """Placeholder: logout is not implemented yet."""
    pass


def register(user_data: UserCreateSchema, session: Session, mail: Mail, app: App) -> UserExportSchema:
    """Create a user and email them a confirmation link.

    The confirmation email is sent before the user is added to the
    session, so a mail failure leaves no user row behind.

    :raises HTTPException: 400 when the email fails ``email_regex`` or is
        already registered; 500 when rendering or sending the email fails.
    :return: 201 response with a message and the exported user.
    """
    address = user_data['email']

    if not email_regex.match(address):
        raise HTTPException('Please provide a valid email address', status_code=400)

    existing = session.query(User).filter_by(email=address).one_or_none()
    if existing is not None:
        raise HTTPException('user email address is already in use', status_code=400)

    user = User(email=address, password=user_data['password'])

    try:
        tokenized_email = generate_confirmation_token(user.email)
        email_body = app.render_template(
            'confirm_email.jinja2',
            confirm_url=f'{_BASE_URL}/auth/confirm?token={tokenized_email}',
            logo_url=f'{_BASE_URL}/static/cookie_icon.png',
        )
        msg = Message(subject="Confirm your email",
                      html=email_body,
                      recipients=[address])
        mail.send(msg)
    except Exception as e:
        # Boundary handler: log the failure in full, then report a generic
        # 500 while keeping the original error chained for debugging.
        logger.error(e, exc_info=True)
        raise HTTPException('Error an sending email', status_code=500) from e

    session.add(user)
    session.commit()

    headers = {}
    message = {
        'message': 'Please check your inbox and confirm your email',
        'data': UserExportSchema(user)
    }
    return ExtJSONResponse(message, 201, headers)


# TODO Return an HTML response since the user will be clicking on a link in a web browser
# TODO Since you are returning a user representation add a location and Content-Location header
def confirm(params: http.QueryParams, session: Session):
    """Mark the user named by the ``token`` query parameter as confirmed.

    :raises HTTPException: 400 when the token is missing, invalid or
        expired, when no user has the token's email, or when the account
        is already confirmed.
    :return: response containing the exported user.
    """
    # A request without ?token= is a client error; report it the same way
    # as a bad token instead of failing on the lookup.
    token = params.get('token')
    if not token:
        raise HTTPException(_INVALID_TOKEN_MESSAGE, status_code=400)

    email = confirm_token(token)
    if not email:
        raise HTTPException(_INVALID_TOKEN_MESSAGE, status_code=400)

    try:
        user = session.query(User).filter_by(email=email).one()
    except NoResultFound:
        # The signature checked out but no user has this email, so the
        # link cannot be honoured.
        raise HTTPException(_INVALID_TOKEN_MESSAGE, status_code=400) from None

    if user.confirmed:
        raise HTTPException('Account already confirmed. Please login.', status_code=400)

    user.confirmed = True
    session.add(user)
    session.commit()
    return ExtJSONResponse(UserExportSchema(user))


def user_profile(user: JWTUser, session: Session) -> UserExportSchema:
    """Return the exported profile of the JWT-authenticated user.

    :raises HTTPException: 400 when no user row matches the token's id.
    """
    try:
        # Separate name so the JWTUser parameter is not shadowed.
        db_user = session.query(User).filter_by(id=user.id).one()
    except NoResultFound as e:
        error = {'message': str(e)}
        raise HTTPException(error, status_code=400)
    return ExtJSONResponse(UserExportSchema(db_user))


# TODO Add email password reset
def reset():
    """Placeholder: password reset is not implemented yet."""
    pass


routes = [
    Route('/login', 'POST', login),
    Route('/register', 'POST', register),
    Route('/status', 'GET', user_profile),
    Route('/confirm', 'GET', confirm)
]

# All of the routes above are exposed under the /auth prefix.
auth_routes = [Include('/auth', name='dirp', routes=routes)]