I have a bash script that I want to run simultaneously in two different directories. I managed to run it from a subclass of luigi.Task with workers > 1, so the two runs execute in parallel.

What I couldn't manage was redirecting the output. I tried using ExternalProgramTask as the superclass and experimented with output / redirect_output, but I didn't get anywhere.

This is my code:

#!/usr/bin/env python
import luigi
import os
import time

from luigi.contrib.external_program import ExternalProgramTask


class Installer(object):
    def __init__(self, repo):
        self.repo = repo

    def run(self):
        path = os.path.join(os.path.abspath('.'), self.repo, 'install_dir')
        cmd = 'install.sh'
        install_cmd = '{path}/{cmd}'.format(path=path, cmd=cmd)

        start_time = time.asctime()
        print 'running {cmd} on {path} at {time}'.format(cmd=cmd,
                                                         path=path,
                                                         time=start_time)

        os.system(install_cmd)

        print 'finished {cmd} on {path} at {time} (started on {start_time})'.format(
                cmd=cmd,
                path=path,
                time=time.asctime(),
                start_time=start_time)

# class Ex(luigi.Task):
class Ex(ExternalProgramTask):
    repo = luigi.Parameter(default='repo0')
    capture_output = True
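    @property
    def output_file(self):
        # Resolved per instance; at class level `repo` is still the Parameter object.
        return 'Ex_{}'.format(self.repo)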

    @property
    def priority(self):
        dummy_value = '17'
        return dummy_value

    def run(self):
        with self.output().open('w') as f:
            f.write(Installer(self.repo).run())

    def output(self):
        return luigi.LocalTarget(self.output_file)

    def program_args(self):
        """Must be overridden in an ExternalProgramTask subclass"""
        pass

@luigi.Task.event_handler(luigi.Event.START)
def started_task(task):
    print 'started {}'.format(task.__dict__)

@luigi.Task.event_handler(luigi.Event.FAILURE)
def failed_task(task):
    print '{} failed'.format(task.__dict__)

@luigi.Task.event_handler(luigi.Event.SUCCESS)
def succeeded_task(task):
    print '{} ended successfully'.format(task.__dict__)


if __name__ == '__main__':
    tasks = [Ex(), Ex(repo='repo1')]
    luigi.build(tasks, workers=10, local_scheduler=False)

What I'm trying to do is run the tasks I currently run, but in a controlled environment, i.e. I want to be able to control the output entirely.
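To make "control the output entirely" concrete, here is a minimal sketch, outside of Luigi, of the behaviour I'd like each task to have: the script runs in a given directory and its stdout and stderr both end up in a file I choose. It assumes subprocess in place of os.system; the helper name run_and_capture and the log path are hypothetical and not part of my actual code.

```python
import subprocess


def run_and_capture(args, cwd, log_path):
    """Run `args` with working directory `cwd`, writing the child's
    stdout and stderr to `log_path`. Returns the child's exit code.

    Hypothetical helper, for illustration only.
    """
    with open(log_path, 'wb') as log:
        # stderr=subprocess.STDOUT merges both streams into the same file.
        return subprocess.call(args, cwd=cwd, stdout=log,
                               stderr=subprocess.STDOUT)
```

For my case the call would look something like run_and_capture(['./install.sh'], path, 'Ex_repo0.log'), with path built the same way as in Installer.run above.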

Changing the task to Ex(capture_output=True / False) doesn't seem to have any effect.

Sub questions:

0) Do I have to use the ExternalProgramTask, or can this be done using luigi.Task?
1) How can I properly pass the output_redirect variable, given that I'm running from Python (i.e. python my_luigi.py instead of luigi --module ...)?
2) I'm using the central scheduler. Should that affect anything?
3) How can I see the output of the task in the visualizer (localhost:8082)?
