Skip to content

Commit

Permalink
add chunking before filtering for PacBio
Browse files Browse the repository at this point in the history
  • Loading branch information
reichan1998 committed Oct 15, 2024
1 parent 815289d commit 83dd8cd
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 66 deletions.
26 changes: 25 additions & 1 deletion conf/modules.config
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,45 @@ process {
withName: SAMTOOLS_COLLATETOFASTA {
beforeScript = { "export REF_PATH=spoof"}
ext.args = { (params.use_work_dir_as_temp ? "-T." : "") }
ext.prefix = { "${meta.chunk_id}" }
}

withName: SAMTOOLS_FILTERTOFASTQ {
ext.prefix = { "${meta.chunk_id}" }
}

withName: BLAST_BLASTN {
ext.args = '-task blastn -reward 1 -penalty -5 -gapopen 3 -gapextend 3 -dust yes -soft_masking true -evalue .01 -searchsp 1750000000000 -outfmt 6'
ext.prefix = { "${meta.chunk_id}" }
}

withName: PACBIO_FILTER {
ext.prefix = { "${meta.chunk_id}" }
}

withName: SAMTOOLS_CONVERT {
beforeScript = { "export REF_PATH=spoof"}
ext.args = "-be '[rq]>=0.99' -x fi -x fp -x ri -x rp --write-index"
ext.args = "--output-fmt bam --write-index"
ext.prefix = { "${meta.chunk_id}" }
}

withName: CONVERT_CRAM {
ext.args = "--output-fmt cram"
}

withName: CONVERT_FQ_CRAM {
ext.args = "--output-fmt cram"
ext.prefix = { "${meta.chunk_id}" }
}

withName: SAMTOOLS_INDEX_FQ {
ext.prefix = { "${meta.chunk_id}" }
}

withName: GENERATE_CRAM_CSV_FQ {
ext.prefix = { "${meta.chunk_id}" }
}

withName: ".*:ALIGN_ILLUMINA:.*:CRAM_FILTER_ALIGN_BWAMEM2_FIXMATE_SORT" {
ext.args = ""
ext.args1 = { "-F 0x200 -nt" }
Expand Down
46 changes: 46 additions & 0 deletions modules/local/cram_filter.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
process CRAM_FILTER {
    tag "$meta.chunk_id"
    label "process_high"

    container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
        'https://depot.galaxyproject.org/singularity/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' :
        'biocontainers/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' }"

    input:
    // Extracts the slice of containers [from, to] out of an indexed CRAM with
    // staden io_lib's cram_filter. `base` and `chunkid` only feed the output
    // file name; `rglines` and `reference` are carried for downstream steps.
    tuple val(meta), path(cramfile), path(cramindex), val(from), val(to), val(base), val(chunkid), val(rglines), path(reference)

    output:
    tuple val(meta), path("*.cram"), emit: cram
    path "versions.yml"            , emit: versions

    when:
    task.ext.when == null || task.ext.when

    script:
    // FIX: `args` was declared but never used, so ext.args was silently
    // ignored; it is now passed through per nf-core convention.
    def args    = task.ext.args   ?: ''
    def prefix  = task.ext.prefix ?: "${meta.id}"
    def VERSION = "1.15" // Staden_io versions break the pipeline
    """
    cram_filter ${args} -n ${from}-${to} ${cramfile} ${prefix}_${base}_${chunkid}.cram

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//' )
        staden_io: $VERSION
    END_VERSIONS
    """

    stub:
    def prefix  = task.ext.prefix ?: "${meta.id}"
    // FIX: VERSION was only declared in the script block; `def` locals are not
    // shared between `script:` and `stub:`, so the stub previously failed on an
    // undefined \$VERSION. Keep this in sync with the script block.
    def VERSION = "1.15" // Staden_io versions break the pipeline
    // FIX: use the real `base`/`chunkid` inputs (previously shadowed by
    // hard-coded locals) and mirror the script's output file name.
    """
    touch ${prefix}_${base}_${chunkid}.cram

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        samtools: \$(echo \$(samtools --version 2>&1) | sed 's/^.*samtools //; s/Using.*\$//' )
        staden_io: $VERSION
    END_VERSIONS
    """
}
47 changes: 32 additions & 15 deletions subworkflows/local/align_pacbio.nf
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
include { FILTER_PACBIO } from '../../subworkflows/local/filter_pacbio'
include { SAMTOOLS_ADDREPLACERG } from '../../modules/local/samtools_addreplacerg'
include { SAMTOOLS_INDEX } from '../../modules/nf-core/samtools/index/main'
include { SAMTOOLS_INDEX as SAMTOOLS_INDEX_FQ } from '../../modules/nf-core/samtools/index/main'
include { GENERATE_CRAM_CSV } from '../../modules/local/generate_cram_csv'
include { GENERATE_CRAM_CSV as GENERATE_CRAM_CSV_FQ } from '../../modules/local/generate_cram_csv'
include { MINIMAP2_MAPREDUCE } from '../../subworkflows/local/minimap2_mapreduce'
include { SAMTOOLS_SORMADUP as CONVERT_CRAM } from '../../modules/local/samtools_sormadup'
include { SAMTOOLS_SORMADUP as CONVERT_FQ_CRAM } from '../../modules/local/samtools_sormadup'
include { SAMTOOLS_MERGE } from '../../modules/nf-core/samtools/merge/main'

include { CREATE_CRAM_FILTER_INPUT } from '../../subworkflows/local/create_cram_filter_input'

workflow ALIGN_PACBIO {
take:
Expand All @@ -22,41 +25,55 @@ workflow ALIGN_PACBIO {
ch_versions = Channel.empty()
ch_merged_bam = Channel.empty()

// Filter BAM and output as FASTQ
FILTER_PACBIO ( reads, db )
ch_versions = ch_versions.mix ( FILTER_PACBIO.out.versions )

// Convert FASTQ to CRAM
CONVERT_CRAM ( FILTER_PACBIO.out.fastq, fasta )
// Convert input to CRAM
CONVERT_CRAM ( reads, fasta )
ch_versions = ch_versions.mix ( CONVERT_CRAM.out.versions )

SAMTOOLS_ADDREPLACERG ( CONVERT_CRAM.out.bam )
ch_versions = ch_versions.mix ( SAMTOOLS_ADDREPLACERG.out.versions )

SAMTOOLS_ADDREPLACERG.out.cram
| set { ch_reads_cram }

// Index the CRAM file
SAMTOOLS_INDEX ( ch_reads_cram )
SAMTOOLS_INDEX ( SAMTOOLS_ADDREPLACERG.out.cram )
ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions )

ch_reads_cram
SAMTOOLS_ADDREPLACERG.out.cram
| join ( SAMTOOLS_INDEX.out.crai )
| set { ch_reads_cram }

GENERATE_CRAM_CSV( ch_reads_cram )
ch_versions = ch_versions.mix( GENERATE_CRAM_CSV.out.versions )

CREATE_CRAM_FILTER_INPUT ( GENERATE_CRAM_CSV.out.csv, fasta )
ch_versions = ch_versions.mix( CREATE_CRAM_FILTER_INPUT.out.versions )

// Filter BAM and output as FASTQ
FILTER_PACBIO ( CREATE_CRAM_FILTER_INPUT.out.chunked_cram, db )
ch_versions = ch_versions.mix ( FILTER_PACBIO.out.versions )

// Convert FASTQ to CRAM
CONVERT_FQ_CRAM ( FILTER_PACBIO.out.fastq, fasta )
ch_versions = ch_versions.mix ( CONVERT_FQ_CRAM.out.versions )

SAMTOOLS_INDEX_FQ ( CONVERT_FQ_CRAM.out.bam )
ch_versions = ch_versions.mix( SAMTOOLS_INDEX_FQ.out.versions )

CONVERT_FQ_CRAM.out.bam
| join ( SAMTOOLS_INDEX_FQ.out.crai )
| set { ch_reads_cram_crai }


//
// MODULE: generate a CRAM CSV file containing the required parameters for CRAM_FILTER_MINIMAP2_FILTER5END_FIXMATE_SORT
//
GENERATE_CRAM_CSV( ch_reads_cram_crai )
ch_versions = ch_versions.mix( GENERATE_CRAM_CSV.out.versions )
GENERATE_CRAM_CSV_FQ( ch_reads_cram_crai )
ch_versions = ch_versions.mix( GENERATE_CRAM_CSV_FQ.out.versions )

//
// SUBWORKFLOW: mapping pacbio reads using minimap2
//
MINIMAP2_MAPREDUCE (
fasta,
GENERATE_CRAM_CSV.out.csv
GENERATE_CRAM_CSV_FQ.out.csv
)
ch_versions = ch_versions.mix( MINIMAP2_MAPREDUCE.out.versions )
ch_merged_bam = ch_merged_bam.mix(MINIMAP2_MAPREDUCE.out.mergedbam)
Expand Down
40 changes: 40 additions & 0 deletions subworkflows/local/create_cram_filter_input.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
include { CRAM_FILTER } from '../../modules/local/cram_filter'

// Turn a CRAM-chunking CSV into per-chunk CRAM files via CRAM_FILTER.
//
// take:
//   csv_ch - channel of [ meta, csv ] where the CSV (from GENERATE_CRAM_CSV)
//            lists one chunk per row
//   fasta  - channel of [ meta, fasta ] for the reference genome
// emit:
//   chunked_cram - [ meta, chunk.cram ] per CSV row
//   versions     - versions.yml channel
workflow CREATE_CRAM_FILTER_INPUT {
take:
csv_ch
fasta

main:
ch_versions = Channel.empty()

// Generate input channel for CRAM_FILTER
// splitCsv keeps the meta element and emits one row (a list of columns) per
// chunk; combine(fasta) appends [ ref_id, ref_dir ] to every row.
// Assumed column layout (TODO confirm against generate_cram_csv output):
//   [0] cram file, [1] cram index, [2] from, [3] to, [4] base,
//   [5] chunk id, [6] rglines
csv_ch
|splitCsv()
|combine(fasta)
|map { cram_id, cram_info, ref_id, ref_dir ->
// chunk_id uniquely names each slice: "<sample id>_<chunk number>"
tuple([
id: cram_id.id,
chunk_id: cram_id.id + "_" + cram_info[5],
genome_size: ref_id.genome_size,
read_count: cram_id.read_count
],
file(cram_info[0]),
cram_info[1],
cram_info[2],
cram_info[3],
cram_info[4],
cram_info[5],
cram_info[6],
ref_dir
)
}
| set { ch_cram_filter_input }

// Extract each chunk into its own CRAM file
CRAM_FILTER(ch_cram_filter_input)
ch_versions = ch_versions.mix(CRAM_FILTER.out.versions)

emit:
chunked_cram = CRAM_FILTER.out.cram
versions = ch_versions
}
61 changes: 11 additions & 50 deletions subworkflows/local/filter_pacbio.nf
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,40 @@ include { BLAST_BLASTN } from '../../modules/nf-core/blast/
include { PACBIO_FILTER } from '../../modules/local/pacbio_filter'
include { SAMTOOLS_FILTERTOFASTQ } from '../../modules/local/samtools_filtertofastq'
include { SEQKIT_FQ2FA } from '../../modules/nf-core/seqkit/fq2fa'
include { BBMAP_FILTERBYNAME } from '../../modules/nf-core/bbmap/filterbyname'
include { BBMAP_FILTERBYNAME } from '../../modules/nf-core/bbmap/filterbyname'


workflow FILTER_PACBIO {
take:
reads // channel: [ val(meta), /path/to/datafile ]
db // channel: /path/to/vector_db


main:
ch_versions = Channel.empty()


// Check file types and branch
// Convert from PacBio CRAM to BAM
reads
| branch {
meta, reads ->
fastq : reads.findAll { it.getName().toLowerCase() =~ /.*f.*\.gz/ }
bam : true
}
| set { ch_reads }


// Convert from PacBio BAM to Samtools BAM
ch_reads.bam
| map { meta, bam -> [ meta, bam, [] ] }
| map { meta, cram -> [ meta, cram, [] ] }
| set { ch_pacbio }

SAMTOOLS_CONVERT ( ch_pacbio, [ [], [] ], [] )
ch_versions = ch_versions.mix ( SAMTOOLS_CONVERT.out.versions.first() )

ch_versions = ch_versions.mix ( SAMTOOLS_CONVERT.out.versions )

// Collate BAM file to create interleaved FASTA
SAMTOOLS_COLLATETOFASTA ( SAMTOOLS_CONVERT.out.bam )
ch_versions = ch_versions.mix ( SAMTOOLS_COLLATETOFASTA.out.versions.first() )


// Convert FASTQ to FASTA using SEQKIT_FQ2FA
SEQKIT_FQ2FA ( ch_reads.fastq )
ch_versions = ch_versions.mix ( SEQKIT_FQ2FA.out.versions.first() )
ch_versions = ch_versions.mix ( SAMTOOLS_COLLATETOFASTA.out.versions )


// Combine BAM-derived FASTA with converted FASTQ inputs
// Collect the BAM-derived FASTA
SAMTOOLS_COLLATETOFASTA.out.fasta
| concat( SEQKIT_FQ2FA.out.fasta )
| set { ch_fasta }


// Nucleotide BLAST
BLAST_BLASTN ( ch_fasta, db )
ch_versions = ch_versions.mix ( BLAST_BLASTN.out.versions.first() )

ch_versions = ch_versions.mix ( BLAST_BLASTN.out.versions )

// Filter BLAST output
PACBIO_FILTER ( BLAST_BLASTN.out.txt )
ch_versions = ch_versions.mix ( PACBIO_FILTER.out.versions.first() )

ch_versions = ch_versions.mix ( PACBIO_FILTER.out.versions )

// Filter the input BAM and output as interleaved FASTA
SAMTOOLS_CONVERT.out.bam
Expand All @@ -78,29 +55,13 @@ workflow FILTER_PACBIO {
| set { ch_bam_reads }

SAMTOOLS_FILTERTOFASTQ ( ch_bam_reads.bams, ch_bam_reads.lists )
ch_versions = ch_versions.mix ( SAMTOOLS_FILTERTOFASTQ.out.versions.first() )


// Filter inputs provided as FASTQ and output as interleaved FASTQ
ch_reads.fastq
| join(PACBIO_FILTER.out.list)
| multiMap { meta, fastq, list -> \
fastqs: [meta, fastq]
lists: list
}
| set { ch_reads_fastq }

BBMAP_FILTERBYNAME ( ch_reads_fastq.fastqs, ch_reads_fastq.lists , "fastq", true)
ch_versions = ch_versions.mix ( BBMAP_FILTERBYNAME.out.versions.first() )

ch_versions = ch_versions.mix ( SAMTOOLS_FILTERTOFASTQ.out.versions )

// Merge filtered outputs as ch_output_fastq
BBMAP_FILTERBYNAME.out.reads
| concat ( SAMTOOLS_FILTERTOFASTQ.out.fastq )
SAMTOOLS_FILTERTOFASTQ.out.fastq
| set { ch_filtered_fastq }


emit:
fastq = ch_filtered_fastq // channel: [ meta, /path/to/fastq ]
versions = ch_versions // channel: [ versions.yml ]
}
}

0 comments on commit 83dd8cd

Please sign in to comment.