diff --git a/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala b/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala index 534cd5a1..99fab8c1 100644 --- a/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala +++ b/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD, MDTagging } -import org.bdgenomics.avocado.realigner.Realigner +import org.bdgenomics.avocado.realigner.{ ConsensusRealigner, Realigner } import org.bdgenomics.utils.cli._ import org.kohsuke.args4j.{ Argument, Option => Args4jOption } @@ -58,6 +58,10 @@ class ReassembleArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { name = "-defer_merging", usage = "Defers merging single file output") var deferMerging: Boolean = false + @Args4jOption(required = false, + name = "-use_consensus_realigner", + usage = "If provided, uses consensus-mode realigner.") + var useConsensusRealigner: Boolean = false // required by ADAMSaveAnyArgs var sortFastqOutput: Boolean = false @@ -79,7 +83,11 @@ class Reassemble( val reads = sc.loadAlignments(args.inputPath) // realign the reads - val reassembledReads = Realigner.realign(reads, args.kmerLength) + val reassembledReads = if (args.useConsensusRealigner) { + ConsensusRealigner.realign(reads, args.kmerLength) + } else { + Realigner.realign(reads, args.kmerLength) + } // save the reads reassembledReads.save(args) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala new file mode 100644 index 00000000..0ba9d487 --- /dev/null +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala @@ -0,0 +1,64 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.avocado.realigner + +import org.bdgenomics.adam.algorithms.consensus.ConsensusGenerator +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.avocado.genotyping.DiscoverVariants +import org.bdgenomics.formats.avro.Variant + +object ConsensusRealigner { + + /** + * Realigns a set of reads against the reference genome. + * + * @param reads Reads to realign. + * @param kmerLength The length k of the k-mers. + * @return Returns the realigned reads. + */ + def realign(reads: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD = { + + // use the realigner to make a first pass over the reads + val realignedReads = Realigner.realign(reads, kmerLength) + + // from here, we'll discover any potential variants + val variants = filterIndels(DiscoverVariants(realignedReads)) + + // we'll pass these discovered variants to ADAM's indel realigner + reads.realignIndels(ConsensusGenerator.fromKnownIndels(variants)) + } + + /** + * @param v The variant to filter. + * @return Returns false if the variant is a SNV/MNV. + */ + private[realigner] def discardSnvs(v: Variant): Boolean = { + v.getReferenceAllele.length != v.getAlternateAllele.length + } + + /** + * @param rdd The RDD of variants to filter. + * @return Returns a new RDD of variants where the SNVs & MNVs have been + * removed and only INDEL variants remain. + */ + private[realigner] def filterIndels(rdd: VariantRDD): VariantRDD = { + rdd.transform(r => r.filter(discardSnvs)) + } +} diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala index 5a6503c6..cd63a9e6 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala @@ -20,6 +20,7 @@ package org.bdgenomics.avocado.genotyping import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.avocado.AvocadoFunSuite import org.bdgenomics.avocado.models.Observation +import org.bdgenomics.avocado.realigner.ConsensusRealigner import org.bdgenomics.avocado.util.{ HardFilterGenotypes, TestHardFilterGenotypesArgs @@ -437,14 +438,16 @@ class BiallelicGenotyperSuite extends AvocadoFunSuite { }) } - ignore("call hom alt C->CCCCT insertion at 1/866511") { + sparkTest("call hom alt C->CCCCT insertion at 1/866511") { val readPath = resourceUrl("NA12878.chr1.866511.sam") val reads = sc.loadAlignments(readPath.toString) .transform(rdd => { rdd.filter(_.getMapq > 0) }) - val gts = BiallelicGenotyper.discoverAndCall(reads, + val realignedReads = ConsensusRealigner.realign(reads, 20) + + val gts = BiallelicGenotyper.discoverAndCall(realignedReads, 2, optPhredThreshold = Some(30)).transform(rdd => { rdd.filter(gt => gt.getAlleles.forall(_ == GenotypeAllele.ALT)) diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala new file mode 100644 index 00000000..b986577b --- /dev/null +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala @@ -0,0 +1,85 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.avocado.realigner + +import org.bdgenomics.adam.models.SequenceDictionary +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.formats.avro.Variant + +class ConsensusRealignerSuite extends SparkRealignerSuite { + + val allowLegacyCigars = true + + def realign(rdd: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD = { + ConsensusRealigner.realign(rdd, kmerLength) + } + + val snv = Variant.newBuilder + .setStart(0L) + .setReferenceAllele("A") + .setAlternateAllele("G") + .build + val mnv = Variant.newBuilder + .setStart(1L) + .setReferenceAllele("AC") + .setAlternateAllele("GT") + .build + val ins = Variant.newBuilder + .setStart(2L) + .setReferenceAllele("A") + .setAlternateAllele("ACC") + .build + val del = Variant.newBuilder + .setStart(3L) + .setReferenceAllele("GCT") + .setAlternateAllele("G") + .build + + test("filter out a snv") { + assert(!ConsensusRealigner.discardSnvs(snv)) + } + + test("filter out a mnv") { + assert(!ConsensusRealigner.discardSnvs(mnv)) + } + + test("keep an insert") { + assert(ConsensusRealigner.discardSnvs(ins)) + } + + test("keep a deletion") { + assert(ConsensusRealigner.discardSnvs(del)) + } + + sparkTest("filter snv/mnv variants out") { + val vRdd = VariantRDD(sc.parallelize(Seq(snv, mnv, ins, del)), + SequenceDictionary.empty) + val filteredVariants = ConsensusRealigner.filterIndels(vRdd) + .rdd + .collect + + val variantIds = filteredVariants.map(_.getStart) + .toSet + + assert(variantIds.size === 2) + assert(variantIds(2L)) + assert(variantIds(3L)) + } +} diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala index 4c53d2fc..289dbf6d 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala @@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord class RealignerSuite extends SparkRealignerSuite { + val allowLegacyCigars = false + def realign(rdd: AlignmentRecordRDD, kmerLength: Int): AlignmentRecordRDD = { Realigner.realign(rdd, kmerLength) diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala index d035a413..13ee4d51 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala @@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord trait SparkRealignerSuite extends AvocadoFunSuite { + val allowLegacyCigars: Boolean + def realign(rdd: AlignmentRecordRDD, kmerLength: Int): AlignmentRecordRDD @@ -47,11 +49,16 @@ trait SparkRealignerSuite extends AvocadoFunSuite { } sparkTest("realign a set of reads around an insert") { - // insertion sequence: + // true insertion sequence: + // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA + // ref: AATGAGACTTACATCATT__AACCGTGTGGACACA + // + // test alignment: + // X: | // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA - // ref: AATGAGACTTACATCATTAA__CCGTGTGGACACA + // ref: AATGAGACTTACATCATTAAC__CGTGTGGACACA val sequence = "AATGAGACTTACATCATTAAAACCGTGTGGACACA" - val insertStart = 20 + val insertStart = 21 val readLength = insertStart + 6 + 2 // generate 7 reads with a 6bp flank @@ -65,10 +72,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite { .setRecordGroupName("rg") .setReadMapped(true) .setSequence(sequence.drop(rId).take(readLength)) + .setQual("*" * readLength) .setStart(rId.toLong) - .setEnd((rId + readLength - 2 + 1).toLong) + .setEnd((rId + insertStart + 6).toLong) .setCigar("%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) - .setMismatchingPositions((readLength - 2).toString) + .setMismatchingPositions("%dC%d".format(basesBeforeInsert - 1, + basesAfterInsert)) + .setMapq(50) .build() }) @@ -81,25 +91,34 @@ trait SparkRealignerSuite extends AvocadoFunSuite { // these values are different from above because original alignments were // not left justified - val basesBeforeInsert = insertStart - rId - 2 - val basesAfterInsert = 8 + rId - - assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) + val basesBeforeInsert = insertStart - rId - 3 + val basesAfterInsert = 9 + rId + + if (allowLegacyCigars) { + assert(r.getCigar === "%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) + } else { + assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) + } assert(r.getMismatchingPositions === (readLength - 2).toString) }) } sparkTest("realign a set of reads around a deletion") { - // deletion sequence: - // del: AGGTCTGAATGAGACTTA__TCATTAACCGTGTGGACACA + // true deletion sequence: + // del: AGGTCTGAATGAGACTT__ATCATTAACCGTGTGGACACA + // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA + // + // test alignment: + // X: | + // del: AGGTCTGAATGAGACT__TATCATTAACCGTGTGGACACA // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA val sequence = "AGGTCTGAATGAGACTTATCATTAACCGTGTGGACACA" - val deleteStart = 18 - val readLength = deleteStart + 8 + val deleteStart = 16 + val readLength = deleteStart + 7 // generate 10 reads with a 8bp flank val reads = (0 until 10).map(rId => { - val basesBeforeDelete = deleteStart - rId + val basesBeforeDelete = deleteStart - rId - 1 val basesAfterDelete = 8 + rId AlignmentRecord.newBuilder() @@ -108,10 +127,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite { .setRecordGroupName("rg") .setReadMapped(true) .setSequence(sequence.drop(rId).take(readLength)) + .setQual("*" * readLength) .setStart(rId.toLong) - .setEnd((rId + readLength + 2 + 1).toLong) + .setEnd((rId + readLength + 2).toLong) .setCigar("%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) - .setMismatchingPositions("%d^CA%d".format(basesBeforeDelete, basesAfterDelete)) + .setMismatchingPositions("%d^TA0C%d".format(basesBeforeDelete, + basesAfterDelete - 1)) + .setMapq(50) .build() }) @@ -124,23 +146,30 @@ trait SparkRealignerSuite extends AvocadoFunSuite { // these values are different from above because original alignments were // not left justified - val basesBeforeDelete = deleteStart - rId - 1 - val basesAfterDelete = 9 + rId + val basesBeforeDelete = deleteStart - rId + val basesAfterDelete = 7 + rId - assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) + if (allowLegacyCigars) { + assert(r.getCigar === "%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) + } else { + assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) + } assert(r.getMismatchingPositions === "%d^AC%d".format(basesBeforeDelete, basesAfterDelete)) }) } sparkTest("realigning a read with a repeat will return the original read") { val read = AlignmentRecord.newBuilder() + .setContigName("ctg") .setReadName("A_READ") .setReadMapped(true) .setStart(10L) .setEnd(17L) .setSequence("TCAAAAAAGG") + .setQual("**********") .setCigar("3M4I3M") .setMismatchingPositions("6") + .setMapq(50) .build() // make into a genomic rdd diff --git a/pom.xml b/pom.xml index ed267068..e9792168 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ avocado: A Variant Caller, Distributed - 0.21.0 + 0.21.1-SNAPSHOT 1.8.0 1.8 2.10.4