diff --git a/.pylintrc b/.pylintrc index f921c98..ce7da6c 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,4 @@ [MASTER] ignore-paths=^alembic\\.*$|^alembic/.*$ +; Needed to prevent E0611: No name 'BaseModel' in module 'pydantic' (no-name-in-module) +extension-pkg-whitelist=pydantic diff --git a/project/users/__init__.py b/project/users/__init__.py index c5369c0..8633eca 100644 --- a/project/users/__init__.py +++ b/project/users/__init__.py @@ -4,6 +4,6 @@ users_router = APIRouter( prefix="/users", ) -from . import models, tasks # noqa +from . import models, tasks, views # noqa from project.celery_utils import create_celery diff --git a/project/users/schema.py b/project/users/schema.py new file mode 100644 index 0000000..a1362cc --- /dev/null +++ b/project/users/schema.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class UserBody(BaseModel): + + username: str + email: str diff --git a/project/users/tasks.py b/project/users/tasks.py index 24b25d7..7d22f28 100644 --- a/project/users/tasks.py +++ b/project/users/tasks.py @@ -1,11 +1,18 @@ """ Many resources on the web recommend using celery.task. This might cause circular imports since you'll have to import the Celery instance. We used shared_task to make our code reusable, which, again, requires current_app in create_celery instead of creating a new Celery instance. -Now, we can copy this file anywhere in the app and it will work as expected. +Now, we can copy this file anywhere in the app and it will work as expected. In short shared_task lets you define Celery tasks without +having to import the Celery instance, so it can make your task code more reusable. """ - +import random +import requests from celery import shared_task +from celery.utils.log import get_task_logger + +# See https://docs.celeryq.dev/en/stable/userguide/tasks.html#logging +logger = get_task_logger(__name__) + @shared_task def divide(x, y): @@ -22,3 +29,31 @@ def add(x, y): time.sleep(5) return x + y + + +@shared_task +def sample_task(email): + """A sample task simulating calling an external API.""" + from project.users.views import api_call + + api_call(email) + + +# Since we set bind to True, this is a bound task, so the first argument to the task +# will always be the current task instance (self). Because of this, we can call self.retry +# to retry the failed task. +@shared_task(bind=True) +def task_process_notification(self): + try: + if not random.choice([0, 1]): + # mimic random error + raise Exception() + + # this would block the I/O + requests.post("https://httpbin.org/delay/5", timeout=30) + except Exception as e: + + logger.error("exception raised, it would be retry after 5 seconds") + # Remember to raise the exception returned by the self.retry method to make it work. + # By setting the countdown argument to 5, the task will retry after a 5 second delay. + raise self.retry(exc=e, countdown=5) diff --git a/project/users/templates/form.html b/project/users/templates/form.html new file mode 100644 index 0000000..2954f1d --- /dev/null +++ b/project/users/templates/form.html @@ -0,0 +1,122 @@ + + + + + + Celery example + + + +
+
+
+
+
+ + +
+
+ + +
+
+ +
+
+
+
+ + + + + diff --git a/project/users/views.py b/project/users/views.py new file mode 100644 index 0000000..bd79013 --- /dev/null +++ b/project/users/views.py @@ -0,0 +1,71 @@ +import logging +import random + +import requests +from celery.result import AsyncResult +from fastapi import Request +from fastapi.responses import JSONResponse +from fastapi.templating import Jinja2Templates + +from . import users_router +from .schema import UserBody +from .tasks import sample_task, task_process_notification + +logger = logging.getLogger(__name__) +templates = Jinja2Templates(directory="project/users/templates") + + +def api_call(email: str): # pylint: disable=unused-argument + # used for testing a failed api call + if random.choice([0, 1]): + raise Exception("random processing error") + + # used for simulating a call to a third-party api + requests.post("https://httpbin.org/delay/5", timeout=30) + + +@users_router.get("/form/") +def form_example_get(request: Request): + return templates.TemplateResponse("form.html", {"request": request}) + + +@users_router.post("/form/") +def form_example_post(user_body: UserBody): + task = sample_task.delay(user_body.email) + return JSONResponse({"task_id": task.task_id}) + + +@users_router.get("/task_status/") +def task_status(task_id: str): + task = AsyncResult(task_id) + state = task.state + + if state == "FAILURE": + error = str(task.result) + response = { + "state": state, + "error": error, + } + else: + response = { + "state": state, + } + return JSONResponse(response) + + +@users_router.post("/webhook_test/") +def webhook_test(): + if not random.choice([0, 1]): + # mimic an error + raise Exception() + + # blocking process + requests.post("https://httpbin.org/delay/5", timeout=30) + return "pong" + + +@users_router.post("/webhook_test_async/") +def webhook_test_async(): + task = task_process_notification.delay() + print(task.id) + return "pong" diff --git a/requirements.in b/requirements.in index 252c5fa..dd60f9e 100644 --- a/requirements.in +++ b/requirements.in @@ -2,9 +2,10 @@ alembic==1.8.1 celery==5.2.7 fastapi==0.79.0 flower==1.2.0 +Jinja2==3.1.2 psycopg2-binary==2.9.3 redis==4.3.4 -shellcheck-py==0.8.0.4 +requests==2.28.1 SQLAlchemy==1.4.40 uvicorn[standard]==0.18.2 watchfiles==0.17.0 diff --git a/requirements.txt b/requirements.txt index c50b60c..4be0a45 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,6 +20,10 @@ celery==5.2.7 # via # -r requirements.in # flower +certifi==2022.9.24 + # via requests +charset-normalizer==2.1.1 + # via requests click==8.1.3 # via # celery @@ -48,17 +52,23 @@ httptools==0.5.0 humanize==4.4.0 # via flower idna==3.4 - # via anyio + # via + # anyio + # requests importlib-metadata==4.12.0 # via alembic importlib-resources==5.9.0 # via alembic +jinja2==3.1.2 + # via -r requirements.in kombu==5.2.4 # via celery mako==1.2.2 # via alembic markupsafe==2.1.1 - # via mako + # via + # jinja2 + # mako packaging==21.3 # via redis prometheus-client==0.14.1 @@ -81,6 +91,8 @@ pyyaml==6.0 # via uvicorn redis==4.3.4 # via -r requirements.in +requests==2.28.1 + # via -r requirements.in six==1.16.0 # via click-repl sniffio==1.3.0 @@ -97,6 +109,8 @@ typing-extensions==4.3.0 # via # pydantic # starlette +urllib3==1.26.12 + # via requests uvicorn[standard]==0.18.2 # via -r requirements.in uvloop==0.17.0