Channel checks as empty even if it has content


I am trying to launch a process only if a combination of conditions is met, but when I check whether a channel contains a path to a file, the channel always appears empty. I am probably doing something wrong; if so, please correct my code. I tried to follow some of the suggestions in this issue, but without success.

Consider the following minimal example:

process one {

  output:
    file("test.txt") into _chProcessTwo

  script:
    """
    echo "Hello world" > "test.txt"
    """

}

// make copies so I can first check whether the channel contains anything
// (avoids the MultipleInputChannel exception)
_chProcessTwo.into{
  _chProcessTwoView;
  _chProcessTwoCheck;
  _chProcessTwoUse
}

//print contents of channel
println "Channel contents: " + _chProcessTwoView.toList().view()

process two {

  input:
     file(myInput) from _chProcessTwoUse
  when:
     (!_chProcessTwoCheck.toList().isEmpty())

  script:
    def test = _chProcessTwoUse.toList().isEmpty() ? "I'm empty" : "I'm NOT empty"
    println "The outcome is: " + test

}

I want to have process two run if and only if there is a file in the _chProcessTwo channel. If I run the above code I obtain:

marius@dev:~/pipeline$ ./bin/nextflow run test.nf 
N E X T F L O W  ~  version 19.09.0-edge
Launching `test.nf` [infallible_gutenberg] - revision: 9f57464dc1
[c8/bf38f5] process > one [100%] 1 of 1 ✔
[-        ] process > two -
[/home/marius/pipeline/work/c8/bf38f595d759686a497bb4a49e9778/test.txt]

where the last line is actually the contents of _chProcessTwoView.

If I remove the when directive from the second process I get:

marius@mg-dev:~/pipeline$ ./bin/nextflow run test.nf 
N E X T F L O W  ~  version 19.09.0-edge
Launching `test.nf` [modest_descartes] - revision: 5b2bbfea6a
[57/1b7b97] process > one [100%] 1 of 1 ✔
[a9/e4b82d] process > two [100%] 1 of 1 ✔
[/home/marius/pipeline/work/57/1b7b979933ca9e936a3c0bb640c37e/test.txt]

The .command.log file in the second process's work directory then contains: The outcome is: I'm empty

I also tried without toList().

What am I doing wrong? Thank you in advance.

Update: a workaround would be to check _chProcessTwoUse.view() != "", but that is pretty dirty.

Update 2: as requested by @Steve, I've updated the code to reflect more closely the actual conditions I have in my own pipeline:

def runProcessOne = true

process one {

  when:
    runProcessOne

  output:
    file("inputProcessTwo.txt") into _chProcessTwo optional true
    file("inputProcessThree.txt") into _chProcessThree optional true

  script:
    // this flag stands in for the possibility that no output is created
    def outputSomething = false
    """
    if ${outputSomething}; then
       echo "Hello world" > "inputProcessTwo.txt"
       echo "Goodbye world" > "inputProcessThree.txt"
    else
       echo "Sorry. Process one did not write to file."
    fi
    """

}


// make copies so I can first check whether the channel contains anything
// (avoids the MultipleInputChannel exception)
_chProcessTwo.into{
  _chProcessTwoView;
  _chProcessTwoCheck;
  _chProcessTwoUse
}

//print contents of channel
println "Channel contents: " + _chProcessTwoView.view()
println _chProcessTwoView.view() ? "Me empty" : "NOT empty"

process two {

  input:
     file(myInput) from _chProcessTwoUse
  when:
     (runProcessOne) 

  script:
    """
    echo "The outcome is:  ${myInput}"
    """
}


process three {

   input:
       file(defaultInput) from _chUpstreamProcesses
       file(inputFromProcessTwo) from _chProcessThree

   script:
      def extra_parameters = _chProcessThree.isEmpty() ? "" : "--extra-input " + inputFromProcessTwo
      """
        echo "Hooray! We got: ${extra_parameters}"
      """
}

As @Steve mentioned, I should not even need to check whether a channel is empty; Nextflow should know not to start the process. But I think in this construct I will have to.

Marius


There is 1 answer

Steve (best answer)

I think part of the problem here is that process 'one' creates only optional outputs. This makes dealing with the optional inputs in process 'three' a bit tricky. I would try to reconcile this if possible. If this can't be reconciled, then you'll need to deal with the optional inputs in process 'three'. To do this, you'll basically need to create a dummy file, pass it into the channel using the ifEmpty operator, then use the name of the dummy file to check whether or not to prepend the argument's prefix. It's a bit of a hack, but it works pretty well.

The first step is to actually create the dummy file. I like shareable pipelines, so I would just create this in your baseDir, perhaps under a folder called 'assets':

mkdir assets
touch assets/NO_FILE

Then pass in your dummy file if your '_chProcessThree' channel is empty:

params.dummy_file = "${baseDir}/assets/NO_FILE"

dummy_file = file(params.dummy_file)


process three {

    input:
    file(defaultInput) from _chUpstreamProcesses
    file(optfile) from _chProcessThree.ifEmpty(dummy_file)

    script:
    def extra_parameters = optfile.name != 'NO_FILE' ? "--extra-input ${optfile}" : ''

    """
    echo "Hooray! We got: ${extra_parameters}"
    """
}
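
To see what ifEmpty does on its own, outside of any process, here is a minimal sketch. It assumes DSL1 syntax (as in the Nextflow 19.09 used in the question), and the string values are placeholders rather than real files:

```groovy
// Sketch only: 'NO_FILE' and 'inputProcessThree.txt' are placeholder strings,
// not files that have to exist.

// An empty channel: ifEmpty emits the default value instead.
Channel
    .empty()
    .ifEmpty('NO_FILE')
    .view { "empty channel -> $it" }

// A channel with content: ifEmpty passes the items through unchanged.
Channel
    .from('inputProcessThree.txt')
    .ifEmpty('NO_FILE')
    .view { "non-empty channel -> $it" }
```

Run as a standalone script, this should print one line per channel: the first showing NO_FILE, the second showing inputProcessThree.txt. The order of the two lines is not guaranteed, since the channels are evaluated asynchronously. This is why the optfile.name != 'NO_FILE' check in process 'three' can tell the two cases apart.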

Also, these lines are problematic:

//print contents of channel
println "Channel contents: " + _chProcessTwoView.view()
println _chProcessTwoView.view() ? "Me empty" : "NOT empty"

Calling view() will emit all values from the channel to stdout. You can ignore whatever value it returns. Unless you enable DSL2, the channel will then be empty. I think what you're looking for here is a closure:

_chProcessTwoView.view { "Found: $it" }

Be sure to append -ansi-log false to your nextflow run command so the output doesn't get clobbered. HTH.