Skip to content

Commit

Permalink
Merge pull request #136 from reichan1998/map_reduce_pacbio_filter
Browse files Browse the repository at this point in the history
Integrate filtering steps into map-reduce
  • Loading branch information
tkchafin authored Oct 24, 2024
2 parents 90910b7 + 5672d70 commit 3099192
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 88 deletions.
18 changes: 17 additions & 1 deletion conf/modules.config
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,26 @@ process {
withName: SAMTOOLS_COLLATETOFASTA {
beforeScript = { "export REF_PATH=spoof"}
ext.args = { (params.use_work_dir_as_temp ? "-T." : "") }
ext.prefix = { "${meta.chunk_id}" }
}

withName: SAMTOOLS_FILTERTOFASTQ {
ext.prefix = { "${meta.chunk_id}" }
}

withName: BLAST_BLASTN {
ext.args = '-task blastn -reward 1 -penalty -5 -gapopen 3 -gapextend 3 -dust yes -soft_masking true -evalue .01 -searchsp 1750000000000 -outfmt 6'
ext.prefix = { "${meta.chunk_id}" }
}

withName: PACBIO_FILTER {
ext.prefix = { "${meta.chunk_id}" }
}

withName: SAMTOOLS_CONVERT {
beforeScript = { "export REF_PATH=spoof"}
ext.args = "-be '[rq]>=0.99' -x fi -x fp -x ri -x rp --write-index"
ext.args = "--output-fmt bam --write-index"
ext.prefix = { "${meta.chunk_id}" }
}

withName: CONVERT_CRAM {
Expand Down Expand Up @@ -100,6 +111,11 @@ process {
ext.args4 = { "--write-index -l1" }
}

withName: '.*:.*:ALIGN_HIFI:MINIMAP2_ALIGN' {
ext.args = { "-ax map-hifi --cs=short -R ${meta.read_group} -I" + Math.ceil(meta2.genome_size/1e9) + 'G' }
ext.prefix = { "${meta.chunk_id}" }
}

withName: ".*:ALIGN_CLR:.*:CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT" {
ext.args = ""
ext.args1 = { "-F 0x200 -nt" }
Expand Down
46 changes: 46 additions & 0 deletions modules/local/cram_filter.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
process CRAM_FILTER {
tag "$meta.chunk_id"
label "process_high"

container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'https://depot.galaxyproject.org/singularity/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' :
'biocontainers/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' }"

input:
tuple val(meta), path(cramfile), path(cramindex), val(from), val(to), val(base), val(chunkid), val(rglines), path(reference)

output:
tuple val(meta), path("*.cram"), emit: cram
path "versions.yml" , emit: versions

when:
task.ext.when == null || task.ext.when

script:
def args = task.ext.args ?: ''
def prefix = task.ext.prefix ?: "${meta.id}"
def VERSION = "1.15" // Staden_io versions break the pipeline
"""
cram_filter -n ${from}-${to} ${cramfile} ${prefix}_${base}_${chunkid}.cram
cat <<-END_VERSIONS > versions.yml
"${task.process}":
samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//' )
staden_io: $VERSION
END_VERSIONS
"""

stub:
def prefix = task.ext.prefix ?: "${meta.id}"
def base = "45022_3#2"
def chunkid = "1"
"""
touch ${prefix}_${base}_${chunkid}_filtered.cram
cat <<-END_VERSIONS > versions.yml
"${task.process}":
samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//' )
staden_io: $VERSION
END_VERSIONS
"""
}
62 changes: 24 additions & 38 deletions subworkflows/local/align_pacbio.nf
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ include { FILTER_PACBIO } from '../../subworkflows/local/filter_pacbio'
include { SAMTOOLS_ADDREPLACERG } from '../../modules/local/samtools_addreplacerg'
include { SAMTOOLS_INDEX } from '../../modules/nf-core/samtools/index/main'
include { GENERATE_CRAM_CSV } from '../../modules/local/generate_cram_csv'
include { MINIMAP2_MAPREDUCE } from '../../subworkflows/local/minimap2_mapreduce'
include { SAMTOOLS_SORMADUP as CONVERT_CRAM } from '../../modules/local/samtools_sormadup'
include { SAMTOOLS_MERGE } from '../../modules/nf-core/samtools/merge/main'

include { CREATE_CRAM_FILTER_INPUT } from '../../subworkflows/local/create_cram_filter_input'
include { MINIMAP2_ALIGN } from '../../modules/nf-core/minimap2/align/main'

workflow ALIGN_PACBIO {
take:
Expand All @@ -22,54 +22,40 @@ workflow ALIGN_PACBIO {
ch_versions = Channel.empty()
ch_merged_bam = Channel.empty()

// Filter BAM and output as FASTQ
FILTER_PACBIO ( reads, db )
ch_versions = ch_versions.mix ( FILTER_PACBIO.out.versions )

// Convert FASTQ to CRAM
CONVERT_CRAM ( FILTER_PACBIO.out.fastq, fasta )
// Convert input to CRAM
CONVERT_CRAM ( reads, fasta )
ch_versions = ch_versions.mix ( CONVERT_CRAM.out.versions )

SAMTOOLS_ADDREPLACERG ( CONVERT_CRAM.out.bam )
ch_versions = ch_versions.mix ( SAMTOOLS_ADDREPLACERG.out.versions )

SAMTOOLS_ADDREPLACERG.out.cram
| set { ch_reads_cram }

// Index the CRAM file
SAMTOOLS_INDEX ( ch_reads_cram )
SAMTOOLS_INDEX ( SAMTOOLS_ADDREPLACERG.out.cram )
ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions )

ch_reads_cram
SAMTOOLS_ADDREPLACERG.out.cram
| join ( SAMTOOLS_INDEX.out.crai )
| set { ch_reads_cram_crai }

| set { ch_reads_cram }

//
// MODULE: generate a CRAM CSV file containing the required parametres for CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT
//
GENERATE_CRAM_CSV( ch_reads_cram_crai )
GENERATE_CRAM_CSV( ch_reads_cram )
ch_versions = ch_versions.mix( GENERATE_CRAM_CSV.out.versions )

//
// SUBWORKFLOW: mapping pacbio reads using minimap2
//
MINIMAP2_MAPREDUCE (
fasta,
GENERATE_CRAM_CSV.out.csv
)
ch_versions = ch_versions.mix( MINIMAP2_MAPREDUCE.out.versions )
ch_merged_bam = ch_merged_bam.mix(MINIMAP2_MAPREDUCE.out.mergedbam)

ch_merged_bam
| combine( ch_reads_cram_crai )
| map { meta_bam, bam, meta_cram, cram, crai -> [ meta_cram, bam ] }
| set { ch_merged_bam }

// Collect all BAM output by sample name
ch_merged_bam
CREATE_CRAM_FILTER_INPUT ( GENERATE_CRAM_CSV.out.csv, fasta )
ch_versions = ch_versions.mix( CREATE_CRAM_FILTER_INPUT.out.versions )

// Filter BAM and output as FASTQ
FILTER_PACBIO ( CREATE_CRAM_FILTER_INPUT.out.chunked_cram, db )
ch_versions = ch_versions.mix ( FILTER_PACBIO.out.versions )

// Align without map reduce
// Align Fastq to Genome with minimap2. bam_format is set to true, making the output a *sorted* BAM
MINIMAP2_ALIGN ( FILTER_PACBIO.out.fastq, fasta, true, "bai", false, false )
ch_versions = ch_versions.mix ( MINIMAP2_ALIGN.out.versions.first() )

// Collect all alignment output by sample name
MINIMAP2_ALIGN.out.bam
| map { meta, bam -> [['id': meta.id.split('_')[0..-2].join('_'), 'datatype': meta.datatype], meta.read_count, bam] }
| groupTuple( by: [0] )
| groupTuple ( by: [0] )
| map { meta, read_counts, bams -> [meta + [read_count: read_counts.sum()], bams] }
| branch {
meta, bams ->
Expand All @@ -81,7 +67,7 @@ workflow ALIGN_PACBIO {

// Merge, but only if there is more than 1 file
SAMTOOLS_MERGE ( ch_bams.multi_bams, [ [], [] ], [ [], [] ] )
ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions )
ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions.first() )


// Convert merged BAM to CRAM and calculate indices and statistics
Expand Down
42 changes: 42 additions & 0 deletions subworkflows/local/create_cram_filter_input.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
include { CRAM_FILTER } from '../../modules/local/cram_filter'

workflow CREATE_CRAM_FILTER_INPUT {
take:
csv_ch
fasta

main:
ch_versions = Channel.empty()

// Generate input channel for CRAM_FILTER
csv_ch
|splitCsv()
|combine(fasta)
|map { cram_id, cram_info, ref_id, ref_dir ->
tuple([
id: cram_id.id,
chunk_id: cram_id.id + "_" + cram_info[5],
genome_size: ref_id.genome_size,
read_count: cram_id.read_count,
read_group: cram_id.read_group,
datatype: cram_id.datatype,
],
file(cram_info[0]),
cram_info[1],
cram_info[2],
cram_info[3],
cram_info[4],
cram_info[5],
cram_info[6],
ref_dir
)
}
| set { ch_cram_filter_input }

CRAM_FILTER(ch_cram_filter_input)
ch_versions = ch_versions.mix(CRAM_FILTER.out.versions)

emit:
chunked_cram = CRAM_FILTER.out.cram
versions = ch_versions
}
59 changes: 10 additions & 49 deletions subworkflows/local/filter_pacbio.nf
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,40 @@ include { BLAST_BLASTN } from '../../modules/nf-core/blast/
include { PACBIO_FILTER } from '../../modules/local/pacbio_filter'
include { SAMTOOLS_FILTERTOFASTQ } from '../../modules/local/samtools_filtertofastq'
include { SEQKIT_FQ2FA } from '../../modules/nf-core/seqkit/fq2fa'
include { BBMAP_FILTERBYNAME } from '../../modules/nf-core/bbmap/filterbyname'
include { BBMAP_FILTERBYNAME } from '../../modules/nf-core/bbmap/filterbyname'


workflow FILTER_PACBIO {
take:
reads // channel: [ val(meta), /path/to/datafile ]
db // channel: /path/to/vector_db


main:
ch_versions = Channel.empty()


// Check file types and branch
// Convert from PacBio CRAM to Samtools BAM
reads
| branch {
meta, reads ->
fastq : reads.findAll { it.getName().toLowerCase() =~ /.*f.*\.gz/ }
bam : true
}
| set { ch_reads }


// Convert from PacBio BAM to Samtools BAM
ch_reads.bam
| map { meta, bam -> [ meta, bam, [] ] }
| map { meta, cram -> [ meta, cram, [] ] }
| set { ch_pacbio }

SAMTOOLS_CONVERT ( ch_pacbio, [ [], [] ], [] )
ch_versions = ch_versions.mix ( SAMTOOLS_CONVERT.out.versions.first() )

ch_versions = ch_versions.mix ( SAMTOOLS_CONVERT.out.versions )

// Collate BAM file to create interleaved FASTA
SAMTOOLS_COLLATETOFASTA ( SAMTOOLS_CONVERT.out.bam )
ch_versions = ch_versions.mix ( SAMTOOLS_COLLATETOFASTA.out.versions.first() )


// Convert FASTQ to FASTA using SEQKIT_FQ2FA
SEQKIT_FQ2FA ( ch_reads.fastq )
ch_versions = ch_versions.mix ( SEQKIT_FQ2FA.out.versions.first() )
ch_versions = ch_versions.mix ( SAMTOOLS_COLLATETOFASTA.out.versions )


// Combine BAM-derived FASTA with converted FASTQ inputs
// Combine BAM-derived FASTA
SAMTOOLS_COLLATETOFASTA.out.fasta
| concat( SEQKIT_FQ2FA.out.fasta )
| set { ch_fasta }


// Nucleotide BLAST
BLAST_BLASTN ( ch_fasta, db )
ch_versions = ch_versions.mix ( BLAST_BLASTN.out.versions.first() )

ch_versions = ch_versions.mix ( BLAST_BLASTN.out.versions )

// Filter BLAST output
PACBIO_FILTER ( BLAST_BLASTN.out.txt )
ch_versions = ch_versions.mix ( PACBIO_FILTER.out.versions.first() )

ch_versions = ch_versions.mix ( PACBIO_FILTER.out.versions )

// Filter the input BAM and output as interleaved FASTA
SAMTOOLS_CONVERT.out.bam
Expand All @@ -78,28 +55,12 @@ workflow FILTER_PACBIO {
| set { ch_bam_reads }

SAMTOOLS_FILTERTOFASTQ ( ch_bam_reads.bams, ch_bam_reads.lists )
ch_versions = ch_versions.mix ( SAMTOOLS_FILTERTOFASTQ.out.versions.first() )


// Filter inputs provided as FASTQ and output as interleaved FASTQ
ch_reads.fastq
| join(PACBIO_FILTER.out.list)
| multiMap { meta, fastq, list -> \
fastqs: [meta, fastq]
lists: list
}
| set { ch_reads_fastq }

BBMAP_FILTERBYNAME ( ch_reads_fastq.fastqs, ch_reads_fastq.lists , "fastq", true)
ch_versions = ch_versions.mix ( BBMAP_FILTERBYNAME.out.versions.first() )

ch_versions = ch_versions.mix ( SAMTOOLS_FILTERTOFASTQ.out.versions )

// Merge filtered outputs as ch_output_fastq
BBMAP_FILTERBYNAME.out.reads
| concat ( SAMTOOLS_FILTERTOFASTQ.out.fastq )
SAMTOOLS_FILTERTOFASTQ.out.fastq
| set { ch_filtered_fastq }


emit:
fastq = ch_filtered_fastq // channel: [ meta, /path/to/fastq ]
versions = ch_versions // channel: [ versions.yml ]
Expand Down

0 comments on commit 3099192

Please sign in to comment.