""" Many resources on the web recommend using celery.task. This might cause circular imports since you'll have to import the Celery instance. We used shared_task to make our code reusable, which, again, requires current_app in create_celery instead of creating a new Celery instance. Now, we can copy this file anywhere in the app and it will work as expected. In short shared_task lets you define Celery tasks without having to import the Celery instance, so it can make your task code more reusable. """ import random import requests from asgiref.sync import async_to_sync from celery import shared_task from celery.signals import task_postrun from celery.utils.log import get_task_logger # See https://docs.celeryq.dev/en/stable/userguide/tasks.html#logging logger = get_task_logger(__name__) @shared_task def divide(x, y): import time time.sleep(5) return x / y @shared_task def add(x, y): import time time.sleep(5) return x + y @shared_task def sample_task(email): """A sample task simulating calling an external API.""" from project.users.views import api_call api_call(email) # Since we set bind to True, this is a bound task, so the first argument to the task # will always be the current task instance (self). Because of this, we can call self.retry # to retry the failed task. @shared_task(bind=True) def task_process_notification(self): try: if not random.choice([0, 1]): # mimic random error raise Exception() # this would block the I/O requests.post("https://httpbin.org/delay/5", timeout=30) except Exception as e: logger.error("exception raised, it would be retry after 5 seconds") # Remember to raise the exception returned by the self.retry method to make it work. # By setting the countdown argument to 5, the task will retry after a 5 second delay. raise self.retry(exc=e, countdown=5) @task_postrun.connect def task_postrun_handler(task_id, **kwargs): # pylint: disable=unused-argument """ Celery signal handler called after each Celery task is executed. Sends a message to the relevant channel via the `update_celery_task_status`. Celery does not support asyncio so we must convert the async function, to a synchronous function with asgiref. """ from project.ws.views import update_celery_task_status async_to_sync(update_celery_task_status)(task_id)