You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
3.2 KiB
Python

import datetime as dt
from apistar import Component, Settings, http, Route, Include, annotate
from apistar.interfaces import Auth
from apistar.backends.sqlalchemy_backend import Session
from apistar_jwt.authentication import get_jwt, JWTAuthentication
from apistar_jwt.token import JWT
from apistar_mail import Message, Mail
from sqlalchemy.exc import IntegrityError, InvalidRequestError
from sqlalchemy.orm.exc import NoResultFound
from cookie_api.models import User
from cookie_api.schema import UserSchema, UserCreateSchema
auth_components = [
Component(JWT, init=get_jwt)
]
def login(settings: Settings, json_data: http.RequestData, session: Session):
user_id = json_data.get('email')
password = json_data.get('password')
try:
user = session.query(User).filter_by(email=user_id).one()
except NoResultFound:
error = {
'status': 'fail',
'message': 'User does not exist'
}
return http.Response(error, status=400, headers={'WWW-Authenticate': 'Bearer'})
if not user.check_password(password):
error = {'error': 'Password auth failed'},
return http.Response(error, status=401, headers={'WWW-Authenticate': 'Bearer'})
secret = settings['JWT'].get('SECRET')
payload = {
'exp': dt.datetime.utcnow() + dt.timedelta(days=0, minutes=60), # Expiration date of the token
'iat': dt.datetime.utcnow(), # the time the token was generated
'sub': user.id # the subject of the token
}
token = JWT.encode(payload, secret=secret)
data = {
'status': 'success',
'message': 'Successfully logged in.',
'auth_token': token
}
return data
# TODO Add user logout
def logout():
pass
# TODO Add user registration
def register(user_rep: UserCreateSchema, session: Session, mail: Mail):
email_check = session.query(User).filter_by(email=user_rep['email']).one_or_none()
if email_check is not None:
message = {
'status': 'error',
'message': 'user email address is already in use'
}
return http.Response(message, status=400)
user = User(email=user_rep['email'], password=user_rep['password'])
session.add(user)
session.commit()
msg = Message("Thank you for registering please confirm your email", recipients=[user_rep['email']])
mail.send(msg)
headers = {}
message = {
'status': 'success',
'message': 'Please check your inbox and confirm your email'
}
return http.Response(message, status=201, headers=headers)
@annotate(authentication=[JWTAuthentication()])
def user_profile(auth: Auth, settings: Settings, session: Session) -> UserSchema:
token = JWT(token=auth.token, settings=settings)
user_id = token.payload.get('sub')
user = session.query(User).filter_by(id=user_id).one()
return UserSchema(user)
# TODO Add email confirmation
def confirm(json_data: http.RequestData, session: Session):
pass
# TODO Add email password reset
def reset():
pass
routes = [
Route('/login', 'POST', login),
Route('/register', 'POST', register),
Route('/status', 'GET', user_profile)
]
auth_routes = [Include('/auth', routes)]