Catching a signature of celery task in pytest

24 views Asked by At

I have a workflow that looks like that:

from celery import shared_task, chord, chain, signature

def get_data():
  return list(1,2,3,4)

@shared_task()
def enrich_nums(num: int):
  return {num: num%0}

@shared_task()
def run workflow():
  data = get_data()
  tasks = [enrich_nums(i) for i in data]
  return chord(tasks, signature("task.another.place", queue="another.queue")).delay()

I would like to mock that "signature" to catch the output of it.

Versions: python 3.11.2 celery 5.3.6 pytest-celery

broker: rabbitmq backend: mongodb

I tried to create a task in the test function and assign it, failed.

from tasks import run_workflow

def test_workflow(
    celery_app: Celery,
    celery_worker: WorkController,
):
    @celery_app.task(name="task.another.place", queue="another.queue")
    def mock_task(*args, **kwargs):
        raise ValueError("im here!")
    celery_app.register_task(mock_task)
    celery_worker.reload()
    run_workflow().delay().get()

I was expecting the error to be raised, but a message is inserted into the broker and does not run in the worker.

0

There are 0 answers