""" Many resources on the web recommend using celery.task. This might cause circular imports since you'll have to import the Celery instance. We used shared_task to make our code reusable, which, again, requires current_app in create_celery instead of creating a new Celery instance. Now, we can copy this file anywhere in the app and it will work as expected. In short shared_task lets you define Celery tasks without having to import the Celery instance, so it can make your task code more reusable. """ import random import requests from celery import shared_task from celery.utils.log import get_task_logger # See https://docs.celeryq.dev/en/stable/userguide/tasks.html#logging logger = get_task_logger(__name__) @shared_task def divide(x, y): import time time.sleep(5) return x / y @shared_task def add(x, y): import time time.sleep(5) return x + y @shared_task def sample_task(email): """A sample task simulating calling an external API.""" from project.users.views import api_call api_call(email) # Since we set bind to True, this is a bound task, so the first argument to the task # will always be the current task instance (self). Because of this, we can call self.retry # to retry the failed task. @shared_task(bind=True) def task_process_notification(self): try: if not random.choice([0, 1]): # mimic random error raise Exception() # this would block the I/O requests.post("https://httpbin.org/delay/5", timeout=30) except Exception as e: logger.error("exception raised, it would be retry after 5 seconds") # Remember to raise the exception returned by the self.retry method to make it work. # By setting the countdown argument to 5, the task will retry after a 5 second delay. raise self.retry(exc=e, countdown=5)