import logging
import random

import requests
from celery.result import AsyncResult
from fastapi import Request
from fastapi.responses import JSONResponse
from fastapi.templating import Jinja2Templates

from . import users_router
from .schema import UserBody
from .tasks import sample_task, task_process_notification

logger = logging.getLogger(__name__)
templates = Jinja2Templates(directory="project/users/templates")


def api_call(email: str):  # pylint: disable=unused-argument
    """Simulate an unreliable third-party API call (testing aid).

    A random draw decides the outcome: a truthy draw raises a generic
    ``Exception``; otherwise a slow POST request is sent to httpbin.
    The ``email`` argument is accepted but never read.

    Raises:
        Exception: when the random draw is truthy.
    """
    should_fail = random.choice([0, 1])
    if should_fail:
        raise Exception("random processing error")

    # Stand-in for a real call to a third-party API.
    requests.post("https://httpbin.org/delay/5", timeout=30)


@users_router.get("/form/")
def form_example_get(request: Request):
    """Render the ``form.html`` template for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("form.html", context)


@users_router.post("/form/")
def form_example_post(user_body: UserBody):
    """Hand the submitted email to the task queue and return the task id.

    The ``task_id`` in the JSON response can be passed to the
    /task_status/ endpoint to monitor the submitted request.
    """
    queued = sample_task.delay(user_body.email)
    payload = {"task_id": queued.task_id}
    return JSONResponse(payload)


@users_router.get("/task_status/")
def task_status(task_id: str):
    """Report the current state of an async task.

    Intended for XHR short polling. Note that a state of "PENDING" is
    also returned for task ids that don't actually exist.

    The JSON response always carries ``state``; ``error`` (the
    stringified task result) is added only when the state is "FAILURE".
    """
    result = AsyncResult(task_id)
    current_state = result.state

    payload = {"state": current_state}
    if current_state == "FAILURE":
        payload["error"] = str(result.result)

    return JSONResponse(payload)


@users_router.post("/webhook_test/")
def webhook_test():
    """Simulate handling a webhook synchronously, inside the request.

    Raises:
        Exception: when the random draw is falsy (simulated failure).
    """
    draw = random.choice([0, 1])
    if not draw:
        # Simulated failure.
        raise Exception()

    # Blocking call: the response is held until this request completes.
    requests.post("https://httpbin.org/delay/5", timeout=30)
    return "pong"


@users_router.post("/webhook_test_async/")
def webhook_test_async():
    """Simulate handling a webhook asynchronously via the task queue.

    Example:
        curl -X POST http://localhost:8010/users/webhook_test_async/ -d {'data':'ping'}
    """
    queued = task_process_notification.delay()
    logger.info(queued.id)
    return "pong"


@users_router.get("/form_ws/")
def form_ws_example(request: Request):
    """Render the ``form_ws.html`` template for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("form_ws.html", context)