Groups of chains with positional arguments in partial tasks using Celery


I am writing an application which will execute a group of several synchronous chains of tasks asynchronously.

In other words, I might have the pipeline foo(a,b,c) -> boo(a,b,c) for some list of bs.

My plan is to create a chain foo(a,b,c) | boo(a,b,c) for each b in this list. These chains then form a Celery group, which can be applied asynchronously.
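To make the intended semantics concrete, here is a minimal plain-Python model of what the group of chains should compute, assuming (as I understand it) that a chain prepends each link's return value to the next link's partial arguments. This is only a sketch of the behaviour, not Celery itself:

```python
def foo(a, b, c):
    return b

def boo(a, b, c):
    return b

def run_pipeline(a, bs, c):
    # Plain-Python stand-in for a group of foo -> boo chains:
    # one synchronous chain per b, where boo's first positional
    # argument is foo's return value (as a Celery chain would pass it).
    results = []
    for b in bs:
        foo_result = foo(a, b, c)
        results.append(boo(foo_result, b, c))
    return results

print(run_pipeline("whatever", ["hello", "world"], "baz"))
# expected: ['hello', 'world']
```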

My code to do this is below:

my_app.py

#!/usr/bin/env python3

import functools
import time

from celery import chain, group, Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery("my_app", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def foo(a, b, c):
    logger.info("foo from {0}!".format(b))
    return b

@app.task
def boo(a, b, c):
    logger.info("boo from {0}!".format(b))
    return b

def break_up_tasks(tasks):
    try:
        first_task, *remaining_tasks = tasks
    except ValueError as e:
        first_task, remaining_tasks = [], []
    return first_task, remaining_tasks

def do_tasks(a, bs, c, opts):
    tasks = [foo, boo]

    # There should be an option for each task
    if len(opts) != len(tasks):
        raise ValueError("There should be {0} provided options".format(len(tasks)))

    # Create a list of tasks that should be included per the list of options' boolean values
    tasks = [task for opt, task in zip(opts, tasks) if opt]

    first_task, remaining_tasks = break_up_tasks(tasks)

    # If there are no tasks, we're done.
    if not first_task: return

    chains = (
        functools.reduce(
            # `a` should be provided by `apply_async`'s `args` kwarg
            # `b` should be provided by previous partials in chain
            lambda x, y: x | y.s(c),
            remaining_tasks, first_task.s(a, b, c)
        ) for b in bs
    )

    g = group(*chains)
    res = g.apply_async(args=(a,), queue="default")
    print("Applied async... waiting for termination.")

    total_tasks = len(tasks)

    while not res.ready():
        print("Waiting... {0}/{1} tasks complete".format(res.completed_count(), total_tasks))
        time.sleep(1)

if __name__ == "__main__":
    a = "whatever"
    bs = ["hello", "world"]
    c = "baz"

    opts = [
        # do "foo"
        True,
        # do "boo"
        True
    ]

    do_tasks(a, bs, c, opts)

Running celery

celery worker -A my_app -l info -c 5 -Q default

What I'm finding, though, is that when I run the above, my client loops forever on the "Waiting..." message because every chain fails: boo is missing an argument:

TypeError: boo() missing 1 required positional argument: 'c'

My understanding is that apply_async passes its args kwarg to the first task of each chain, and that each link in a chain passes its return value on to the next link.

Why is boo not receiving the arguments properly? I'm sure these tasks aren't well-written as this is my first foray into Celery. If you have other suggestions, I'm happy to entertain them.
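For what it's worth, I can reproduce the arity problem without a broker at all. If (and this is my assumption about Celery's behaviour) the chain prepends the previous link's result as the first positional argument, then boo.s(c) amounts to calling boo with only two of its three arguments:

```python
def boo(a, b, c):
    return b

# Mimic what the chain does with boo.s(c): it calls boo with
# (previous_result, c) -- only two of the three required arguments.
previous_result = "hello"
try:
    boo(previous_result, "baz")
    msg = None
except TypeError as exc:
    msg = str(exc)

print(msg)
# e.g. "boo() missing 1 required positional argument: 'c'"
```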


There is 1 answer:

Answer by olgierdh (accepted as best answer):

After debugging your code (I'm new to Celery too! :) ), I've learned that each chained function gets its first argument replaced by the result of the previous chained function call. With that said, I believe the solution to your problem is to add the missing second argument (b) to the y.s call in the reduce:

chains = (
    functools.reduce(
        # `a` should be provided by `apply_async`'s `args` kwarg
        # `b` should be provided by previous partials in chain
        lambda x, y: x | y.s(b, c), # <- here is the 'new guy'
        remaining_tasks, first_task.s(a, b, c)
    ) for b in bs
)

Hope it helps.
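A quick plain-Python check of the fixed chain helps confirm this. The sketch below only models the result-prepending behaviour described above (it is not Celery itself):

```python
def foo(a, b, c):
    return b

def boo(a, b, c):
    return b

def run_fixed_chain(a, b, c):
    # Model of foo.s(a, b, c) | boo.s(b, c): the chain prepends
    # foo's return value, so boo is invoked as boo(result, b, c)
    # and now receives all three positional arguments.
    result = foo(a, b, c)
    return boo(result, b, c)

print(run_fixed_chain("whatever", "hello", "baz"))  # -> hello
```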