Celery chord isn't waiting for child tasks (a group of chains)

My real-world situation is that I want to get a list of campaigns from an API call and, for each campaign, trigger a chain of functions. Once all the chains are complete, I need to call a function to report on the results.

I've simplified this down as much as possible to the following code. It runs, but the chord unlock function is called before the chains are complete, which means xsum can't sum the array of results.

import time

from celery import Celery, chain, chord, group

app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')


@app.task
def generate():
    return [1, 2, 3, 4, 5]


@app.task
def dmap(it, first, second):
    chains = []
    for arg in it:
        c = chain(first.clone([arg, ]), second)
        chains.append(c)

    return group(chains)()


@app.task
def add(x, y):
    print 'add {x} {y}'.format(x=x, y=y)
    time.sleep(3)
    return x + y


@app.task
def mul(x, y):
    print 'mul {x} {y}'.format(x=x, y=y)
    time.sleep(2)
    return x * y


@app.task
def xsum(numbers):

    print numbers
    to_sum = []
    for x in numbers[0]:
        to_sum.append(x.result)
    print to_sum

    return sum(to_sum)

if __name__ == '__main__':

    x = add.s(0)
    y = mul.s(1)

    workers = generate.si() | dmap.s(x, y)

    result = chord(workers)(xsum.s())
    print result.get()

The dmap function was based on this answer. I've also seen this answer. The last link implies that what I want to do might not be possible since "there is nothing to synchronize with as the group happens in parallel."

I couldn't work out how to bend the solution to work when the generate function returns an array rather than a single item.

The log from running the above shows the (early?) chord unlock and hence xsum attempting to sum over an array of results where 3 are None.

[2014-11-11 14:03:10,308: INFO/MainProcess] Received task: tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51]
[2014-11-11 14:03:10,311: INFO/MainProcess] Received task: celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] eta:[2014-11-11 14:03:11.307477+00:00]
[2014-11-11 14:03:10,338: INFO/MainProcess] Received task: tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18]
[2014-11-11 14:03:10,365: INFO/MainProcess] Task tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51] succeeded in 0.0523488249746s: [1, 2, 3, 4, 5]
[2014-11-11 14:03:10,386: INFO/MainProcess] Received task: tasks.add[eccf5faa-069c-4634-826e-af5793a11c68]
[2014-11-11 14:03:10,388: WARNING/Worker-2] add 1 0
[2014-11-11 14:03:10,390: INFO/MainProcess] Received task: tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961]
[2014-11-11 14:03:10,392: WARNING/Worker-1] add 2 0
[2014-11-11 14:03:10,394: INFO/MainProcess] Received task: tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52]
[2014-11-11 14:03:10,397: INFO/MainProcess] Received task: tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c]
[2014-11-11 14:03:10,398: INFO/MainProcess] Received task: tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d]
[2014-11-11 14:03:10,399: WARNING/Worker-4] add 3 0
[2014-11-11 14:03:10,401: INFO/MainProcess] Task tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18] succeeded in 0.061700456019s: <GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195,...
[2014-11-11 14:03:10,402: WARNING/Worker-3] add 4 0
[2014-11-11 14:03:13,409: INFO/MainProcess] Received task: tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344]
[2014-11-11 14:03:13,410: INFO/MainProcess] Received task: tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b]
[2014-11-11 14:03:13,418: INFO/MainProcess] Received task: tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d]
[2014-11-11 14:03:13,419: INFO/MainProcess] Received task: tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195]
[2014-11-11 14:03:13,436: INFO/MainProcess] Task tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52] succeeded in 3.03667491797s: 3
[2014-11-11 14:03:13,437: INFO/MainProcess] Task tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c] succeeded in 3.03460178198s: 4
[2014-11-11 14:03:13,438: INFO/MainProcess] Task tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961] succeeded in 3.04608612298s: 2
[2014-11-11 14:03:13,439: WARNING/Worker-4] mul 4 1
[2014-11-11 14:03:13,450: WARNING/Worker-2] add 5 0
[2014-11-11 14:03:13,452: INFO/MainProcess] Task tasks.add[eccf5faa-069c-4634-826e-af5793a11c68] succeeded in 3.06420573901s: 1
[2014-11-11 14:03:13,454: WARNING/Worker-3] mul 3 1
[2014-11-11 14:03:13,481: INFO/MainProcess] Task celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] succeeded in 0.0413383140112s: None
[2014-11-11 14:03:13,485: INFO/MainProcess] Received task: tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373]
[2014-11-11 14:03:15,470: INFO/MainProcess] Task tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344] succeeded in 2.031282346s: 4
[2014-11-11 14:03:15,472: WARNING/Worker-1] mul 1 1
[2014-11-11 14:03:15,477: INFO/MainProcess] Task tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b] succeeded in 2.02354899806s: 3
[2014-11-11 14:03:15,479: WARNING/Worker-4] [<GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195, 4ffb6d04-0cf2-4300-a0de-bf53acf6662d, 538c3c60-67f8-409d-b4ce-bf09184aa03b, f696aa0a-844f-4e81-9722-0693c6e8c344, 82a6b814-53a5-45f1-a0dc-43885f92eca4]>]
[2014-11-11 14:03:15,555: WARNING/Worker-4] [None, None, 3, 4, None]
[2014-11-11 14:03:15,564: ERROR/MainProcess] Task tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373] raised unexpected: TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'",)
Traceback (most recent call last):
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/duncan/projects/celerychordtest/tasks.py", line 47, in xsum
    return sum(to_sum)
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
[2014-11-11 14:03:16,460: INFO/MainProcess] Received task: tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4]
[2014-11-11 14:03:16,462: WARNING/Worker-3] mul 5 1
[2014-11-11 14:03:16,476: WARNING/Worker-2] mul 2 1
[2014-11-11 14:03:16,476: INFO/MainProcess] Task tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d] succeeded in 3.02716274199s: 5
[2014-11-11 14:03:17,480: INFO/MainProcess] Task tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195] succeeded in 2.00813938997s: 1
[2014-11-11 14:03:18,485: INFO/MainProcess] Task tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d] succeeded in 2.00837794197s: 2
[2014-11-11 14:03:18,471: INFO/MainProcess] Task tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4] succeeded in 2.009012155s: 5

I was hoping / expecting the process to wait until each of the chains is complete before the chord unlock is called.

There are 2 answers

Duncan Morris (best answer)

As @ChillarAnand suggested, I ended up redesigning my tasks; however, I did so to eliminate the need for a chord. I wanted the ability to have a group of chains, which meant I couldn't (as far as I could work out) combine this with a chord.

What I now do is trigger the "final" task as part of triggering the group of chains. To make this work, the final task has to check that the other tasks have completed. Since I know my last task (in my real-world program) writes to a database, I can simply check that I have a row in the database for each item that was generated.

For anyone facing a similar problem, the relevant parts of the final function look roughly like the following:

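from celery import shared_task


class NotReady(Exception):
    """Raised while some generated items still have no row in the database."""


# bind=True passes the task instance as self, so self.retry() is available
@shared_task(bind=True, default_retry_delay=30, max_retries=10)
def output(self, generated_list):

    list_from_db = query db ...
    try:
        # raise_if_not_equal raises NotReady when the two lists differ
        raise_if_not_equal(list_from_db, generated_list)
    except NotReady as exc:
        # re-queue this task for later instead of blocking a worker
        raise self.retry(exc=exc, countdown=30)

    ... everything is ready, do stuff ...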
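
The helper raise_if_not_equal is not shown in the answer. A minimal sketch of what it might look like, assuming the generated items are hashable and that a row "exists" when the item appears in the list read back from the database (both are assumptions, as is the choice to ignore extra rows):

```python
class NotReady(Exception):
    """Signals that some generated items have no database row yet."""


def raise_if_not_equal(list_from_db, generated_list):
    # Hypothetical helper: compare as sets so that row order does not matter.
    # Extra rows in the database are ignored; only missing ones count.
    missing = set(generated_list) - set(list_from_db)
    if missing:
        raise NotReady("still waiting for: {0}".format(sorted(missing)))


# All rows present: passes silently.
raise_if_not_equal([3, 1, 2], [1, 2, 3])

# One row missing: raises NotReady, which the task would turn into a retry.
try:
    raise_if_not_equal([1, 2], [1, 2, 3])
except NotReady as exc:
    message = str(exc)
```

Raising on "missing" rather than on strict inequality is a design choice: it keeps the final task from retrying forever if an unrelated row shows up in the same table.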

FWIW: I'll probably update the retry to back off, basing the code roughly on the following thread.

This feels like a good answer and, crucially, because this task raises an exception and is retried, I never have a worker sat polling to find out whether everything has completed.
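
One possible shape for such a backoff, as a sketch only and not necessarily what the linked thread does: double the delay on every attempt and cap it. The function name backoff_countdown and the base and cap values are assumptions for illustration. In a bound Celery task the attempt number is available as self.request.retries, so the result could be passed as the countdown argument of self.retry.

```python
def backoff_countdown(retries, base=30, cap=600):
    """Seconds to wait before the next retry: base, 2*base, 4*base, ... up to cap."""
    return min(base * 2 ** retries, cap)


# Delays for the first six attempts with the defaults above.
delays = [backoff_countdown(n) for n in range(6)]
```

With max_retries=10 and these defaults, the task would keep checking for a little over an hour in total before giving up.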

Chillar Anand

A chord is a task that only executes after all of the tasks in a group have finished executing.

If you have a simple chord like this:

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()

it first executes the group of tasks in header and stores the async result objects in a list. Then, when it calls the callback, it iterates over the list and gets the results of the tasks from the async objects.

In your case you are passing workers as the header. workers is a pipeline (in effect a single big task), and when it is executed it gives only a single async object and NOT a list of objects. That single object is considered finished as soon as dmap returns, even though the chains it launched are still running. So when xsum receives it, it iterates over the child results inside that object, some of which are still None, and tries to sum objects of different types. That is why it throws the error:

TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'

So you have to redesign your task so that you feed only a group of tasks as a header to the chord.
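
The underlying timing issue is not specific to Celery. As an analogy only (this uses Python 3's concurrent.futures, not Celery, and the names slow_double and fan_out are made up for illustration), a task that merely launches other tasks is "done" long before the work it launched is:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

pool = ThreadPoolExecutor(max_workers=4)


def slow_double(x):
    # Stands in for one chain of work per item.
    time.sleep(0.2)
    return 2 * x


def fan_out(items):
    # Like dmap: starts the children and returns handles, not results.
    return [pool.submit(slow_double, x) for x in items]


outer = pool.submit(fan_out, [1, 2, 3])
children = outer.result()  # available almost immediately

# At this point the children are usually still running, which is the
# state the chord callback found itself in.
done_early = [f.done() for f in children]

wait(children)  # the synchronisation step the callback actually needs
total = sum(f.result() for f in children)
pool.shutdown()
```

Waiting on outer alone says nothing about children; something has to wait on the children themselves, which is what a chord does when its header is the group of tasks rather than a task that spawns the group.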