I have a celery task that processes each line of a very large text file in parallel. I also have a celery task that needs to run after every line is processed: it amalgamates and processes the output of each line. Because the datasets I'm working with are so huge, is there any way to have celery work with generators instead of lists?
def main():
    # Build the chord header lazily: one process signature per line
    header_generator = (process.s(line) for line in file)
    callback = finalize.s()
    # Want to loop through header_generator and kick off tasks
    chord(header_generator)(callback)
@celery.task
def process(line):
    # do stuff with line, return output
    return output
@celery.task
def finalize(output_generator):
    # Want to loop through output_generator and process the output
    for output in output_generator:
        # do stuff with output
        pass
    # do something to signal the completion of the file
If this isn't possible - without forking celery - is there another strategy that someone could recommend?
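One strategy that avoids materializing one signature per line is to batch lines and dispatch a single task per batch, which shrinks the chord header by the batch size. Below is a plain-Python sketch of the batching helper only (the celery wiring, and the hypothetical `process_batch` task it would feed, are omitted):

```python
from itertools import islice

def batched(iterable, size):
    # Yield lists of up to `size` items without ever reading the
    # whole iterable into memory; only one batch is resident at a time.
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Each batch would become one process_batch.s(batch) signature
# instead of `size` separate process.s(line) signatures.
print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

The header is still expanded eagerly, but it is smaller by a factor of `size`, which is often enough to make the chord tractable.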
At the time of this writing, generators passed to groups and chords are immediately expanded. I had a similar problem, so I added support for it and created a pull request against celery 3.x here: https://github.com/celery/celery/pull/3043
Currently only Redis is supported. Hopefully the PR will be merged before celery 3 is released.
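The eager expansion described above can be illustrated without celery at all. The sketch below uses a hypothetical `fake_chord` to mimic how a generator header is materialized into a list up front, which is exactly why memory blows up on huge inputs:

```python
def fake_chord(header):
    """Mimic celery's chord: the header generator is expanded to a
    list immediately, so every signature sits in memory at once."""
    tasks = list(header)  # the eager expansion in question

    def apply(callback):
        # Run each "task" and hand the collected results to the callback,
        # like the chord body receiving the header's results.
        results = [task() for task in tasks]
        return callback(results)

    return apply

# A generator of zero-argument "signatures" (stand-ins for
# process.s(line)); the "lines" here are just numbers.
header = ((lambda n=n: n * 2) for n in range(5))
print(fake_chord(header)(sum))  # 0 + 2 + 4 + 6 + 8 = 20
```

With the linked patch applied, the header is consumed lazily instead, so a generator over a large file never has to be held in memory as a full list.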