Why is this construction of a group of chains causing an exception in Celery?


I apologize in advance for not providing an MCVE. Unfortunately, the nature of this question doesn't lend itself well to a minimal example, but I think it is still answerable without one.

In my Flask app, the client may select a subset of tasks from a list. For each foo, I build a chain of the selected tasks like this:

current_app.logger.info("Creating list of chained tasks..")
chains = [functools.reduce(
    lambda x, y: x | y.s(foo, bar), remaining_tasks, first_task.s(foo, bar)
) for foo in foos]
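The reduce call folds each remaining task onto the first one with |, so every foo gets its own chain of the form first | t2 | t3 | .... A pure-Python sketch of the same fold, using hypothetical Sig and Pipeline stand-ins (not Celery's Signature or chain classes) whose | operator just records the order of steps:

```python
import functools


class Sig:
    """Hypothetical stand-in for a Celery signature: a task name plus partial args."""

    def __init__(self, name, *args):
        self.name = name
        self.args = args

    def __or__(self, other):
        # sig | sig starts a new pipeline containing both steps
        return Pipeline([self, other])

    def __repr__(self):
        return "{}{}".format(self.name, self.args)


class Pipeline:
    """Hypothetical stand-in for a chain: an ordered list of steps."""

    def __init__(self, steps):
        self.steps = steps

    def __or__(self, other):
        # pipeline | sig appends one more step
        return Pipeline(self.steps + [other])

    def __repr__(self):
        return " | ".join(repr(step) for step in self.steps)


def build_chains(first_task, remaining_tasks, foos, bar):
    # One pipeline per foo: first | remaining[0] | remaining[1] | ...
    return [
        functools.reduce(
            lambda acc, name: acc | Sig(name, foo, bar),
            remaining_tasks,
            Sig(first_task, foo, bar),
        )
        for foo in foos
    ]


chains = build_chains("first", ["second", "third"], foos=[1, 2], bar="x")
print(chains)
```

One detail worth noting: when remaining_tasks is empty, reduce returns its initial value unchanged, so the "chain" for that foo is just the first signature on its own.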

All tasks have a similar function signature, which is something like

@celery.task
def my_task(baz, foo, bar):
    # ...
    return baz

And I attempt to execute the group in the following way:

current_app.logger.info("Created a group of chained tasks..")
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")
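As I understand Celery's partial-argument rules, the args given to a group's apply_async are passed to every member, a chain hands them to its first task, and each task's return value is prepended to the next task's partial args. That is why every task takes baz first and returns it. A synchronous, broker-free sketch of that data flow, with hypothetical run_chain and run_group helpers (not Celery APIs):

```python
calls = []


def my_task(baz, foo, bar):
    # Record the arguments to show their order; return baz for the next link
    calls.append((baz, foo, bar))
    return baz


def run_chain(steps, *args):
    """Run (func, partial_args) steps in order, chain-style (synchronous sketch)."""
    func, partial = steps[0]
    # The caller's args are prepended to the first step's partial args
    result = func(*args, *partial)
    for func, partial in steps[1:]:
        # Each parent's return value becomes the first argument of the next step
        result = func(result, *partial)
    return result


def run_group(chains, *args):
    """Give the same args to every chain, mimicking group.apply_async(args=...)."""
    return [run_chain(steps, *args) for steps in chains]


foos, bar, baz = ["a", "b"], "bar", 42
chains = [[(my_task, (foo, bar)), (my_task, (foo, bar))] for foo in foos]
results = run_group(chains, baz)
print(results)
```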

I find that when apply_async is called, two exceptions are raised:

Traceback (most recent call last):
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 209, in __getitem__
    return self.__consumed[index]
IndexError: list index out of range

and, raised while handling that IndexError:

File "/Users/erip/Code/whatever.py", line 101, in blahblah
    res = g.apply_async(args=(baz,), queue="default")
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 977, in apply_async
    app = self.app
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 1144, in app
    app = self.tasks[0].app
  File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 213, in __getitem__
    self.__consumed.append(next(self.__it))
TypeError: 'Signature' object is not an iterator

The docs suggest that my construction of the chains is valid, so I don't understand why apply_async fails.

My goal is to create a group of len(foos) chains that are applied asynchronously. The exception only occurs when len(foos) == 1.

Has anyone run into this problem before?

Best answer, by amyangfei:

I ran into a similar problem. The Celery docs for group include the following note:

If only one argument is passed, and that argument is an iterable
then that'll be used as the list of tasks instead: this
allows us to use group with generator expressions.
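That convention makes a single positional argument ambiguous. A minimal pure-Python sketch, using a hypothetical collect_tasks helper that applies only the "one argument means the iterable of tasks" rule, shows that it works for several tasks, a list, or a generator expression, but unpacks a lone task that is itself iterable. A plain dict stands in for a signature here, since Celery's Signature subclasses dict:

```python
def collect_tasks(*tasks):
    """Hypothetical helper mimicking group's rule: a lone argument is the task iterable."""
    if len(tasks) == 1:
        # One argument: treat it as the iterable of tasks (list, generator, ...)
        tasks = tasks[0]
    return list(tasks)


# Several positional tasks, a list, or a generator expression all work
print(collect_tasks("t1", "t2"))
print(collect_tasks(["t1", "t2"]))
print(collect_tasks(name for name in ("t1", "t2")))

# A single dict-like task is iterated over instead of being kept whole
sig = {"task": "my_task", "args": (1, 2)}
print(collect_tasks(sig))  # ['task', 'args'], not [sig]
```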

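Take a look at the constructor of the group class. If only one signature is passed, it is not treated as a single task: it is taken as the iterable of tasks itself and, since it is not a list or tuple, wrapped lazily by regen(). The first time that wrapper is indexed (here via self.tasks[0]), it calls next() on the signature, which is not an iterator, hence the TypeError.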

def __init__(self, *tasks, **options):
    if len(tasks) == 1:
        # A single positional argument is taken as the iterable of tasks
        tasks = tasks[0]
        if isinstance(tasks, group):
            tasks = tasks.tasks
        if not isinstance(tasks, _regen):
            # regen() keeps lists/tuples as-is and wraps anything else lazily
            tasks = regen(tasks)
    Signature.__init__(
        self, 'celery.group', (), {'tasks': tasks}, **options
    )
    self.subtask_type = 'group'
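The failure can be reproduced without Celery. The sketch below uses a simplified, hypothetical LazySeq class that, like the _regen wrapper in the traceback, stores whatever it is given and calls next() on it when an index is not cached yet. A generator works; a lone dict-like signature does not, because a dict is iterable but not an iterator:

```python
class LazySeq:
    """Simplified, hypothetical model of a lazily consumed sequence (not Celery's _regen)."""

    def __init__(self, it):
        # Stored as-is: the class assumes it was handed an iterator
        self._it = it
        self._consumed = []

    def __getitem__(self, index):
        try:
            return self._consumed[index]
        except IndexError:
            # Pull items until index is cached; next() needs a real iterator
            try:
                while len(self._consumed) <= index:
                    self._consumed.append(next(self._it))
            except StopIteration:
                raise IndexError(index)
            return self._consumed[index]


# A generator expression is an iterator, so lazy access works
gen_tasks = LazySeq(name for name in ["chain_a", "chain_b"])
print(gen_tasks[0])  # chain_a

# A dict is iterable but not an iterator, so next() fails
lone_sig = {"task": "my_task", "args": ()}
try:
    LazySeq(lone_sig)[0]
except TypeError as exc:
    # Raised while handling the IndexError, matching the shape of the traceback
    print(exc)  # 'dict' object is not an iterator
```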

In your case, you can simply execute a group of tasks in the following way:

current_app.logger.info("Created a group of chained tasks..")
if len(chains) == 1:
    g = group(chains)
else:
    g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")
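If the constructor behaves as shown above, passing the list itself, group(chains), should work for any length, because a single list argument is always taken as the task list; the len(chains) == 1 branch would then be unnecessary. A simplified model of that rule, where task_list and FakeSignature are illustrative names rather than Celery APIs:

```python
class FakeSignature(dict):
    """Hypothetical dict-based stand-in for a Celery signature or chain."""


def task_list(*tasks):
    """Hypothetical mirror of group's rule: one positional argument is the task container."""
    if len(tasks) == 1:
        tasks = tasks[0]
    if isinstance(tasks, dict):
        # A lone signature is dict-like, so it is mistaken for the container.
        # Simplification: fail eagerly, where Celery fails later on first access.
        raise TypeError("{} is not a container of tasks".format(type(tasks).__name__))
    return list(tasks)


one = [FakeSignature(task="a")]
many = [FakeSignature(task="a"), FakeSignature(task="b")]

# Unpacking works for several chains
print(task_list(*many) == many)  # True

# Unpacking a single chain fails: the chain itself is taken as the container
try:
    task_list(*one)
except TypeError as exc:
    print(exc)

# Passing the list itself works for any length
print(task_list(one) == one, task_list(many) == many)  # True True
```

Later Celery releases may special-case a lone signature in the group constructor, so checking canvas.py in the installed version is the reliable way to confirm which behavior applies.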