Luigi: Successful task run with "Unfulfilled dependency at run time"


I have the following setup:

import luigi


class RootTask(luigi.WrapperTask):

    def requires(self):
        dependencies = [TaskA(), TaskB()]
        yield dependencies
        yield CreatePartials(dependencies=dependencies)


class TaskA(luigi.Task):
    def run(self):
        # write task_a.json (placeholder content)
        with self.output().open('w') as f:
            f.write('{}')

    def output(self):
        return luigi.LocalTarget('task_a.json')


class TaskB(luigi.Task):
    def run(self):
        # write task_b.json (placeholder content)
        with self.output().open('w') as f:
            f.write('{}')

    def output(self):
        return luigi.LocalTarget('task_b.json')


class CreatePartials(luigi.Task):
    dependencies = luigi.TaskParameter()

    def run(self):
        # write root_task_output.json (placeholder content)
        with self.output().open('w') as f:
            f.write('{}')

    def requires(self):
        for dep in self.dependencies:
            yield dep

    def output(self):
        return luigi.LocalTarget('root_task_output.json')

Although the pipeline runs successfully, every run raises an "Unfulfilled dependency at run time" exception for the CreatePartials task. One peculiarity of this processing flow is that TaskA and TaskB have very different completion times: TaskA can finish 8 hours later than TaskB. Since the resulting data is correct, I am fairly confident that the code works. It looks as if CreatePartials is retried once, finds both dependencies completed, and then proceeds with its run method. Nevertheless, these exceptions bother me for two reasons:

  1. They create noise in logs and alerts.
  2. I am not sure whether they indicate a code issue that I am not aware of at the moment, since the results are correct. Such an issue could manifest in some edge case I cannot currently see.
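To make the behaviour concrete: as far as I understand, the message comes from a check the worker performs right before calling run(), where it asks each required task whether it is complete and refuses to run if any of them is not. The sketch below only imitates that check in plain Python so the sequence of events is easy to follow. It is not Luigi code, and `FakeTask`, `missing_dependencies` and `run_with_check` are hypothetical names invented for this illustration.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeTask:
    """Hypothetical stand-in for a task; NOT a Luigi class."""
    task_id: str
    done: bool = False
    requirements: List["FakeTask"] = field(default_factory=list)

    def deps(self):
        # The tasks this task requires.
        return list(self.requirements)

    def complete(self):
        # In the real setup this would be "does the output file exist?".
        return self.done


def missing_dependencies(task):
    """Return the ids of dependencies that report incomplete right now."""
    return [dep.task_id for dep in task.deps() if not dep.complete()]


def run_with_check(task):
    """Imitate the pre-run check: refuse to run while a dependency is incomplete."""
    missing = missing_dependencies(task)
    if missing:
        noun = "dependency" if len(missing) == 1 else "dependencies"
        raise RuntimeError("Unfulfilled %s at run time: %s" % (noun, ", ".join(missing)))
    task.done = True


task_a = FakeTask("TaskA")             # slow task, still running
task_b = FakeTask("TaskB", done=True)  # fast task, already finished
partials = FakeTask("CreatePartials", requirements=[task_a, task_b])

# First attempt: TaskA has not produced its output yet, so the check fails.
try:
    run_with_check(partials)
except RuntimeError as exc:
    first_attempt_error = str(exc)

# Later, TaskA finishes and the retry goes through.
task_a.done = True
run_with_check(partials)
```

If this picture is right, the retry succeeding would be consistent with what I observe. What it does not explain is why CreatePartials gets scheduled at all before TaskA reports complete.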

So why am I getting these "Unfulfilled dependency" exceptions?

Thanks a lot.
