You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
fastapi_celery/project/users/views.py

93 lines
2.6 KiB
Python

import logging
import random
import requests
from celery.result import AsyncResult
from fastapi import Request
from fastapi.responses import JSONResponse
from fastapi.templating import Jinja2Templates
from . import users_router
from .schema import UserBody
from .tasks import sample_task, task_process_notification
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Template loader used by the HTML-rendering views in this module.
# NOTE(review): the directory is a relative path — presumably resolved
# against the process working directory; confirm how the app is launched.
templates = Jinja2Templates(directory="project/users/templates")
def api_call(email: str):  # pylint: disable=unused-argument
    """Simulate an unreliable call to a third-party API.

    A random draw decides the outcome: a truthy draw raises an exception,
    otherwise an HTTP POST is sent to httpbin's ``/delay/5`` endpoint.
    The ``email`` argument is accepted but not used.

    Raises:
        Exception: with the message "random processing error" when the
            random draw is truthy.
    """
    should_fail = random.choice([0, 1])
    if not should_fail:
        # stand-in for a real request to an external service
        requests.post("https://httpbin.org/delay/5", timeout=30)
        return
    raise Exception("random processing error")
@users_router.get("/form/")
def form_example_get(request: Request):
    """Render the ``form.html`` template for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("form.html", context)
@users_router.post("/form/")
def form_example_post(user_body: UserBody):
    """Hand the submitted email off to the task queue.

    The email from the request body is passed to ``sample_task``. The JSON
    response carries the id of the queued task under ``task_id``; that id
    can be supplied to the /task_status/ endpoint to follow its progress.
    """
    queued = sample_task.delay(user_body.email)
    payload = {"task_id": queued.task_id}
    return JSONResponse(payload)
@users_router.get("/task_status/")
def task_status(task_id: str):
    """Report the state of an async task.

    Intended for XHR short polling. The JSON response always contains
    ``state``; when the state is "FAILURE" it also contains ``error``, the
    string form of the task's result.
    Task ids that don't actually exist are reported as "PENDING".
    """
    result = AsyncResult(task_id)
    current_state = result.state
    payload = {"state": current_state}
    if current_state == "FAILURE":
        # only failed tasks expose an error message to the client
        payload["error"] = str(result.result)
    return JSONResponse(payload)
@users_router.post("/webhook_test/")
def webhook_test():
    """Simulate handling a webhook synchronously, inside the request.

    A random draw decides the outcome: a falsy draw raises a bare
    ``Exception``; otherwise a blocking HTTP POST is made to another
    service and "pong" is returned.
    """
    succeeded = random.choice([0, 1])
    if succeeded:
        # blocking process
        requests.post("https://httpbin.org/delay/5", timeout=30)
        return "pong"
    # mimic an error
    raise Exception()
@users_router.post("/webhook_test_async/")
def webhook_test_async():
    """Simulate handling a webhook by delegating the work to a task.

    Queues ``task_process_notification``, logs the id of the queued task,
    and returns "pong" straight away.

    curl -X POST http://localhost:8010/users/webhook_test_async/ -d {'data':'ping'}
    """
    queued = task_process_notification.delay()
    logger.info(queued.id)
    return "pong"
@users_router.get("/form_ws/")
def form_ws_example(request: Request):
    """Render the ``form_ws.html`` template for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("form_ws.html", context)