You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

86 lines
2.3 KiB
Python

import typing
from apistar import Route, http, App
from apistar_jwt import JWT
import logbook
from sqlalchemy import create_engine
from cookie_api.logger import global_init
from cookie_api.models import Cookie
from cookie_api.schema import CookieSchema
from cookie_api.util import SQLAlchemyHook, SQLAlchemySession, Session, ExtJSONResponse
engine = create_engine('postgresql://apistar@localhost:5432/apistar')
logger = logbook.Logger('Cookies')
def get_cookies(session: Session) -> typing.List[CookieSchema]:
cookies = session.query(Cookie).all()
return ExtJSONResponse([CookieSchema(cookie) for cookie in cookies], 200)
def get_cookie(session: Session, id) -> CookieSchema:
cookie = session.query(Cookie).filter_by(id=id).one_or_none()
if cookie is None:
msg = {"error": "404 Not Found"}
return ExtJSONResponse(msg, 404)
return ExtJSONResponse(CookieSchema(cookie), 200)
def create_cookie(session: Session, cookie_data: CookieSchema, app: App):
cookie = Cookie(**cookie_data)
session.add(cookie)
session.commit()
headers = {'Location': app.reverse_url('get_cookie', id=cookie.id)}
return ExtJSONResponse(CookieSchema(cookie), 201, headers=headers)
def delete_cookie(session: Session, id: int):
cookie = session.query(Cookie).filter_by(id=id).one_or_none()
if cookie is None:
msg = {"error": "404 Not Found"}
return http.Response(msg, status=404)
logger.debug("Deleting cookie {} {}".format(cookie.id, cookie.name))
session.delete(cookie)
session.commit()
return ExtJSONResponse({}, 204)
_routes = [
Route('/cookies', 'GET', get_cookies),
Route('/cookies', 'POST', create_cookie),
Route('/cookies/{id}', 'GET', get_cookie),
Route('/cookies/{id}', 'DELETE', delete_cookie)
]
app_settings = {
"LOGGING": {
"LEVEL": "DEBUG"
}
}
_routes = _routes # + auth_routes
_hooks = [SQLAlchemyHook]
_components = [
SQLAlchemySession(engine=engine),
JWT({
'JWT_SECRET': 'thisisasecret',
}),
]
def application_factory(settings={}, routes=_routes, components=_components, hooks=_hooks):
"""Returns an instance of Cookie API"""
_settings = {**app_settings, **settings}
global_init(_settings)
return App(components=components,
event_hooks=hooks,
routes=routes)