From b924412953594bb1a266cc7cde085dbf944f00d9 Mon Sep 17 00:00:00 2001 From: reichan1998 Date: Thu, 24 Oct 2024 16:07:55 +0700 Subject: [PATCH 1/3] Refactor SAMTOOLS_MERGE to include FASTA parameter, stage REF_CACHE and export REF_PATH --- bin/seq_cache_populate.pl | 327 +++++++++++++++++++++++++++ conf/base.config | 5 + modules/local/cram_cache.nf | 27 +++ subworkflows/local/align_ont.nf | 2 +- subworkflows/local/align_pacbio.nf | 4 +- subworkflows/local/align_short.nf | 2 +- subworkflows/local/filter_pacbio.nf | 3 +- subworkflows/local/prepare_genome.nf | 10 +- 8 files changed, 372 insertions(+), 8 deletions(-) create mode 100755 bin/seq_cache_populate.pl create mode 100644 modules/local/cram_cache.nf diff --git a/bin/seq_cache_populate.pl b/bin/seq_cache_populate.pl new file mode 100755 index 0000000..88a0cac --- /dev/null +++ b/bin/seq_cache_populate.pl @@ -0,0 +1,327 @@ +#!/usr/bin/env perl + +# The MIT License + +# Copyright (c) 2014, 2020 Genome Research Ltd. +# Author: Rob Davies + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +# Import references into a cram reference cache from fasta files. +# See below __END__ for POD documentation. + +use strict; +use warnings; +use Digest::MD5; +use Getopt::Long; +use File::Find; +use File::Temp qw(tempfile); +use File::Spec::Functions; +use File::Path 'make_path'; +use IO::Handle; + +$| = 1; + +# Directory where the cache will be built +my $root_dir; + +# Number of subdirectories to make below $root_dir +# Each subdir will eat up two hex digits of the file MD5 +my $subdirs = 2; + +# Directory tree to search when using the -find option +my $find = ''; + +# How much data to read before spilling to a file +my $max_acc = 256 * 1024 * 1024; + +my $usage = "Usage: $0 -root [-subdirs ] input1.fasta ...\n $0 -root [-subdirs ] -find \n"; + +# Deal with options +GetOptions("root=s" => \$root_dir, "subdirs=s" => \$subdirs, + "find=s" => \$find) || die $usage; + +unless ($root_dir && $subdirs =~ /^\d+$/) { die $usage; } +if ($subdirs >= 16) { + die "$0: Error: -subdirs should be less than 15.\n"; +} + +# Regexp to convert a hex MD5 to a list of $subdirs subdirectory names, the +# remainder making the filename in the leaf directory +my $dest_regexp = "(..)" x $subdirs . "(" . ".." x (16 - $subdirs) . ")"; +my $dest_re = qr/$dest_regexp/; + +# Ensure $root_dir exists +unless (-e $root_dir) { + make_path($root_dir); +} + +if ($find) { + # Find mode - search a directory tree for anything that looks like a + # fasta file. Any that are found will be put into the new cache, if + # they are not already there. + find({ + wanted => sub { + find_files($File::Find::name, $root_dir, $dest_re, $max_acc); + }, + no_chdir => 1, + }, + $find); +} elsif (@ARGV) { + # If a list of files was given on the command line, go through them + # and try to add each one. + foreach my $name (@ARGV) { + open(my $fh, '<', $name) || die "Couldn't open $name: $!\n"; + process_file($name, $fh, $root_dir, $dest_re, $max_acc); + close($fh) || die "Error closing $name: $!\n"; + } +} else { + # Otherwise read from STDIN + process_file('STDIN', \*STDIN, $root_dir, $dest_re, $max_acc); +} + +print "\n"; +print "Use environment REF_CACHE=$root_dir" . "/%2s" x $subdirs . + "/%s for accessing these files.\n"; +print "See also https://www.htslib.org/workflow/#the-ref_path-and-ref_cache for\nfurther information.\n"; +exit; + +sub find_files { + my ($name, $root_dir, $dest_re, $max_acc) = @_; + + # See if $name is a candidate file + + my $fh; + return if ($name =~ /~$/); # Ignore backup files + return unless (-f $name && -r _); # Ignore non-regular and unreadable files + + # Inspect the first two lines of the candidate + my $buffer; + open($fh, '<', $name) || die "Couldn't open $name: $!\n"; + read($fh, $buffer, 8192); # Should be enough to find the header & sequence + close($fh) || die "Error closing $name: $!\n"; + my ($l1, $l2) = split(/\n/, $buffer); + + # Check for fasta-like content + return unless ($l1 && $l1 =~ /^>\S+/); + return unless ($l2 && $l2 =~ /^[ACGTMRWSYKVHDBNacgtmrwsykvhdbn]+$/); + + # Looks like a fasta file, so process it + open($fh, '<', $name) || die "Couldn't open $name: $!\n"; + process_file($name, $fh, $root_dir, $dest_re, $max_acc); + close($fh) || die "Error closing $name: $!\n"; +} + +sub process_file { + my ($name, $in_fh, $root_dir, $dest_re, $max_acc) = @_; + + # Process the fasta file $in_fh. Each entry in the file is read, and + # the MD5 calculated as described in the SAM specification (i.e. + # all uppercased with whitespace stripped out). The MD5 is then + # split using $dest_re to convert it into the path name for the entry + # in the cache. If the path is not already present, the entry (in + # uppercased and stripped form) is saved into the cache. + + # Entries shorter that $max_acc will be kept in memory. For fasta files + # with lots of short entries this can save a lot of unnecessary writing + # if the data is already in the cache. Anything longer + # gets written out to a file to keep memory consumption under control. + # The temporary files have to be made in $root_dir, as the final + # destination is not known until the entry has been completely read. + + my $id; # Name of current fasta entry + my $ctx; # MD5 context + my $acc = ''; # The accumulated sequence + my $tmpfile; # Temporary file name + my $tmpfh; # Temporary file handle + my $extra = 1024; # Extra space to pre-allocate to account for reading + # 1 line past $max_acc + vec($acc, $max_acc + $extra, 8) = 1; # Pre-allocate some space + $acc = ''; + + # Use an eval block so any stray temporary file can be cleaned up before + # exiting. + eval { + print "Reading $name ...\n"; + for (;;) { # Error catching form of while (<>) {...} + undef($!); + last if (eof($in_fh)); # Needed if last line isn't terminated + unless (defined($_ = readline($in_fh))) { + die "Error reading $name: $!" if $!; + last; # EOF + } + + if (/^>(\S+)/) { + # Found a fasta header + if ($ctx) { # Finish previous entry, if there is one + finish_entry($id, $ctx, \$acc, $tmpfh, $tmpfile, + $root_dir, $dest_re); + undef($tmpfile); + $acc = ''; + } + $id = $1; + $ctx = Digest::MD5->new(); + } else { + unless ($id) { die "Found sequence with no header\n"; } + # Read some sequence + chomp; + s/\s+//g; + if ($_) { + $_ = uc($_); + $acc .= $_; + $ctx->add($_); + + if (length($acc) > $max_acc) { + # Spill long sequences out to a temporary file in + # $root_dir. + unless ($tmpfile) { + ($tmpfh, $tmpfile) = tempfile(DIR => $root_dir, + SUFFIX => '.tmp'); + } + print $tmpfh $acc + || die "Error writing to $tmpfile: $!\n"; + $acc = ''; + } + } + } + } + if ($ctx) { + # Finish off the last entry + finish_entry($id, $ctx, \$acc, $tmpfh, $tmpfile, + $root_dir, $dest_re); + undef($tmpfile); + } + }; + my $err = $@; + if ($tmpfile) { unlink($tmpfile); } + if ($err) { die $err; } +} + +sub finish_entry { + my ($id, $ctx, $acc_ref, $tmpfh, $tmpfile, $root_dir, $dest_re) = @_; + + # Finish writing an entry + + my $digest = $ctx->hexdigest; + + # Get the destination directory and filename + my @segs = $digest =~ /$dest_re/; + my $dest_dir = (@segs > 1 + ? catdir($root_dir, @segs[0..($#segs - 1)]) + : $root_dir); + my $dest = catfile($dest_dir, $segs[-1]); + + # Make the destination dir if necessary + unless (-e $dest_dir) { + make_path($dest_dir); + } + + if (-e $dest) { + # If the file is already present, there's nothing to do apart from + # remove the temporary file if it was made. + print "Already exists: $digest $id\n"; + if ($tmpfile) { + close($tmpfh) || die "Error closing $tmpfile: $!\n"; + unlink($tmpfile) || die "Couldn't remove $tmpfile: $!\n"; + } + } else { + # Need to add the data to the cache. + unless ($tmpfile) { + # If the data hasn't been written already, it needs to be done + # now. Write to a temp file in $dest_dir so if it goes wrong + # we won't leave a file with the right name but half-written + # content. + ($tmpfh, $tmpfile) = tempfile(DIR => $dest_dir, + SUFFIX => '.tmp'); + } + + # Assert that the $tmpfile is now set + unless ($tmpfile) { die "Error: Didn't make a temp file"; } + + eval { + # Flush out any remaining data + if ($$acc_ref) { + print $tmpfh $$acc_ref || die "Error writing to $tmpfile: $!\n"; + } + # Paranoid file close + $tmpfh->flush() || die "Error flushing to $tmpfile: $!\n"; + $tmpfh->sync() || die "Error syncing $tmpfile: $!\n"; + close($tmpfh) || die "Error writing to $tmpfile: $!\n"; + }; + if ($@) { + # Attempt to clean up if writing failed + my $save = $@; + unlink($tmpfile) || warn "Couldn't remove $tmpfile: $!"; + die $save; + } + + # Finished writing, and everything is flushed as far as possible + # so rename the temp file + print "$dest $id\n"; + rename($tmpfile, $dest) + || die "Error moving $tmpfile to $dest: $!\n"; + } +} + +__END__ + +=head1 NAME + +seq_cache_populate.pl + +=head1 SYNOPSIS + + seq_cache_populate.pl -root [-subdirs ] input1.fasta ... + + seq_cache_populate.pl -root [-subdirs ] -find + +=head1 DESCRIPTION + +Import references into a cram reference cache from fasta files. + +When run with a list of fasta files, this program reads the files and stores +the sequences within it in the reference cache under directory . The +sequences in the cache are stored with names based on the MD5 checksum +of the sequence. + +By default, sequences are stored in a hierarchy two directories deep, to +keep the number of items in a single directory to a reasonable number. This +depth can be changed using the -subdirs option. + +If the -find option is used, the program will scan the given directory tree. +Any files that appear to be fasta (by looking for a first line starting with +'>' followed by something that looks like DNA sequence) will be read and +added to the reference cache. The traversal will ignore symbolic links. + +Samtools/htslib can be made to use the cache by appropriate setting of the +REF_PATH environment variable. For example, if seq_cache_populate was run +using options '-root /tmp/ref_cache -subdirs 2', setting REF_PATH to +'/tmp/ref_cache/%2s/%2s/%s' should allow samtools to find the references that +it stored. + +Note that if no REF_PATH is specified, htslib will default to downloading from +the EBI reference server and caching locally (see the samtools(1) man page for +details), defaulting to $HOME/.cache/hts-ref/%2s/%2s/%s. This is functionally +equivalent to running this tool with '-root $HOME/.cache/hts-ref -subdirs 2'. + +=head1 AUTHOR + +Rob Davies. + +=cut \ No newline at end of file diff --git a/conf/base.config b/conf/base.config index cd54c75..e34b832 100644 --- a/conf/base.config +++ b/conf/base.config @@ -109,6 +109,11 @@ process { memory = { check_max( 1.GB * Math.ceil( 30 * fasta.size() / 1e+9 ) * task.attempt, 'memory' ) } } + withName: CRAM_CACHE { + cpus = { check_max( 1 * task.attempt, 'cpus' ) } + memory = { check_max( 4.GB * Math.ceil( 30 * fasta.size() / 1e+9 ) * task.attempt, 'memory' ) } + } + withName: CRUMBLE { // No correlation between memory usage and the number of reads or the genome size. // Most genomes seem happy with 1 GB, then some with 2 GB, then some with 5 GB. diff --git a/modules/local/cram_cache.nf b/modules/local/cram_cache.nf new file mode 100644 index 0000000..7da9bea --- /dev/null +++ b/modules/local/cram_cache.nf @@ -0,0 +1,27 @@ +// Process to generate the CRAM cache and +// create the REF_PATH variable +// Original author: epic2me-labs +process CRAM_CACHE { + tag "$meta.id" + label 'process_single' + + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + 'https://depot.galaxyproject.org/singularity/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' : + 'biocontainers/mulled-v2-1a6fe65bd6674daba65066aa796ed8f5e8b4687b:688e175eb0db54de17822ba7810cc9e20fa06dd5-0' }" + + input: + tuple val(meta), path(reference) + + output: + tuple path("ref_cache/"), env(REF_PATH), emit: ref_cache + + when: + task.ext.when == null || task.ext.when + + shell: + ''' + # Invoke from binary installed to container PATH + seq_cache_populate.pl -root ref_cache/ !{reference} + REF_PATH="ref_cache/%2s/%2s/%s" + ''' +} \ No newline at end of file diff --git a/subworkflows/local/align_ont.nf b/subworkflows/local/align_ont.nf index ef1a021..3a7cb4e 100644 --- a/subworkflows/local/align_ont.nf +++ b/subworkflows/local/align_ont.nf @@ -35,7 +35,7 @@ workflow ALIGN_ONT { // Merge, but only if there is more than 1 file - SAMTOOLS_MERGE ( ch_bams.multi_bams, [ [], [] ], [ [], [] ] ) + SAMTOOLS_MERGE ( ch_bams.multi_bams, fasta, [ [], [] ] ) ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions.first() ) diff --git a/subworkflows/local/align_pacbio.nf b/subworkflows/local/align_pacbio.nf index f472a6c..2ef4498 100644 --- a/subworkflows/local/align_pacbio.nf +++ b/subworkflows/local/align_pacbio.nf @@ -19,7 +19,7 @@ workflow ALIGN_PACBIO { // Filter BAM and output as FASTQ - FILTER_PACBIO ( reads, db ) + FILTER_PACBIO ( reads, db, fasta ) ch_versions = ch_versions.mix ( FILTER_PACBIO.out.versions ) @@ -41,7 +41,7 @@ workflow ALIGN_PACBIO { // Merge, but only if there is more than 1 file - SAMTOOLS_MERGE ( ch_bams.multi_bams, [ [], [] ], [ [], [] ] ) + SAMTOOLS_MERGE ( ch_bams.multi_bams, fasta, [ [], [] ] ) ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions.first() ) diff --git a/subworkflows/local/align_short.nf b/subworkflows/local/align_short.nf index 4f49cdc..5ef4696 100644 --- a/subworkflows/local/align_short.nf +++ b/subworkflows/local/align_short.nf @@ -99,7 +99,7 @@ workflow ALIGN_SHORT { // Merge, but only if there is more than 1 file - SAMTOOLS_MERGE ( ch_bams.multi_bams, [ [], [] ], [ [], [] ] ) + SAMTOOLS_MERGE ( ch_bams.multi_bams, fasta, [ [], [] ] ) ch_versions = ch_versions.mix ( SAMTOOLS_MERGE.out.versions ) diff --git a/subworkflows/local/filter_pacbio.nf b/subworkflows/local/filter_pacbio.nf index acb21fa..14d2c3a 100644 --- a/subworkflows/local/filter_pacbio.nf +++ b/subworkflows/local/filter_pacbio.nf @@ -16,6 +16,7 @@ workflow FILTER_PACBIO { take: reads // channel: [ val(meta), /path/to/datafile ] db // channel: /path/to/vector_db + fasta // channel: [ val(meta), /path/to/fasta ] main: @@ -37,7 +38,7 @@ workflow FILTER_PACBIO { | map { meta, bam -> [ meta, bam, [] ] } | set { ch_pacbio } - SAMTOOLS_CONVERT ( ch_pacbio, [ [], [] ], [] ) + SAMTOOLS_CONVERT ( ch_pacbio, fasta, [] ) ch_versions = ch_versions.mix ( SAMTOOLS_CONVERT.out.versions.first() ) diff --git a/subworkflows/local/prepare_genome.nf b/subworkflows/local/prepare_genome.nf index f7a4754..e429b12 100644 --- a/subworkflows/local/prepare_genome.nf +++ b/subworkflows/local/prepare_genome.nf @@ -6,6 +6,7 @@ include { GUNZIP } from '../../modules/nf-core/gunzip/main' include { UNMASK } from '../../modules/local/unmask' include { UNTAR } from '../../modules/nf-core/untar/main' include { BWAMEM2_INDEX } from '../../modules/nf-core/bwamem2/index/main' +include { CRAM_CACHE } from '../../modules/local/cram_cache' workflow PREPARE_GENOME { @@ -32,6 +33,9 @@ workflow PREPARE_GENOME { UNMASK ( ch_fasta ) ch_versions = ch_versions.mix ( UNMASK.out.versions ) + // Generate CRAM cache + CRAM_CACHE ( UNMASK.out.fasta ) + // Generate BWA index if ( checkShortReads( params.input ) ) { if ( params.bwamem2_index ) { @@ -57,9 +61,9 @@ workflow PREPARE_GENOME { emit: - fasta = UNMASK.out.fasta.first() // channel: [ meta, /path/to/fasta ] - bwaidx = ch_bwamem2_index.first() // channel: [ meta, /path/to/bwamem2/index_dir/ ] - versions = ch_versions // channel: [ versions.yml ] + fasta = UNMASK.out.fasta // channel: [ meta, /path/to/fasta ] + bwaidx = ch_bwamem2_index // channel: [ meta, /path/to/bwamem2/index_dir/ ] + versions = ch_versions // channel: [ versions.yml ] } // From e032797460031a7a8e9e8ec3ea4c3bf97f1640fc Mon Sep 17 00:00:00 2001 From: reichan1998 Date: Thu, 24 Oct 2024 16:39:02 +0700 Subject: [PATCH 2/3] fix editor-config --- bin/seq_cache_populate.pl | 22 +++++++++++----------- modules/local/cram_cache.nf | 1 - 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/bin/seq_cache_populate.pl b/bin/seq_cache_populate.pl index 88a0cac..e36ce02 100755 --- a/bin/seq_cache_populate.pl +++ b/bin/seq_cache_populate.pl @@ -55,7 +55,7 @@ # Deal with options GetOptions("root=s" => \$root_dir, "subdirs=s" => \$subdirs, - "find=s" => \$find) || die $usage; + "find=s" => \$find) || die $usage; unless ($root_dir && $subdirs =~ /^\d+$/) { die $usage; } if ($subdirs >= 16) { @@ -81,8 +81,8 @@ find_files($File::Find::name, $root_dir, $dest_re, $max_acc); }, no_chdir => 1, - }, - $find); + }, + $find); } elsif (@ARGV) { # If a list of files was given on the command line, go through them # and try to add each one. @@ -151,7 +151,7 @@ sub process_file { my $tmpfile; # Temporary file name my $tmpfh; # Temporary file handle my $extra = 1024; # Extra space to pre-allocate to account for reading - # 1 line past $max_acc + # 1 line past $max_acc vec($acc, $max_acc + $extra, 8) = 1; # Pre-allocate some space $acc = ''; @@ -171,7 +171,7 @@ sub process_file { # Found a fasta header if ($ctx) { # Finish previous entry, if there is one finish_entry($id, $ctx, \$acc, $tmpfh, $tmpfile, - $root_dir, $dest_re); + $root_dir, $dest_re); undef($tmpfile); $acc = ''; } @@ -192,7 +192,7 @@ sub process_file { # $root_dir. unless ($tmpfile) { ($tmpfh, $tmpfile) = tempfile(DIR => $root_dir, - SUFFIX => '.tmp'); + SUFFIX => '.tmp'); } print $tmpfh $acc || die "Error writing to $tmpfile: $!\n"; @@ -204,7 +204,7 @@ sub process_file { if ($ctx) { # Finish off the last entry finish_entry($id, $ctx, \$acc, $tmpfh, $tmpfile, - $root_dir, $dest_re); + $root_dir, $dest_re); undef($tmpfile); } }; @@ -248,7 +248,7 @@ sub finish_entry { # we won't leave a file with the right name but half-written # content. ($tmpfh, $tmpfile) = tempfile(DIR => $dest_dir, - SUFFIX => '.tmp'); + SUFFIX => '.tmp'); } # Assert that the $tmpfile is now set @@ -287,9 +287,9 @@ =head1 NAME =head1 SYNOPSIS - seq_cache_populate.pl -root [-subdirs ] input1.fasta ... +seq_cache_populate.pl -root [-subdirs ] input1.fasta ... - seq_cache_populate.pl -root [-subdirs ] -find +seq_cache_populate.pl -root [-subdirs ] -find =head1 DESCRIPTION @@ -324,4 +324,4 @@ =head1 AUTHOR Rob Davies. -=cut \ No newline at end of file +=cut diff --git a/modules/local/cram_cache.nf b/modules/local/cram_cache.nf index 7da9bea..5d7ab9b 100644 --- a/modules/local/cram_cache.nf +++ b/modules/local/cram_cache.nf @@ -24,4 +24,3 @@ process CRAM_CACHE { seq_cache_populate.pl -root ref_cache/ !{reference} REF_PATH="ref_cache/%2s/%2s/%s" ''' -} \ No newline at end of file From 43dbe87753d0b966abbe597f5f5a8682deb32b1c Mon Sep 17 00:00:00 2001 From: reichan1998 Date: Thu, 24 Oct 2024 16:41:22 +0700 Subject: [PATCH 3/3] fix accidental error --- modules/local/cram_cache.nf | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/local/cram_cache.nf b/modules/local/cram_cache.nf index 5d7ab9b..c4d3031 100644 --- a/modules/local/cram_cache.nf +++ b/modules/local/cram_cache.nf @@ -24,3 +24,4 @@ process CRAM_CACHE { seq_cache_populate.pl -root ref_cache/ !{reference} REF_PATH="ref_cache/%2s/%2s/%s" ''' +}