How can I capture a Celery signal for when a task is "queued", with a way to access the task's kwargs?

System: Celery 5.2.7, Redis 5.0.3

I'm trying to find a way to update my database when a task has been "queued" (by "queued" I mean the task has been received by the worker and is waiting to start). If I set concurrency to 1 and send 10 tasks, I want to capture the signal when each task is received and insert a row into my database recording that it was received, then update that row as the other signals fire. I MUST be able to access the task's kwargs (they carry metadata I need to insert into my database).

I reviewed the documentation for celery.signals here: https://docs.celeryq.dev/en/v5.2.7/userguide/signals.html

I tried using task_received, but the only useful input I can access is "request". I also tried before_task_publish and after_task_publish, but those signals never seem to be captured when I use them.
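
For reference, this is roughly what my task_received attempt looked like (the handler name here is just illustrative); "request" is the only argument that seemed useful:

from celery.signals import task_received

@task_received.connect
def on_received(sender=None, request=None, **kwargs):
    # "request" is the worker's Request object for the incoming task; the
    # handler's **kwargs are the signal's own keyword arguments, not my task's kwargs
    print("TASK RECEIVED", request)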

My tasks code:

from src.app.tasks_signals import recieved, prerun, success, failure  # importing the handlers registers their @signal.connect hooks

@celery.task(name='customers')  # "celery" is my app instance; customers_work does the actual work (both defined elsewhere)
def backend(**kwargs):
    outcome = customers_work(**kwargs)
    return outcome
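
For context, the tasks are sent roughly like this (the kwargs keys below are just placeholder metadata) to a worker running with concurrency 1:

# send 10 tasks; the kwargs carry the metadata I need to record in my database
for i in range(10):
    backend.apply_async(kwargs={'customer_id': i, 'source': 'batch'})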

My signal captures:

from celery.signals import task_success, task_failure, task_prerun, before_task_publish
# update_job writes/updates this task's row in my database (defined elsewhere)

@before_task_publish.connect
def recieved(sender=None, body=None, **kwargs):
    task_id = body['id']
    task_kwargs = kwargs.get('kwargs',{})
    print("RECIEVED KWARGS IN task_recieved")
    print(task_kwargs)
    task_kwargs['correlation_id'] = task_id
    task_kwargs['result'] = "incomplete"
    task_kwargs['status'] = "QUEUED"
    task_kwargs['update_type'] = "create"
    task_kwargs['response_code'] = "200"

    update_job(**task_kwargs)
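
The prerun, success, and failure handlers imported in the tasks module follow the same idea; a trimmed sketch of the prerun one (the status values are illustrative):

@task_prerun.connect
def prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
    # task_prerun hands me the task's own kwargs directly
    task_kwargs = dict(kwargs or {})
    task_kwargs['correlation_id'] = task_id
    task_kwargs['status'] = "STARTED"
    update_job(**task_kwargs)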

How can I capture a Celery signal when a task is queued (received?), and also have access to its kwargs so I can run update_job to update my database?
