|
|
|
"""
|
|
|
|
Many resources on the web recommend using celery.task. This might cause circular imports since you'll have to import the Celery instance.
|
|
|
|
We used shared_task to make our code reusable, which, again, requires current_app in create_celery instead of creating a new Celery instance.
|
|
|
|
Now, we can copy this file anywhere in the app and it will work as expected. In short shared_task lets you define Celery tasks without
|
|
|
|
having to import the Celery instance, so it can make your task code more reusable.
|
|
|
|
"""
|
|
|
|
import random
|
|
|
|
|
|
|
|
import requests
|
|
|
|
from asgiref.sync import async_to_sync
|
|
|
|
from celery import shared_task
|
|
|
|
from celery.signals import task_postrun
|
|
|
|
from celery.utils.log import get_task_logger
|
|
|
|
|
|
|
|
# See https://docs.celeryq.dev/en/stable/userguide/tasks.html#logging
|
|
|
|
logger = get_task_logger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
@shared_task
|
|
|
|
def divide(x, y):
|
|
|
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
time.sleep(5)
|
|
|
|
return x / y
|
|
|
|
|
|
|
|
|
|
|
|
@shared_task
|
|
|
|
def add(x, y):
|
|
|
|
import time
|
|
|
|
|
|
|
|
time.sleep(5)
|
|
|
|
return x + y
|
|
|
|
|
|
|
|
|
|
|
|
@shared_task
|
|
|
|
def sample_task(email):
|
|
|
|
"""A sample task simulating calling an external API."""
|
|
|
|
from project.users.views import api_call
|
|
|
|
|
|
|
|
api_call(email)
|
|
|
|
|
|
|
|
|
|
|
|
# Since we set bind to True, this is a bound task, so the first argument to the task
|
|
|
|
# will always be the current task instance (self). Because of this, we can call self.retry
|
|
|
|
# to retry the failed task.
|
|
|
|
@shared_task(bind=True)
|
|
|
|
def task_process_notification(self):
|
|
|
|
try:
|
|
|
|
if not random.choice([0, 1]):
|
|
|
|
# mimic random error
|
|
|
|
raise Exception()
|
|
|
|
|
|
|
|
# this would block the I/O
|
|
|
|
requests.post("https://httpbin.org/delay/5", timeout=30)
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
logger.error("exception raised, it would be retry after 5 seconds")
|
|
|
|
# Remember to raise the exception returned by the self.retry method to make it work.
|
|
|
|
# By setting the countdown argument to 5, the task will retry after a 5 second delay.
|
|
|
|
raise self.retry(exc=e, countdown=5)
|
|
|
|
|
|
|
|
|
|
|
|
@task_postrun.connect
|
|
|
|
def task_postrun_handler(task_id, **kwargs): # pylint: disable=unused-argument
|
|
|
|
"""
|
|
|
|
Celery signal handler called after each Celery task is executed.
|
|
|
|
|
|
|
|
Sends a message to the relevant channel via the `update_celery_task_status`.
|
|
|
|
Celery does not support asyncio so we must convert the async function, to a
|
|
|
|
synchronous function with asgiref.
|
|
|
|
"""
|
|
|
|
from project.ws.views import update_celery_task_status
|
|
|
|
|
|
|
|
async_to_sync(update_celery_task_status)(task_id)
|