import typing from celery import current_app as celery_current_app from celery.result import AsyncResult from project.config import settings TaskInfoResponse = typing.Dict[str, str] def create_celery(): """Clerey factory function.""" celery_app = celery_current_app celery_app.config_from_object(settings, namespace="CELERY") return celery_app def get_task_info(task_id: str) -> TaskInfoResponse: """ return task info according to the task_id """ task = AsyncResult(task_id) state = task.state if state == "FAILURE": error = str(task.result) response = { "state": task.state, "error": error, } else: response = { "state": task.state, } return response