Celery - How can I run success and error callbacks within a chord?


I have two tasks that need to run in parallel. When each one succeeds or fails I need to do something, and I also need to pass both results to another task. Here is a rough diagram to make it clearer:

      |                   |
      |                   |
     add               subtract
    /   \               /    \
   /     \             /      \
success   \           /       success
or fail    \         /        or fail
 task       \       /            task
             \     /
              \   /
               \ /
           email results

I decided that a chord may be the best option because I can execute add and subtract in parallel, then pass their results to another callback.

Here is the code for my tasks:

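from celery import shared_task


@shared_task
def add(x):
    print('adding')
    return x + 5


@shared_task
def subtract(x):
    print('subtracting')
    return x - 5


@shared_task(ignore_result=True)
def add_subtract_task_success(result):
    # Success callback (link): receives the parent task's return value.
    if result == 10:
        print('task_success')


@shared_task
def add_subtract_task_failure(parent_task_id):
    # Error callback (link_error): receives the id of the failed task.
    print('link_error failure')


@shared_task
def email_results(results):
    # Chord body: receives the list of results from the header tasks.
    print('This is to be emailed: ' + str(results))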

Playing around in the Django shell, I tried the following:

>>> callback = email_results.s()
>>> header = [add.s(5).apply_async(link=add_subtract_task_success.s(), link_error=add_subtract_task_failure.s()), subtract.s(5).apply_async(link=add_subtract_task_success.s(), link_error=add_subtract_task_failure.s())]
>>> 
>>> result = chord(header)(callback)

And here is the exception it produces:

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/canvas.py", line 636, in __call__
    return self.apply_async((), {'body': body} if body else {}, **options)
  File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/canvas.py", line 631, in apply_async
    parent = _chord(self.tasks, body, args, **options)
  File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/app/task.py", line 420, in __call__
    return self.run(*args, **kwargs)
  File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/app/builtins.py", line 329, in run
    maybe_signature(s, app=app).clone() for s in tasks
AttributeError: 'AsyncResult' object has no attribute 'clone'

Any ideas and suggestions are welcome, thank you very much.
