Nextflow - Multiqc process per sample instead of processing all the samples output together

145 views Asked by At

My current workflow generate one multiqc report for all the samples in the input list, for some reason I want to generate multiqc report per sample instead. Here's my current workflow..MULTIQC process...Appreciate any help. The input could be either bam or cram format files input.yaml

samples:
-
 biosample_id: NA12878-chr14-AKT1
 aln: NA12878-chr14-AKT1.bam
-
 biosample_id: NA12878-chr14-AKT2
 aln: NA12878-chr14-AKT2.bam

main.nf

Channel
        .empty()
        .mix( mosdepth_bam.out.dists )
        .mix( mosdepth_bam.out.summary )
        .mix( mosdepth_cram.out.dists )
        .mix( mosdepth_cram.out.summary )
        .mix( mosdepth_datamash.out.coverage )
        .mix( verifybamid2_bam.out.freemix )
        .mix( verifybamid2_cram.out.freemix )
        .mix( verifybamid2_bam.out.ancestry )
        .mix( verifybamid2_cram.out.ancestry )
        .mix( samtools_stats_bam.out )
        .mix( samtools_stats_cram.out )
        .map { sample, files -> files }
        .collect()
        .set { log_files }

    multiqc( log_files )

modules/multiqc/main.nf

process multiqc {

input:
path 'data/*'

output:
path "multiqc_report.html", emit: report
path "multiqc_data", emit: data
path "multiqc_data/multiqc_data.json", emit: json_data

"""
multiqc \\
    --data-format json \\
    --enable-npm-plugin \\
    .
"""
}

I tried the below for multiqc process to report per sample...

Try 1. main.nf

Channel
    samples.map { it.biosample_id }
    .set { sample_ids }

multiqc( sample_ids, samtools_stats_bam.out.stats.mix( samtools_stats_cram.out.stats, mosdepth_bam.out.dists, mosdepth_bam.out.summary, mosdepth_cram.out.dists, mosdepth_cram.out.summary, mosdepth_datamash.out.coverage, verifybamid2_bam.out.freemix, verifybamid2_cram.out.freemix, verifybamid2_bam.out.ancestry, verifybamid2_cram.out.ancestry, ).collect() )

modules/multiqc/main.nf

process multiqc {

    tag { sample }

    input:
    tuple val(sample), path('*')
  output:
   
  tuple val(sample), path("${sample}/multiqc_report.html"), emit: report
  tuple val(sample), path("${sample}/multiqc_data"), emit: data
  tuple val(sample), path("${sample}/multiqc_data/multiqc_data.json"), emit: json_data

  """
  multiqc \\
    --data-format json \\
    --enable-npm-plugin \\
    -o ${sample} \\
    .
  """
}

Attached the full workflow generate single multiqc report, just in case main.nf file..

// main

workflow {

ref_fasta = file( params.reference )
ref_fasta_idx = file( params.reference + ".fai" )
autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )
vbi2_ud = file( params.vbi2_ud )
vbi2_bed = file( params.vbi2_bed )
vbi2_mean = file( params.vbi2_mean )

inputs = new YamlSlurper().parse(file(params.inputs_list))

Channel
    .fromList(inputs['samples'])
    .ifEmpty { ['biosample_id': params.biosample_id, 'aln': params.aln] }
    .set { samples }

Channel
    samples.branch { rec ->
        def aln_file = rec.aln ? file( rec.aln ) : null

        bam: rec.biosample_id && aln_file?.extension == 'bam'
            def bam_idx = file( "${rec.aln}.bai" )

            return tuple( rec.biosample_id, aln_file, bam_idx )

        cram: rec.biosample_id && aln_file?.extension == 'cram'
            def cram_idx = file( "${rec.aln}.crai" )

            return tuple( rec.biosample_id, aln_file, cram_idx )
    }
    .set { aln_inputs }


samtools_stats_bam( aln_inputs.bam, [] )
samtools_stats_cram( aln_inputs.cram, ref_fasta )

verifybamid2_bam( aln_inputs.bam, ref_fasta, vbi2_ud, vbi2_bed, vbi2_mean )
verifybamid2_cram( aln_inputs.cram, ref_fasta, vbi2_ud, vbi2_bed, vbi2_mean )

mosdepth_bam( aln_inputs.bam, [] )
mosdepth_cram( aln_inputs.cram, ref_fasta )

Channel
    .empty()
    .mix( mosdepth_bam.out.regions )
    .mix( mosdepth_cram.out.regions )
    .set { mosdepth_regions }

mosdepth_datamash( mosdepth_regions, autosomes_non_gap_regions )
//    mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ) )


Channel
    .empty()
    .mix( mosdepth_bam.out.dists )
    .mix( mosdepth_bam.out.summary )
    .mix( mosdepth_cram.out.dists )
    .mix( mosdepth_cram.out.summary )
    .mix( mosdepth_datamash.out.coverage )
    .mix( verifybamid2_bam.out.freemix )
    .mix( verifybamid2_cram.out.freemix )
    .mix( verifybamid2_bam.out.ancestry )
    .mix( verifybamid2_cram.out.ancestry )
    .mix( samtools_stats_bam.out )
    .mix( samtools_stats_cram.out )
    .map { sample, files -> files }
    .collect()
    .set { log_files }

multiqc( log_files )


Channel
    samples.map { it.biosample_id }
    .set { sample_ids }

compile_metrics ( sample_ids, multiqc.out.json_data )    
}

Attached one of the module for ref modules/mosdepth/main.nf

process mosdepth {

    tag { sample }

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta

    output:
    tuple val(sample), path("*.regions.bed.gz"), emit: regions
    tuple val(sample), path("*.dist.txt"), emit: dists
    tuple val(sample), path("*.summary.txt"), emit: summary

    script:
    def fasta = ref_fasta ? /--fasta "${ref_fasta}"/ : ''

    """
   # run mosdepth    
   # do not output per-base depth and apply 1000bp window-sizes
   # use the reads with mapping quality 20 and above

    mosdepth \\
        --no-per-base \\
        --by 1000 \\
        --mapq 20 \\
        --threads ${task.cpus} \\
        ${fasta} \\
        "${sample}" \\
        "${bam}"
    """
}
1

There are 1 answers

0
dthorbur On

It's a little hard to see what's going on as you're only providing the main.nf and modules/multiqc/main.nf files. And there appear to be input cardinality errors in the scripts, but I got a little lost trying to go over your examples. Regardless, there are a few suggestions I can make.

Importantly, by not joining output streams using the join operator, you're risking incorrect sample IDs being paired with data from different files. I believe nextflow randomly allocates data to processes when more than one item is ready in a given channel.

The easiest solution would be to collect and join all data emitted from mosdepth, verifybamid2, and samtools. I don't use any of these tools, so I'll make up the tuple in the examples.

modepth:

output:
tuple, val("$sampleID"), path("${sampleID}_bam.dists"), path("${sampleID}_bam.summary"), emit: mosdepth_bam_out

samtools:

output:
tuple, val("$sampleID"), path("${sampleID}_bam.stats"), emit: samtools_bam_out

verifybamid2:

output:
tuple, val("$sampleID"), path("${sampleID}_bam.freemix"), path("${sampleID}_bam.ancestry"), emit: verifybamid_bam_out

Workflow main.nf

// Some of the processes just as an example
samtools_stats_bam( aln_inputs.bam, [] )
verifybamid2_bam( aln_inputs.bam, ref_fasta, vbi2_ud, vbi2_bed, vbi2_mean )
mosdepth_bam( aln_inputs.bam, [] )

samtools_stats_bam
  .out
  .samtools_bam_out
  .collect() 
  .join( verifybamid2_bam.out.verifybamid_bam_out.collect(), by: 0 )
  .set { intermediate_ch1 } // I am unsure if you can string multiple joins together. Hence the intermediate channel.

intermediate_ch1
  .join( mosdepth_bam.out.mosdepth_bam_out.collect(), by: 0 )
  .set { multiqc_in }

multiqc( multiqc_in )

You'd also need to update the multiqc input cardinality to reflect your joined input channel:

input:
tuple, val(sampleID), path(samtools_bam_stats), path(verify_bam_freemix), path(verify_bam_ancestry), path(mosdepth_bam_dists), path(mosdepth_bam_summary)

Code not tested, and you'll need to check the order in which join preserves, but it will be consistent, I just can't remember if the initial channel joins to the left or right.

Edit: Also, I have never tried using the collect operator in the same command, so you may need to make more intermediate channels. I also don't know if it's entirely necessary. A final note is you will need to change multiqc's output file name to stop file name collisions in the output directory.