import typing from apistar import Route, http from apistar_jwt import JWT, authentication_required, JWTUser import logbook from sqlalchemy import create_engine from sqlalchemy.orm import Session from cookie_api.auth import auth_routes from cookie_api.logger import global_init from cookie_api.models import Cookie from cookie_api.schema import CookieSchema from cookie_api.util import SQLAlchemyHook, SQLAlchemySession, ExtJSONResponse, MetaApp as App engine = create_engine('postgresql://apistar@localhost:5432/apistar') logger = logbook.Logger('Cookies') def get_cookies(session: Session) -> typing.List[CookieSchema]: cookies = session.query(Cookie).all() return [CookieSchema(cookie) for cookie in cookies] def get_cookie(session: Session, id) -> CookieSchema: cookie = session.query(Cookie).filter_by(id=id).one_or_none() if cookie is None: msg = {"error": "404 Not Found"} return ExtJSONResponse(msg, 404) return ExtJSONResponse(CookieSchema(cookie), 200) @authentication_required def create_cookie(session: Session, cookie_data: CookieSchema, app: App, user: JWTUser): cookie = Cookie(**cookie_data) session.add(cookie) session.commit() headers = {'Location': app.reverse_url('get_cookie', id=cookie.id)} return ExtJSONResponse(CookieSchema(cookie), 201, headers=headers) @authentication_required def delete_cookie(session: Session, id: int, user: JWTUser): cookie = session.query(Cookie).filter_by(id=id).one_or_none() if cookie is None: msg = {"error": "404 Not Found"} return http.Response(msg, status=404) logger.debug("Deleting cookie {} {}".format(cookie.id, cookie.name)) session.delete(cookie) session.commit() return ExtJSONResponse({}, 204) _routes = [ Route('/cookies', 'GET', get_cookies), Route('/cookies', 'POST', create_cookie), Route('/cookies/{id}', 'GET', get_cookie), Route('/cookies/{id}', 'DELETE', delete_cookie) ] app_settings = { "LOGGING": { "LEVEL": "DEBUG" } } _routes = _routes + auth_routes _hooks = [SQLAlchemyHook] _components = [ SQLAlchemySession(engine=engine), JWT({ 'JWT_USER_ID': 'sub', 'JWT_SECRET': 'thisisasecret', }), ] def application_factory(routes=_routes, components=_components, hooks=_hooks, settings={},): """Returns an instance of Cookie API""" _settings = {**app_settings, **settings} global_init(_settings) return App(components=components, event_hooks=hooks, routes=routes)