How to channel output of one process as input to the next process in Nextflow?


Could a Nextflow user please demonstrate how to pass the output of one process as input to the next process?

My script is:

//process 6
process blastn {


    input:
    tuple val(query_id), path(query)
    path db

    output:
    tuple val(query_id), path ("${query.baseName}_blast_sort.tsv"), emit: blastHits

    script:
    """
    blastn \
        -query ${query} -db "${db}/${db_name}/nt" \
        -outfmt 11 -out ${query.baseName}_blast.asn \
        -evalue ${params.evalue} \
        -num_threads ${task.cpus}

    
    blast_formatter \
        -archive ${query.baseName}_blast.asn \
        -outfmt "6 qaccver saccver pident length evalue bitscore stitle" -out ${query.baseName}_blast_unsort.tsv

    # (the step that turns *_blast_unsort.tsv into the declared *_blast_sort.tsv output is not shown)
    """
}

//process 7
process topBlastHits {
    
    input:
    path(x)

    output:
    path ("*.tsv") 

    script:
    """
    cat $x | awk 'FNR>=1 && FNR<=5' > ${x.simpleName}.TopBlastHitsWithHeader.tsv
        
    """
}

Workflow definition:

blastn (renameContigFastaHeader.out.contig_seq, db_path)


topBlastHits(blastn.out.blastHits) 
        | collectFile(name: 'TopBlastHitsWithHeader.tsv', keepHeader: true, skip: 1, storeDir: "$PWD/results") 
        | view

The process "topBlastHits" fails with the error: "Not a valid path value: 'sample1'".

Any suggestions will be greatly appreciated. Thanks.

I also tried the following workflow, without success:

topBlastHits(blastn.out) 
        | collectFile(name: 'TopBlastHitsWithHeader.tsv', keepHeader: true, skip: 1, storeDir: "$PWD/results") 
        | view



 
dthorbur answered:

I've noticed a couple of things.

  1. I'm a little confused about what you're trying to achieve with the collectFile operator, which is mainly for merging many items into a single file. The usual way to publish results from the work directories is the publishDir directive.

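  2. The error you're getting is also a bit surprising. I would have expected an input cardinality error (i.e., Input tuple does not match input set cardinality declared by process 'topBlastHits'). The cause is that blastn.out.blastHits emits tuples of [query_id, file], while topBlastHits declares a single path(x) input, so the sample ID ('sample1') is treated as a file path too. Changing the input declaration to a matching tuple should resolve your issue: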

//process 7
process topBlastHits {
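    tag "${query_id}"   // label each task with its sample ID

    // Copy the per-sample output files to params.outDir
    publishDir(
      path: params.outDir,
      mode: 'copy'
    )

    input:
    // Matches the [query_id, tsv] tuples emitted by blastn.out.blastHits
    tuple val(query_id), path(blast_res)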

    output:
    path ("*.tsv")

    script:
    """
    cat $blast_res | awk 'FNR>=1 && FNR<=5' > ${query_id}.TopBlastHitsWithHeader.tsv
    """
}
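For completeness, here is a minimal sketch of how the workflow could be wired with the corrected process. It assumes the blastn process from the question, the topBlastHits process above, and that params.outDir is defined; renameContigFastaHeader and db_path come from the rest of your pipeline and are not shown. The collectFile step is optional and only needed if you also want one merged table.

```nextflow
// Sketch only: assumes blastn (from the question) and the corrected
// topBlastHits are defined in the same script, and params.outDir is set.
workflow {
    blastn(renameContigFastaHeader.out.contig_seq, db_path)

    // blastn.out.blastHits emits [query_id, tsv] tuples, which now match
    // topBlastHits' `tuple val(query_id), path(blast_res)` input.
    topBlastHits(blastn.out.blastHits)

    // Optional: merge the per-sample tables into one file, keeping only
    // the header line (first line) of the first collected file.
    topBlastHits.out
        .collectFile(name: 'TopBlastHitsWithHeader.tsv', keepHeader: true, skip: 1, storeDir: params.outDir)
        .view()
}
```

If you would rather keep the original path(x) input, you can instead drop the ID in the workflow, e.g. topBlastHits(blastn.out.blastHits.map { query_id, tsv -> tsv }).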