Luigi task methods execution order

In what order does Luigi execute a task's methods (run, output, requires)? I understand that requires is called first to validate the task DAG, but shouldn't output be called after run()?

I'm trying to wait for a Kafka message in run, trigger a bunch of other tasks based on it, and return a LocalTarget. Like this:

def run(self):
    for message in self.consumer:
        self.metadata_key = str(message.value, 'utf-8')
        self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
        if not os.path.exists(self.path):
            os.mkdir(self.path)

        with self.conn.cursor() as cursor:
            all_accounts = cursor.execute('select domainname from tblaccountinfo;')
        for each in all_accounts:
            open(os.path.join(self.path,each)).close()

def output(self):
    return LocalTarget(self.path)

However, I get an error saying:

Exception: path or is_tmp must be set

This happens at the return LocalTarget(self.path) line. Why does Luigi call output() before run() is done?

There are 2 answers

matagus (best answer)

When you run a pipeline (i.e. one or more tasks), Luigi first checks whether each task's output targets already exist, and schedules the task to run only if they do not.

How does Luigi know which targets it must check? It gets them by calling your task's output() method. That is why output() is called before run(), and why it cannot rely on anything that run() sets.
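The check-then-run order can be illustrated with a small dependency-free sketch. This is not Luigi's source code: ToyTarget, BrokenTask, FixedTask and toy_schedule are hypothetical stand-ins that only mimic the behaviour described above, with the target raising the same message as in the question when it is given an empty path.

```python
import os
import tempfile


class ToyTarget:
    """Stand-in for a file target; rejects an empty path like the error in the question."""

    def __init__(self, path):
        if not path:
            raise Exception("path or is_tmp must be set")
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class BrokenTask:
    """output() depends on state that only run() assigns."""

    def __init__(self):
        self.path = None
        self.calls = []  # records the order in which methods are called

    def output(self):
        self.calls.append("output")
        return ToyTarget(self.path)

    def run(self):
        self.calls.append("run")
        self.path = os.path.join(tempfile.mkdtemp(), "result.txt")
        open(self.path, "w").close()


class FixedTask(BrokenTask):
    """The path is known at construction time, before anything is scheduled."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def run(self):
        self.calls.append("run")
        open(self.path, "w").close()


def toy_schedule(task):
    """Check the output first, and call run() only if it is missing."""
    if task.output().exists():
        return "already complete"
    task.run()
    return "ran"


if __name__ == "__main__":
    fixed = FixedTask(os.path.join(tempfile.mkdtemp(), "result.txt"))
    print(toy_schedule(fixed), fixed.calls)  # output is checked, then run is called
    print(toy_schedule(fixed), fixed.calls)  # output now exists, so run is skipped
    try:
        toy_schedule(BrokenTask())
    except Exception as exc:
        print("BrokenTask failed before run():", exc)
```

In the broken variant the exception is raised inside output(), so run() never gets a chance to assign the path.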

Pathanjali Tallapragada

This is not really about execution order. Before marking a task as pending, Luigi calls output() to check whether the file the task is supposed to create already exists. Any variables that output() uses must therefore already be resolved at that point. Here, output() uses self.path, which is only assigned in the run method. That's why you get the error.

Either build the path in the class itself and use it in the output method, or build it in the output method and have the run method write through the target, as below:

self.output().open('w').close()