I apologize in advance for not providing a MVCE -- unfortunately due to the nature of this question, it doesn't lend itself well to minimal examples. I think it will still be quite answerable without an MVCE nonetheless.
I have a list of tasks from which the client may select a subset of tasks to create in Flask. I create the tasks like this:
current_app.logger.info("Creating list of chained tasks..")
chains = [functools.reduce(
lambda x, y: x | y.s(foo, bar), remaining_tasks, first_task.s(foo, bar)
) for foo in foos]
All tasks have a similar function signature, which is something like
@celery.task
def my_task(baz, foo, bar):
# ...
return baz
And I attempt to execute the group in the following way:
current_app.logger.info("Created a group of chained tasks..")
g = group(*chains)
res = g.apply_async(args=(baz,), queue="default")
I find that when apply_async
is called, two exceptions are raised:
Traceback (most recent call last):
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 209, in __getitem__
return self.__consumed[index]
IndexError: list index out of range
and
File "/Users/erip/Code/whatever.py", line 101, in blahblah
res = g.apply_async(args=(baz,), queue="default")
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 977, in apply_async
app = self.app
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/canvas.py", line 1144, in app
app = self.tasks[0].app
File "/Users/erip/.virtualenvs/foo/lib/python3.5/site-packages/celery/utils/functional.py", line 213, in __getitem__
self.__consumed.append(next(self.__it))
TypeError: 'Signature' object is not an iterator
The docs suggest that my construction of the chains is valid, so I don't understand why the asynchronous application is causing problems.
My goal is to create a group of len(foos)
chains which are applied asynchronously. I find that this behavior only happens when len(foos) == 1
.
Has anyone run into this problem before?
I have met similar problem, celery docs has the following note:
Take a look at the constructor of Group class. If we pass only one signature to initialize a group object, this signature will be treated as a generator.
In your case, you can simply execute a group of tasks in the following way: