From 908349d99387a281e1f45b0ac9aac45ec1495f6b Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Sun, 22 Jan 2017 23:18:01 -0800 Subject: [PATCH] [AVOCADO-200] Feed assembled variants to INDEL realigner. Resolves #200. Depends on https://github.com/bigdatagenomics/adam/pull/1363. * Adds a ConsensusRealigner that uses the main Realigner to realign reads, which are then fed into the variant discovery engine with loose filters. Any INDEL variants that are discovered are then fed in to ADAM's INDEL realignment engine as known INDELs. * Uses this realignment mode in the BiallelicGenotyperSuite to clean up a previously failing test. * Adds this to the Reassemble CLI as a `-use_consensus_realigner` switch. --- .../org/bdgenomics/avocado/Reassemble.scala | 12 ++- .../realigner/ConsensusRealigner.scala | 64 ++++++++++++++ .../genotyping/BiallelicGenotyperSuite.scala | 7 +- .../realigner/ConsensusRealignerSuite.scala | 85 +++++++++++++++++++ .../avocado/realigner/RealignerSuite.scala | 2 + .../realigner/SparkRealignerSuite.scala | 67 ++++++++++----- pom.xml | 2 +- 7 files changed, 215 insertions(+), 24 deletions(-) create mode 100644 avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala create mode 100644 avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala diff --git a/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala b/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala index 534cd5a1..99fab8c1 100644 --- a/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala +++ b/avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD, MDTagging } -import org.bdgenomics.avocado.realigner.Realigner +import org.bdgenomics.avocado.realigner.{ ConsensusRealigner, Realigner } import org.bdgenomics.utils.cli._ import org.kohsuke.args4j.{ Argument, Option => Args4jOption } @@ -58,6 +58,10 @@ class ReassembleArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { name = "-defer_merging", usage = "Defers merging single file output") var deferMerging: Boolean = false + @Args4jOption(required = false, + name = "-use_consensus_realigner", + usage = "If provided, uses consensus-mode realigner.") + var useConsensusRealigner: Boolean = false // required by ADAMSaveAnyArgs var sortFastqOutput: Boolean = false @@ -79,7 +83,11 @@ class Reassemble( val reads = sc.loadAlignments(args.inputPath) // realign the reads - val reassembledReads = Realigner.realign(reads, args.kmerLength) + val reassembledReads = if (args.useConsensusRealigner) { + ConsensusRealigner.realign(reads, args.kmerLength) + } else { + Realigner.realign(reads, args.kmerLength) + } // save the reads reassembledReads.save(args) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala new file mode 100644 index 00000000..0ba9d487 --- /dev/null +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala @@ -0,0 +1,64 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.avocado.realigner + +import org.bdgenomics.adam.algorithms.consensus.ConsensusGenerator +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.avocado.genotyping.DiscoverVariants +import org.bdgenomics.formats.avro.Variant + +object ConsensusRealigner { + + /** + * Realigns a set of reads against the reference genome. + * + * @param reads Reads to realign. + * @param kmerLength The length k of the k-mers. + * @return Returns the realigned reads. + */ + def realign(reads: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD = { + + // use the realigner to make a first pass over the reads + val realignedReads = Realigner.realign(reads, kmerLength) + + // from here, we'll discover any potential variants + val variants = filterIndels(DiscoverVariants(realignedReads)) + + // we'll pass these discovered variants to ADAM's indel realigner + reads.realignIndels(ConsensusGenerator.fromKnownIndels(variants)) + } + + /** + * @param v The variant to filter. + * @return Returns false if the variant is a SNV/MNV. + */ + private[realigner] def discardSnvs(v: Variant): Boolean = { + v.getReferenceAllele.length != v.getAlternateAllele.length + } + + /** + * @param rdd The RDD of variants to filter. + * @return Returns a new RDD of variants where the SNVs & MNVs have been + * removed and only INDEL variants remain. + */ + private[realigner] def filterIndels(rdd: VariantRDD): VariantRDD = { + rdd.transform(r => r.filter(discardSnvs)) + } +} diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala index 5a6503c6..cd63a9e6 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala @@ -20,6 +20,7 @@ package org.bdgenomics.avocado.genotyping import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.avocado.AvocadoFunSuite import org.bdgenomics.avocado.models.Observation +import org.bdgenomics.avocado.realigner.ConsensusRealigner import org.bdgenomics.avocado.util.{ HardFilterGenotypes, TestHardFilterGenotypesArgs @@ -437,14 +438,16 @@ class BiallelicGenotyperSuite extends AvocadoFunSuite { }) } - ignore("call hom alt C->CCCCT insertion at 1/866511") { + sparkTest("call hom alt C->CCCCT insertion at 1/866511") { val readPath = resourceUrl("NA12878.chr1.866511.sam") val reads = sc.loadAlignments(readPath.toString) .transform(rdd => { rdd.filter(_.getMapq > 0) }) - val gts = BiallelicGenotyper.discoverAndCall(reads, + val realignedReads = ConsensusRealigner.realign(reads, 20) + + val gts = BiallelicGenotyper.discoverAndCall(realignedReads, 2, optPhredThreshold = Some(30)).transform(rdd => { rdd.filter(gt => gt.getAlleles.forall(_ == GenotypeAllele.ALT)) diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala new file mode 100644 index 00000000..b986577b --- /dev/null +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala @@ -0,0 +1,85 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.avocado.realigner + +import org.bdgenomics.adam.models.SequenceDictionary +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.formats.avro.Variant + +class ConsensusRealignerSuite extends SparkRealignerSuite { + + val allowLegacyCigars = true + + def realign(rdd: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD = { + ConsensusRealigner.realign(rdd, kmerLength) + } + + val snv = Variant.newBuilder + .setStart(0L) + .setReferenceAllele("A") + .setAlternateAllele("G") + .build + val mnv = Variant.newBuilder + .setStart(1L) + .setReferenceAllele("AC") + .setAlternateAllele("GT") + .build + val ins = Variant.newBuilder + .setStart(2L) + .setReferenceAllele("A") + .setAlternateAllele("ACC") + .build + val del = Variant.newBuilder + .setStart(3L) + .setReferenceAllele("GCT") + .setAlternateAllele("G") + .build + + test("filter out a snv") { + assert(!ConsensusRealigner.discardSnvs(snv)) + } + + test("filter out a mnv") { + assert(!ConsensusRealigner.discardSnvs(mnv)) + } + + test("keep an insert") { + assert(ConsensusRealigner.discardSnvs(ins)) + } + + test("keep a deletion") { + assert(ConsensusRealigner.discardSnvs(del)) + } + + sparkTest("filter snv/mnv variants out") { + val vRdd = VariantRDD(sc.parallelize(Seq(snv, mnv, ins, del)), + SequenceDictionary.empty) + val filteredVariants = ConsensusRealigner.filterIndels(vRdd) + .rdd + .collect + + val variantIds = filteredVariants.map(_.getStart) + .toSet + + assert(variantIds.size === 2) + assert(variantIds(2L)) + assert(variantIds(3L)) + } +} diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala index 4c53d2fc..289dbf6d 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala @@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord class RealignerSuite extends SparkRealignerSuite { + val allowLegacyCigars = false + def realign(rdd: AlignmentRecordRDD, kmerLength: Int): AlignmentRecordRDD = { Realigner.realign(rdd, kmerLength) diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala index d035a413..13ee4d51 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala @@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord trait SparkRealignerSuite extends AvocadoFunSuite { + val allowLegacyCigars: Boolean + def realign(rdd: AlignmentRecordRDD, kmerLength: Int): AlignmentRecordRDD @@ -47,11 +49,16 @@ trait SparkRealignerSuite extends AvocadoFunSuite { } sparkTest("realign a set of reads around an insert") { - // insertion sequence: + // true insertion sequence: + // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA + // ref: AATGAGACTTACATCATT__AACCGTGTGGACACA + // + // test alignment: + // X: | // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA - // ref: AATGAGACTTACATCATTAA__CCGTGTGGACACA + // ref: AATGAGACTTACATCATTAAC__CGTGTGGACACA val sequence = "AATGAGACTTACATCATTAAAACCGTGTGGACACA" - val insertStart = 20 + val insertStart = 21 val readLength = insertStart + 6 + 2 // generate 7 reads with a 6bp flank @@ -65,10 +72,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite { .setRecordGroupName("rg") .setReadMapped(true) .setSequence(sequence.drop(rId).take(readLength)) + .setQual("*" * readLength) .setStart(rId.toLong) - .setEnd((rId + readLength - 2 + 1).toLong) + .setEnd((rId + insertStart + 6).toLong) .setCigar("%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) - .setMismatchingPositions((readLength - 2).toString) + .setMismatchingPositions("%dC%d".format(basesBeforeInsert - 1, + basesAfterInsert)) + .setMapq(50) .build() }) @@ -81,25 +91,34 @@ trait SparkRealignerSuite extends AvocadoFunSuite { // these values are different from above because original alignments were // not left justified - val basesBeforeInsert = insertStart - rId - 2 - val basesAfterInsert = 8 + rId - - assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) + val basesBeforeInsert = insertStart - rId - 3 + val basesAfterInsert = 9 + rId + + if (allowLegacyCigars) { + assert(r.getCigar === "%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) + } else { + assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) + } assert(r.getMismatchingPositions === (readLength - 2).toString) }) } sparkTest("realign a set of reads around a deletion") { - // deletion sequence: - // del: AGGTCTGAATGAGACTTA__TCATTAACCGTGTGGACACA + // true deletion sequence: + // del: AGGTCTGAATGAGACTT__ATCATTAACCGTGTGGACACA + // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA + // + // test alignment: + // X: | + // del: AGGTCTGAATGAGACT__TATCATTAACCGTGTGGACACA // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA val sequence = "AGGTCTGAATGAGACTTATCATTAACCGTGTGGACACA" - val deleteStart = 18 - val readLength = deleteStart + 8 + val deleteStart = 16 + val readLength = deleteStart + 7 // generate 10 reads with a 8bp flank val reads = (0 until 10).map(rId => { - val basesBeforeDelete = deleteStart - rId + val basesBeforeDelete = deleteStart - rId - 1 val basesAfterDelete = 8 + rId AlignmentRecord.newBuilder() @@ -108,10 +127,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite { .setRecordGroupName("rg") .setReadMapped(true) .setSequence(sequence.drop(rId).take(readLength)) + .setQual("*" * readLength) .setStart(rId.toLong) - .setEnd((rId + readLength + 2 + 1).toLong) + .setEnd((rId + readLength + 2).toLong) .setCigar("%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) - .setMismatchingPositions("%d^CA%d".format(basesBeforeDelete, basesAfterDelete)) + .setMismatchingPositions("%d^TA0C%d".format(basesBeforeDelete, + basesAfterDelete - 1)) + .setMapq(50) .build() }) @@ -124,23 +146,30 @@ trait SparkRealignerSuite extends AvocadoFunSuite { // these values are different from above because original alignments were // not left justified - val basesBeforeDelete = deleteStart - rId - 1 - val basesAfterDelete = 9 + rId + val basesBeforeDelete = deleteStart - rId + val basesAfterDelete = 7 + rId - assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) + if (allowLegacyCigars) { + assert(r.getCigar === "%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) + } else { + assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) + } assert(r.getMismatchingPositions === "%d^AC%d".format(basesBeforeDelete, basesAfterDelete)) }) } sparkTest("realigning a read with a repeat will return the original read") { val read = AlignmentRecord.newBuilder() + .setContigName("ctg") .setReadName("A_READ") .setReadMapped(true) .setStart(10L) .setEnd(17L) .setSequence("TCAAAAAAGG") + .setQual("**********") .setCigar("3M4I3M") .setMismatchingPositions("6") + .setMapq(50) .build() // make into a genomic rdd diff --git a/pom.xml b/pom.xml index ed267068..e9792168 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ avocado: A Variant Caller, Distributed - 0.21.0 + 0.21.1-SNAPSHOT 1.8.0 1.8 2.10.4