What is the order in which Luigi calls the methods run(), output() and requires()? I understand that requires() is called first to check the validity of the task DAG, but shouldn't output() be called after run()?
I'm trying to wait for a Kafka message in run(), use it to trigger a bunch of other tasks, and return a LocalTarget, like this:
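def run(self):
    for message in self.consumer:
        self.metadata_key = str(message.value, 'utf-8')
        self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
        # makedirs also creates the metadata_key parent; os.mkdir would fail if it is missing
        os.makedirs(self.path, exist_ok=True)
        with self.conn.cursor() as cursor:
            cursor.execute('select domainname from tblaccountinfo;')
            all_accounts = cursor.fetchall()  # DB-API: rows come from fetchall(), not execute()
        for (domainname,) in all_accounts:
            # 'w' creates an empty file; the default 'r' mode fails on a missing file
            open(os.path.join(self.path, domainname), 'w').close()

def output(self):
    return LocalTarget(self.path)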
However, I get an error saying:
Exception: path or is_tmp must be set
It is raised at the return LocalTarget(self.path) line. Why does Luigi call output() before run() has finished?
When you run a pipeline (i.e. one or more tasks), Luigi first checks whether each task's output targets already exist and, if they don't, schedules the task to run.
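To make that order concrete, here is a toy model of the check-then-run behavior in plain Python. It is a simplification, not Luigi's actual scheduler: Task, Target and build are hypothetical stand-ins, and the real worker does more (for example, it re-checks a task's dependencies right before running it). What carries over is that output() is called while the graph is being scheduled, before any run().

```python
class Target:
    """Stand-in for a Luigi target: it only needs an exists() method."""

    def __init__(self, path, store):
        self.path = path
        self.store = store  # set of paths that currently "exist"

    def exists(self):
        return self.path in self.store


class Task:
    """Stand-in for luigi.Task that logs every call to the three methods."""

    def __init__(self, name, store, log, deps=()):
        self.name = name
        self.store = store
        self.log = log
        self.deps = list(deps)

    def requires(self):
        self.log.append((self.name, "requires"))
        return self.deps

    def output(self):
        self.log.append((self.name, "output"))
        return Target(self.name + ".out", self.store)

    def run(self):
        self.log.append((self.name, "run"))
        self.store.add(self.name + ".out")  # "write" the output target

    def complete(self):
        # Mirrors Luigi's default: a task is complete when its output target exists.
        return self.output().exists()


def build(root):
    """Phase 1 schedules incomplete tasks; phase 2 runs them in dependency order."""
    pending = []

    def schedule(task):
        if task.complete():  # output() is called here, before any run()
            return
        for dep in task.requires():  # only incomplete tasks get their deps expanded
            schedule(dep)
        pending.append(task)  # appended after its dependencies

    schedule(root)
    for task in pending:
        task.run()


store, log = set(), []
extract = Task("extract", store, log)
load = Task("load", store, log, deps=[extract])
build(load)
print(log)
# → [('load', 'output'), ('load', 'requires'), ('extract', 'output'),
#    ('extract', 'requires'), ('extract', 'run'), ('load', 'run')]
```

Calling build(load) a second time only logs ('load', 'output'): the target now exists, so nothing is expanded or rerun. In this model, an output() that reads an attribute only set in run() would fail in phase 1, which is the same situation as the error in the question.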
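How does Luigi know which targets to check? It simply calls your task's output() method. So output() is evaluated during scheduling, before run() has executed, and it must not depend on attributes such as self.path that are only assigned inside run().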