Removed example Todo endpoints and tests that the cookiecutter provided
parent
50014ae1ff
commit
e32d5908a0
@ -1,2 +0,0 @@
|
|||||||
from .manager import TodoManager, TodoManagerComponent
|
|
||||||
from .view import todo_routes
|
|
@ -1,80 +0,0 @@
|
|||||||
from inspect import Parameter
|
|
||||||
from typing import List
|
|
||||||
from molten import BaseApp, HTTPError, HTTP_409, HTTP_404
|
|
||||||
from sqlalchemy.orm import Session
|
|
||||||
|
|
||||||
from pulley.manager import BaseManager
|
|
||||||
from pulley.error import EntityNotFound
|
|
||||||
from .model import Todo, TodoModel
|
|
||||||
|
|
||||||
|
|
||||||
class TodoManager(BaseManager):
|
|
||||||
"""A `TodoManager` is accountable for the CRUD operations associated with a `Todo` instance"""
|
|
||||||
|
|
||||||
def schema_from_model(self, result: TodoModel) -> Todo:
|
|
||||||
_todo = Todo(
|
|
||||||
id=result.id,
|
|
||||||
href=self.app.reverse_uri("get_todo_by_id", todo_id=result.id),
|
|
||||||
createdDate=result.created_date,
|
|
||||||
modifiedDate=result.modified_date,
|
|
||||||
todo=result.todo,
|
|
||||||
complete=result.complete
|
|
||||||
)
|
|
||||||
return _todo
|
|
||||||
|
|
||||||
def model_from_schema(self, todo: Todo) -> TodoModel:
|
|
||||||
_todo_model = TodoModel(
|
|
||||||
todo=todo.todo,
|
|
||||||
complete=todo.complete
|
|
||||||
)
|
|
||||||
return _todo_model
|
|
||||||
|
|
||||||
def get_todos(self) -> List[Todo]:
|
|
||||||
"""Retrieves a list of `Todo` representations"""
|
|
||||||
results = self.session.query(TodoModel).order_by(TodoModel.id).all()
|
|
||||||
todos = [self.schema_from_model(result) for result in results]
|
|
||||||
return todos
|
|
||||||
|
|
||||||
def get_todo_by_id(self, id) -> Todo:
|
|
||||||
"""Retrieves a `Todo` representation by id"""
|
|
||||||
result = self.session.query(TodoModel).filter_by(id=id).one_or_none()
|
|
||||||
if result is None:
|
|
||||||
raise EntityNotFound(f"Todo: {id} does not exist")
|
|
||||||
return self.schema_from_model(result)
|
|
||||||
|
|
||||||
def create_todo(self, todo: Todo) -> Todo:
|
|
||||||
"""Creates a new `Todo` resource and returns its representation"""
|
|
||||||
todo_model = self.model_from_schema(todo)
|
|
||||||
self.session.add(todo_model)
|
|
||||||
self.session.flush()
|
|
||||||
return self.schema_from_model(todo_model)
|
|
||||||
|
|
||||||
def update_todo(self, todo_id: int, todo: Todo) -> Todo:
|
|
||||||
"""Updates an existing `Todo` resource and returns its new representation"""
|
|
||||||
result = self.session.query(TodoModel).filter_by(id=todo_id).one_or_none()
|
|
||||||
if result is None:
|
|
||||||
raise EntityNotFound(f"Todo: {todo_id} does not exist")
|
|
||||||
updates = self.model_from_schema(todo)
|
|
||||||
updates.id = todo_id
|
|
||||||
self.session.merge(updates)
|
|
||||||
self.session.flush()
|
|
||||||
todo = self.schema_from_model(result)
|
|
||||||
return todo
|
|
||||||
|
|
||||||
def delete_todo(self, id):
|
|
||||||
"""Deletes a `Todo` """
|
|
||||||
result = self.session.query(TodoModel).filter_by(id=id).one_or_none()
|
|
||||||
if result is not None:
|
|
||||||
self.session.delete(result)
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
class TodoManagerComponent:
|
|
||||||
is_cacheable = True
|
|
||||||
is_singleton = False
|
|
||||||
|
|
||||||
def can_handle_parameter(self, parameter: Parameter) -> bool:
|
|
||||||
return parameter.annotation is TodoManager
|
|
||||||
|
|
||||||
def resolve(self, session: Session, app: BaseApp) -> TodoManager: # type: ignore
|
|
||||||
return TodoManager(session, app)
|
|
@ -1,21 +0,0 @@
|
|||||||
from typing import Optional
|
|
||||||
from molten import schema, field
|
|
||||||
from sqlalchemy import Column, Text, Boolean
|
|
||||||
from pulley.db import Base, DBMixin
|
|
||||||
from pulley.schema import Link
|
|
||||||
|
|
||||||
|
|
||||||
@schema
|
|
||||||
class Todo:
|
|
||||||
id: int = field(response_only=True)
|
|
||||||
createdDate: str = field(response_only=True)
|
|
||||||
modifiedDate: str = field(response_only=True)
|
|
||||||
todo: str
|
|
||||||
complete: Optional[bool]
|
|
||||||
href: Link = field(response_only=True)
|
|
||||||
|
|
||||||
|
|
||||||
class TodoModel(Base, DBMixin):
|
|
||||||
__tablename__ = "todo"
|
|
||||||
todo = Column(Text)
|
|
||||||
complete = Column(Boolean, default=False)
|
|
@ -1,49 +0,0 @@
|
|||||||
from typing import List
|
|
||||||
from molten import Route, Include, HTTP_201, HTTP_202, HTTPError, HTTP_404
|
|
||||||
|
|
||||||
from pulley.schema import APIResponse
|
|
||||||
from pulley.error import EntityNotFound
|
|
||||||
from .model import Todo
|
|
||||||
from .manager import TodoManager
|
|
||||||
|
|
||||||
|
|
||||||
def list_todos(todo_manager: TodoManager) -> List[Todo]:
|
|
||||||
return todo_manager.get_todos()
|
|
||||||
|
|
||||||
|
|
||||||
def create_todo(todo: Todo, todo_manager: TodoManager) -> Todo:
|
|
||||||
_todo = todo_manager.create_todo(todo)
|
|
||||||
headers = {"Location": _todo.href}
|
|
||||||
return HTTP_201, _todo, headers
|
|
||||||
|
|
||||||
|
|
||||||
def delete_todo(todo_id: int, todo_manager: TodoManager):
|
|
||||||
todo_manager.delete_todo(todo_id)
|
|
||||||
return (
|
|
||||||
HTTP_202,
|
|
||||||
APIResponse(status=202, message=f"Delete request for todo: {todo_id} accepted"),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_todo_by_id(todo_id: int, todo_manager: TodoManager) -> Todo:
|
|
||||||
try:
|
|
||||||
_todo = todo_manager.get_todo_by_id(todo_id)
|
|
||||||
except EntityNotFound as err:
|
|
||||||
raise HTTPError(HTTP_404,
|
|
||||||
APIResponse(status=404,
|
|
||||||
message=err.message)
|
|
||||||
)
|
|
||||||
return _todo
|
|
||||||
|
|
||||||
|
|
||||||
def update_todo(todo_id: int, todo: Todo, todo_manager: TodoManager) -> Todo:
|
|
||||||
return todo_manager.update_todo(todo_id, todo)
|
|
||||||
|
|
||||||
|
|
||||||
todo_routes = Include("/todos", [
|
|
||||||
Route("", list_todos, method="GET"),
|
|
||||||
Route("", create_todo, method="POST"),
|
|
||||||
Route("/{todo_id}", delete_todo, method="DELETE"),
|
|
||||||
Route("/{todo_id}", get_todo_by_id, method="GET"),
|
|
||||||
Route("/{todo_id}", update_todo, method="PATCH")
|
|
||||||
])
|
|
Loading…
Reference in New Issue