You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
2.3 KiB
Python

import datetime as dt
from apistar import Component, Settings, http, Route, Include
from apistar.backends.sqlalchemy_backend import Session
from apistar_jwt.authentication import get_jwt
from apistar_jwt.token import JWT
from sqlalchemy.exc import IntegrityError, InvalidRequestError
from cookie_api.models import User
auth_components = [
Component(JWT, init=get_jwt)
]
def login(settings: Settings, json_data: http.RequestData, session: Session):
user_id = json_data.get('email')
password = json_data.get('password')
user = session.query(User).filter_by(email=user_id).one()
if not user.check_password(password):
error = {'error': 'Password auth failed'},
return http.Response(error, status=401, headers={'WWW-Authenticate': 'Bearer'})
secret = settings['JWT'].get('SECRET')
payload = {
'exp': dt.datetime.utcnow() + dt.timedelta(days=0, minutes=60),
'iat': dt.datetime.utcnow(),
'sub': user.id
}
token = JWT.encode(payload, secret=secret)
data = {
'status': 'success',
'message': 'Successfully logged in.',
'auth_token': token
}
return data
# TODO Add user logout
def logout():
pass
# TODO Add user registration
def register(settings: Settings, json_data: http.RequestData, session: Session):
user_id = json_data.get('email')
password = json_data.get('password')
email_check = session.query(User).filter_by(email=user_id).one_or_none()
if email_check is not None:
message = {
'status': 'error',
'message': 'user email address is already in use'
}
return http.Response(message, status=400)
user = User(email=user_id, password=password)
session.add(user)
session.commit()
# TODO Send off an email confirmation
headers = {}
message = {
'status': 'success',
'message': 'Please check your inbox and confirm your email'
}
return http.Response(message, status=200, headers=headers)
# TODO Add user profile endpoint
def user_profile():
pass
# TODO Add email confirmation
def confirm(settings: Settings, json_data: http.RequestData, session: Session):
pass
# TODO Add email password reset
def reset():
pass
routes = [
Route('/login', 'POST', login),
Route('/register', 'POST', register)
]
auth_routes = [Include('/auth', routes)]