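I have the following setup:

```python
import json
import luigi

class RootTask(luigi.WrapperTask):
    def requires(self):
        dependencies = [TaskA(), TaskB()]
        yield dependencies
        yield CreatePartials(dependencies=dependencies)

class TaskA(luigi.Task):
    def run(self):
        # write task_a.json (placeholder content)
        with self.output().open('w') as f:
            json.dump({'task': 'a'}, f)

    def output(self):
        return luigi.LocalTarget('task_a.json')

class TaskB(luigi.Task):
    def run(self):
        # write task_b.json (placeholder content)
        with self.output().open('w') as f:
            json.dump({'task': 'b'}, f)

    def output(self):
        return luigi.LocalTarget('task_b.json')

class CreatePartials(luigi.Task):
    # holds a list of task instances (luigi.TaskParameter expects a single
    # task class, so a plain Parameter is used for the list)
    dependencies = luigi.Parameter()

    def run(self):
        # write root_task_output.json from the dependencies' outputs
        with self.output().open('w') as f:
            json.dump({'inputs': [t.path for t in self.input()]}, f)

    def requires(self):
        for dep in self.dependencies:
            yield dep

    def output(self):
        return luigi.LocalTarget('root_task_output.json')
```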
Although the pipeline runs successfully, I get an "Unfulfilled dependency at run time" exception on the CreatePartials task in every run. One peculiarity of this processing flow is that TaskA and TaskB have very different completion times: TaskA can finish 8 hours later than TaskB. Because the resulting data is correct, I am confident that the code works: somehow CreatePartials is retried once, finds both dependencies complete, and proceeds with its run method.

Nevertheless, I keep getting these Unfulfilled dependency exceptions, which:
- create noise in logs and alerts
- make me worry that they point to a code issue I am not aware of: the results are correct for now, but such an issue could surface in some edge case I cannot see yet.
So why am I getting these Unfulfilled dependency exceptions?
Thanks a lot.