I have a bash script that I want to run simultaneously in 2 different dirs. I managed to run it as a subclass of luigi.Task with workers>1 (so the tasks run simultaneously).

What I didn't manage to do was redirect the output. I tried using the ExternalProgramTask superclass and played a bit with output / redirect_output, but I didn't get where I wanted.

This is my code:

import luigi
import os
import time

from luigi.contrib.external_program import ExternalProgramTask

class Installer(object):
    def __init__(self, repo):
        self.repo = repo

    def run(self):
        path = os.path.join(os.path.abspath('.'), self.repo, 'install_dir')
        install_cmd = '{path}/{cmd}'.format(path=path, cmd=cmd)

        start_time = time.asctime()
        print 'running {cmd} on {path} at {time}'.format(cmd=cmd,


        print 'finished {cmd} on {path} at {time} (started on {start_time})'.format(

# class Ex(luigi.Task):
class Ex(ExternalProgramTask):
    repo = luigi.Parameter(default='repo0')
    capture_output = True
    output_file = 'Ex_{}'.format(repo)

    def priority(self):
        dummy_value = '17'
        return dummy_value

    def run(self):
        with self.output().open('w') as f:

    def output(self):
        return luigi.LocalTarget(self.output_file)

    def program_args(self):
        """Must be overriden in an ExternalProgramTask subclass"""

def started_task(task):
    print 'started {}'.format(task.__dict__)

def failed_task(task):
    print '{} failed'.format(task.__dict__)

def succeeded_task(task):
    print '{} ended successfully'.format(task.__dict__)

if __name__ == '__main__':
    tasks = [Ex(), Ex(repo='repo1')]
    luigi.build(tasks, workers=10, local_scheduler=False)

What I'm trying to do is run the tasks I currently run, but in a controlled environment, i.e. I want to be able to control the output entirely.

Changing the task to Ex(capture_output=True / False) doesn't seem to have any effect.

Sub questions:

0) Do I have to use the ExternalProgramTask, or can this be done using luigi.Task?
1) How can I properly pass the output_redirect variable, given that I'm running from python (i.e. python my_luigi.py instead of luigi --module ...)?
2) I'm using the central scheduler. Should that affect anything?
3) How can I see the output of the task in the visualizer (localhost:8082)?
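
To make sub-question 0 more concrete, here is a minimal sketch of the kind of output control I'm after, using only the standard library and no luigi at all. It runs the same command concurrently in several directories and sends each run's stdout and stderr to its own log file. The helper names run_with_log and run_in_dirs, the install.log file name, and the echo command are all hypothetical placeholders, not part of my real setup, and it is written for Python 3 rather than the Python 2 used above. Presumably a plain luigi.Task could call something like run_with_log from its run() method, but that wiring is an assumption and is not shown here.

```python
import os
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor


def run_with_log(args, cwd, log_path):
    """Run args inside cwd, writing stdout and stderr to log_path.

    Returns the exit code of the command.
    """
    with open(log_path, "wb") as log:
        # stderr=subprocess.STDOUT merges both streams into the same file.
        return subprocess.call(args, cwd=cwd, stdout=log,
                               stderr=subprocess.STDOUT)


def run_in_dirs(args, dirs, max_workers=2):
    """Run the same command concurrently in each dir, one log file per dir.

    Returns a list of (directory, exit_code, log_path) tuples.
    """
    def job(d):
        log_path = os.path.join(d, "install.log")  # hypothetical log name
        return d, run_with_log(args, d, log_path), log_path

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(job, dirs))


if __name__ == "__main__":
    # Stand-ins for the two repo directories; created in a temp location.
    base = tempfile.mkdtemp()
    dirs = []
    for name in ("repo0", "repo1"):
        d = os.path.join(base, name)
        os.makedirs(d)
        dirs.append(d)

    # Placeholder command standing in for the real bash install script.
    cmd = ["sh", "-c", "echo installing in $(pwd)"]
    for d, code, log_path in run_in_dirs(cmd, dirs):
        print(d, code, log_path)
```

With this shape, nothing from the script reaches the console; whatever it prints ends up in the per-directory log file, which is the level of control I'd like to get from within luigi.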
