Requiring a set of files to be made before running a function in a Ruffus pipeline


I'm using ruffus to write a pipeline. I have a function that gets called in parallel many times and it creates several files. I'd like to make a function "combineFiles()" that gets called after all those files have been made. Since they run in parallel on a cluster, they will not all finish together. I wrote a function 'getFilenames()' that returns the set of filenames that need to be created, but how can I make combineFiles() wait for them to be there?

I tried the following:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

I've also tried the decorator:

@merge(getFilenames)

but this does not work either. combineFiles still gets called erroneously, before the files given by getFilenames have been made. How can I make combineFiles conditional on those files being there?

thanks.


There is 1 answer

Answered by leo goodstadt

I am the developer of Ruffus. I am not sure I entirely understand what you are trying to do, but here goes:

Waiting for jobs that take different amounts of time to finish before running the next stage of your pipeline is exactly what Ruffus is for, so this should be straightforward.

The first question is whether you know up front, i.e. before the pipeline is run, which files are being created. Let's start by assuming you do.

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

Let us write a dummy function which creates a file each time it is called. In Ruffus, any input and output file names are contained in the first two parameters respectively. We have no input file name, so our function calls should look like this:

create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

The definition of create_file would look like this:

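@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # the context manager closes the file as soon as it has been written
    with open(output_file_name, "w") as output_file:
        output_file.write("dummy file")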
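
To make the argument passed to @files concrete, the short sketch below prints the (input, output) pairs that the list comprehension produces. It is plain Python and does not import Ruffus; the name job_params is introduced here purely for illustration.

```python
filenames = ["one.file", "two.file", "three.file"]

# Each tuple describes one job: (input file name, output file name).
# None stands for "no input file", as in the create_file calls above.
# job_params is a hypothetical name used only in this sketch.
job_params = [(None, fn) for fn in filenames]

for input_name, output_name in job_params:
    print(input_name, "->", output_name)
```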

Each of these files would be created in 3 separate calls to create_file. These can be run in parallel if you wish.

pipeline_run([create_file], multiprocess = 5)

Now to combine the files. The @merge decorator is indeed set up precisely for this. We just need to link it up to the previous function:

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    # concatenate every input file into the single output file
    with open(output_file_name, "w") as output_file:
        for i in input_file_names:
            with open(i) as input_file:
                output_file.write(input_file.read())

This will only call merge_file when all the files are ready from the three calls to create_file().

The entire code is as follows:

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate a file-creation process that takes an unpredictable time
    sleep(randint(1,5))
    with open(output_file_name, "w") as output_file:
        output_file.write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    # concatenate every input file into the single output file
    with open(output_file_name, "w") as output_file:
        for i in input_file_names:
            with open(i) as input_file:
                output_file.write(input_file.read())


pipeline_run([merge_file], multiprocess = 5)

And this is the result:

>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file
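
For comparison, the same "wait for every job, then merge" behaviour can be sketched with the Python standard library alone. This is not how Ruffus is implemented, and it leaves out what Ruffus adds on top (dependency tracking between tasks and checking whether outputs are already up to date); it only illustrates the barrier between the two stages. The function run_pipeline, the use of threads instead of processes, and the temporary working directory are all assumptions made for this sketch.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, wait
from random import uniform
from time import sleep


def create_file(output_file_name):
    # simulate work that takes a different amount of time for each job
    sleep(uniform(0.01, 0.05))
    with open(output_file_name, "w") as output_file:
        output_file.write("dummy file\n")
    return output_file_name


def merge_file(input_file_names, output_file_name):
    # concatenate every input file into the single output file
    with open(output_file_name, "w") as output_file:
        for name in input_file_names:
            with open(name) as input_file:
                output_file.write(input_file.read())


def run_pipeline(work_dir, names=("one.file", "two.file", "three.file")):
    # hypothetical driver: stage 1 in parallel, then stage 2 exactly once
    paths = [os.path.join(work_dir, name) for name in names]
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(create_file, path) for path in paths]
        wait(futures)              # block until every job has finished
        for future in futures:
            future.result()        # re-raise any exception from a job
    merged = os.path.join(work_dir, "merge.file")
    merge_file(sorted(paths), merged)
    return merged


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as work_dir:
        merged_path = run_pipeline(work_dir)
        with open(merged_path) as merged_file:
            print(merged_file.read(), end="")
```

The call to wait() plays the role that @merge plays in the Ruffus version: the merge step cannot start until all three create_file jobs have completed, whatever order they finish in.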