From f3df6e8d3c5d86647d0aca28535f12fe75936903 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Fri, 20 Jan 2017 23:59:44 -0800 Subject: [PATCH 1/3] Factoring out Spark realigner tests. --- .../avocado/realigner/RealignerSuite.scala | 134 +-------------- .../realigner/SparkRealignerSuite.scala | 153 ++++++++++++++++++ 2 files changed, 160 insertions(+), 127 deletions(-) create mode 100644 avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala index c0875463..4c53d2fc 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala @@ -17,15 +17,8 @@ */ package org.bdgenomics.avocado.realigner -import org.bdgenomics.adam.models.{ - SequenceDictionary, - SequenceRecord, - RecordGroup, - RecordGroupDictionary -} import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD } -import org.bdgenomics.avocado.AvocadoFunSuite +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD import org.bdgenomics.avocado.models.{ Clipped, Deletion, @@ -35,7 +28,12 @@ import org.bdgenomics.avocado.models.{ } import org.bdgenomics.formats.avro.AlignmentRecord -class RealignerSuite extends AvocadoFunSuite { +class RealignerSuite extends SparkRealignerSuite { + + def realign(rdd: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD = { + Realigner.realign(rdd, kmerLength) + } test("realignment candidate code needs at least one block") { intercept[AssertionError] { @@ -190,124 +188,6 @@ class RealignerSuite extends AvocadoFunSuite { } } - def makeAndRealignRdd(reads: Seq[AlignmentRecord], - kmerLength: Int): Array[AlignmentRecord] = { - val gRdd = AlignedReadRDD(sc.parallelize(reads), - SequenceDictionary(SequenceRecord("ctg", 50L)), - RecordGroupDictionary(Seq(RecordGroup("rg", "rg")))) - - // realign the genomic rdd - val realignedRdd = Realigner.realign(gRdd, kmerLength) - - // collect the reads - realignedRdd.rdd.collect() - } - - sparkTest("realign a set of reads around an insert") { - // insertion sequence: - // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA - // ref: AATGAGACTTACATCATTAA__CCGTGTGGACACA - val sequence = "AATGAGACTTACATCATTAAAACCGTGTGGACACA" - val insertStart = 20 - val readLength = insertStart + 6 + 2 - - // generate 7 reads with a 6bp flank - val reads = (0 until 7).map(rId => { - val basesBeforeInsert = insertStart - rId - val basesAfterInsert = 6 + rId - - AlignmentRecord.newBuilder() - .setReadName(rId.toString) - .setContigName("ctg") - .setRecordGroupName("rg") - .setReadMapped(true) - .setSequence(sequence.drop(rId).take(readLength)) - .setStart(rId.toLong) - .setEnd((rId + readLength - 2 + 1).toLong) - .setCigar("%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) - .setMismatchingPositions((readLength - 2).toString) - .build() - }) - - // make into a genomic rdd - val newReads = makeAndRealignRdd(reads, 6) - - assert(newReads.size === 7) - newReads.foreach(r => { - val rId = r.getReadName.toInt - - // these values are different from above because original alignments were - // not left justified - val basesBeforeInsert = insertStart - rId - 2 - val basesAfterInsert = 8 + rId - - assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) - assert(r.getMismatchingPositions === (readLength - 2).toString) - }) - } - - sparkTest("realign a set of reads around a deletion") { - // deletion sequence: - // del: AGGTCTGAATGAGACTTA__TCATTAACCGTGTGGACACA - // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA - val sequence = "AGGTCTGAATGAGACTTATCATTAACCGTGTGGACACA" - val deleteStart = 18 - val readLength = deleteStart + 8 - - // generate 10 reads with a 8bp flank - val reads = (0 until 10).map(rId => { - val basesBeforeDelete = deleteStart - rId - val basesAfterDelete = 8 + rId - - AlignmentRecord.newBuilder() - .setReadName(rId.toString) - .setContigName("ctg") - .setRecordGroupName("rg") - .setReadMapped(true) - .setSequence(sequence.drop(rId).take(readLength)) - .setStart(rId.toLong) - .setEnd((rId + readLength + 2 + 1).toLong) - .setCigar("%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) - .setMismatchingPositions("%d^CA%d".format(basesBeforeDelete, basesAfterDelete)) - .build() - }) - - // make into a genomic rdd - val newReads = makeAndRealignRdd(reads, 8) - - assert(newReads.size === 10) - newReads.foreach(r => { - val rId = r.getReadName.toInt - - // these values are different from above because original alignments were - // not left justified - val basesBeforeDelete = deleteStart - rId - 1 - val basesAfterDelete = 9 + rId - - assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) - assert(r.getMismatchingPositions === "%d^AC%d".format(basesBeforeDelete, basesAfterDelete)) - }) - } - - sparkTest("realigning a read with a repeat will return the original read") { - val read = AlignmentRecord.newBuilder() - .setReadName("A_READ") - .setReadMapped(true) - .setStart(10L) - .setEnd(17L) - .setSequence("TCAAAAAAGG") - .setCigar("3M4I3M") - .setMismatchingPositions("6") - .build() - - // make into a genomic rdd - val newReads = makeAndRealignRdd(Seq(read), 3) - - // should have one read - assert(newReads.size === 1) - assert(newReads.head === read) - } - sparkTest("one sample read should fail due to a repeat, all others should realign") { val readFile = resourceUrl("NA12878_reads.sam").toString val reads = sc.loadAlignments(readFile) diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala new file mode 100644 index 00000000..d035a413 --- /dev/null +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala @@ -0,0 +1,153 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.avocado.realigner + +import org.bdgenomics.adam.models.{ + SequenceDictionary, + SequenceRecord, + RecordGroup, + RecordGroupDictionary +} +import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD } +import org.bdgenomics.avocado.AvocadoFunSuite +import org.bdgenomics.formats.avro.AlignmentRecord + +trait SparkRealignerSuite extends AvocadoFunSuite { + + def realign(rdd: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD + + def makeAndRealignRdd(reads: Seq[AlignmentRecord], + kmerLength: Int): Array[AlignmentRecord] = { + val gRdd = AlignedReadRDD(sc.parallelize(reads), + SequenceDictionary(SequenceRecord("ctg", 50L)), + RecordGroupDictionary(Seq(RecordGroup("rg", "rg")))) + + // realign the genomic rdd + val realignedRdd = realign(gRdd, kmerLength) + + // collect the reads + realignedRdd.rdd.collect() + } + + sparkTest("realign a set of reads around an insert") { + // insertion sequence: + // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA + // ref: AATGAGACTTACATCATTAA__CCGTGTGGACACA + val sequence = "AATGAGACTTACATCATTAAAACCGTGTGGACACA" + val insertStart = 20 + val readLength = insertStart + 6 + 2 + + // generate 7 reads with a 6bp flank + val reads = (0 until 7).map(rId => { + val basesBeforeInsert = insertStart - rId + val basesAfterInsert = 6 + rId + + AlignmentRecord.newBuilder() + .setReadName(rId.toString) + .setContigName("ctg") + .setRecordGroupName("rg") + .setReadMapped(true) + .setSequence(sequence.drop(rId).take(readLength)) + .setStart(rId.toLong) + .setEnd((rId + readLength - 2 + 1).toLong) + .setCigar("%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) + .setMismatchingPositions((readLength - 2).toString) + .build() + }) + + // make into a genomic rdd + val newReads = makeAndRealignRdd(reads, 6) + + assert(newReads.size === 7) + newReads.foreach(r => { + val rId = r.getReadName.toInt + + // these values are different from above because original alignments were + // not left justified + val basesBeforeInsert = insertStart - rId - 2 + val basesAfterInsert = 8 + rId + + assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) + assert(r.getMismatchingPositions === (readLength - 2).toString) + }) + } + + sparkTest("realign a set of reads around a deletion") { + // deletion sequence: + // del: AGGTCTGAATGAGACTTA__TCATTAACCGTGTGGACACA + // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA + val sequence = "AGGTCTGAATGAGACTTATCATTAACCGTGTGGACACA" + val deleteStart = 18 + val readLength = deleteStart + 8 + + // generate 10 reads with a 8bp flank + val reads = (0 until 10).map(rId => { + val basesBeforeDelete = deleteStart - rId + val basesAfterDelete = 8 + rId + + AlignmentRecord.newBuilder() + .setReadName(rId.toString) + .setContigName("ctg") + .setRecordGroupName("rg") + .setReadMapped(true) + .setSequence(sequence.drop(rId).take(readLength)) + .setStart(rId.toLong) + .setEnd((rId + readLength + 2 + 1).toLong) + .setCigar("%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) + .setMismatchingPositions("%d^CA%d".format(basesBeforeDelete, basesAfterDelete)) + .build() + }) + + // make into a genomic rdd + val newReads = makeAndRealignRdd(reads, 8) + + assert(newReads.size === 10) + newReads.foreach(r => { + val rId = r.getReadName.toInt + + // these values are different from above because original alignments were + // not left justified + val basesBeforeDelete = deleteStart - rId - 1 + val basesAfterDelete = 9 + rId + + assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) + assert(r.getMismatchingPositions === "%d^AC%d".format(basesBeforeDelete, basesAfterDelete)) + }) + } + + sparkTest("realigning a read with a repeat will return the original read") { + val read = AlignmentRecord.newBuilder() + .setReadName("A_READ") + .setReadMapped(true) + .setStart(10L) + .setEnd(17L) + .setSequence("TCAAAAAAGG") + .setCigar("3M4I3M") + .setMismatchingPositions("6") + .build() + + // make into a genomic rdd + val newReads = makeAndRealignRdd(Seq(read), 3) + + // should have one read + assert(newReads.size === 1) + assert(newReads.head === read) + } +} From e96ddeaa5184ca18d5370c196dc65e7e0d5c9eb5 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Sun, 22 Jan 2017 23:18:01 -0800 Subject: [PATCH 2/3] [AVOCADO-200] Feed assembled variants to INDEL realigner. Resolves #200. Depends on https://github.com/bigdatagenomics/adam/pull/1363. * Adds a ConsensusRealigner that uses the main Realigner to realign reads, which are then fed into the variant discovery engine with loose filters. Any INDEL variants that are discovered are then fed in to ADAM's INDEL realignment engine as known INDELs. * Uses this realignment mode in the BiallelicGenotyperSuite to clean up a previously failing test. * Adds this to the Reassemble CLI as a `-use_consensus_realigner` switch. --- .../bdgenomics/avocado/cli/Reassemble.scala | 13 ++- .../realigner/ConsensusRealigner.scala | 64 ++++++++++++++ .../avocado/util/PrefilterReads.scala | 4 +- .../genotyping/BiallelicGenotyperSuite.scala | 7 +- .../genotyping/DiscoverVariantsSuite.scala | 4 +- .../realigner/ConsensusRealignerSuite.scala | 85 +++++++++++++++++++ .../avocado/realigner/RealignerSuite.scala | 2 + .../realigner/SparkRealignerSuite.scala | 71 +++++++++++----- .../avocado/util/PrefilterReadsSuite.scala | 4 +- pom.xml | 2 +- 10 files changed, 223 insertions(+), 33 deletions(-) create mode 100644 avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala create mode 100644 avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala diff --git a/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala b/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala index 534cd5a1..23eafb2d 100644 --- a/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala +++ b/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala @@ -20,8 +20,7 @@ package org.bdgenomics.avocado.cli import org.apache.spark.SparkContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs -import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD, MDTagging } -import org.bdgenomics.avocado.realigner.Realigner +import org.bdgenomics.avocado.realigner.{ ConsensusRealigner, Realigner } import org.bdgenomics.utils.cli._ import org.kohsuke.args4j.{ Argument, Option => Args4jOption } @@ -58,6 +57,10 @@ class ReassembleArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { name = "-defer_merging", usage = "Defers merging single file output") var deferMerging: Boolean = false + @Args4jOption(required = false, + name = "-use_consensus_realigner", + usage = "If provided, uses consensus-mode realigner.") + var useConsensusRealigner: Boolean = false // required by ADAMSaveAnyArgs var sortFastqOutput: Boolean = false @@ -79,7 +82,11 @@ class Reassemble( val reads = sc.loadAlignments(args.inputPath) // realign the reads - val reassembledReads = Realigner.realign(reads, args.kmerLength) + val reassembledReads = if (args.useConsensusRealigner) { + ConsensusRealigner.realign(reads, args.kmerLength) + } else { + Realigner.realign(reads, args.kmerLength) + } // save the reads reassembledReads.save(args) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala new file mode 100644 index 00000000..0ba9d487 --- /dev/null +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala @@ -0,0 +1,64 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.avocado.realigner + +import org.bdgenomics.adam.algorithms.consensus.ConsensusGenerator +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.avocado.genotyping.DiscoverVariants +import org.bdgenomics.formats.avro.Variant + +object ConsensusRealigner { + + /** + * Realigns a set of reads against the reference genome. + * + * @param reads Reads to realign. + * @param kmerLength The length k of the k-mers. + * @return Returns the realigned reads. + */ + def realign(reads: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD = { + + // use the realigner to make a first pass over the reads + val realignedReads = Realigner.realign(reads, kmerLength) + + // from here, we'll discover any potential variants + val variants = filterIndels(DiscoverVariants(realignedReads)) + + // we'll pass these discovered variants to ADAM's indel realigner + reads.realignIndels(ConsensusGenerator.fromKnownIndels(variants)) + } + + /** + * @param v The variant to filter. + * @return Returns false if the variant is a SNV/MNV. + */ + private[realigner] def discardSnvs(v: Variant): Boolean = { + v.getReferenceAllele.length != v.getAlternateAllele.length + } + + /** + * @param rdd The RDD of variants to filter. + * @return Returns a new RDD of variants where the SNVs & MNVs have been + * removed and only INDEL variants remain. + */ + private[realigner] def filterIndels(rdd: VariantRDD): VariantRDD = { + rdd.transform(r => r.filter(discardSnvs)) + } +} diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala index 017d32ac..e578f135 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala @@ -18,7 +18,7 @@ package org.bdgenomics.avocado.util import org.bdgenomics.adam.models.SequenceDictionary -import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD } +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD import org.bdgenomics.formats.avro.AlignmentRecord private[avocado] trait PrefilterReadsArgs extends Serializable { @@ -85,7 +85,7 @@ private[avocado] object PrefilterReads extends Serializable { val readRdd = rdd.rdd.filter(readFn) // construct a new aligned read rdd - AlignedReadRDD(readRdd, sequences, rdd.recordGroups) + AlignmentRecordRDD(readRdd, sequences, rdd.recordGroups) } /** diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala index 5a6503c6..cd63a9e6 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala @@ -20,6 +20,7 @@ package org.bdgenomics.avocado.genotyping import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.avocado.AvocadoFunSuite import org.bdgenomics.avocado.models.Observation +import org.bdgenomics.avocado.realigner.ConsensusRealigner import org.bdgenomics.avocado.util.{ HardFilterGenotypes, TestHardFilterGenotypesArgs @@ -437,14 +438,16 @@ class BiallelicGenotyperSuite extends AvocadoFunSuite { }) } - ignore("call hom alt C->CCCCT insertion at 1/866511") { + sparkTest("call hom alt C->CCCCT insertion at 1/866511") { val readPath = resourceUrl("NA12878.chr1.866511.sam") val reads = sc.loadAlignments(readPath.toString) .transform(rdd => { rdd.filter(_.getMapq > 0) }) - val gts = BiallelicGenotyper.discoverAndCall(reads, + val realignedReads = ConsensusRealigner.realign(reads, 20) + + val gts = BiallelicGenotyper.discoverAndCall(realignedReads, 2, optPhredThreshold = Some(30)).transform(rdd => { rdd.filter(gt => gt.getAlleles.forall(_ == GenotypeAllele.ALT)) diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala index e28ebecc..82dbce6c 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala @@ -22,7 +22,7 @@ import org.bdgenomics.adam.models.{ SequenceDictionary, SequenceRecord } -import org.bdgenomics.adam.rdd.read.AlignedReadRDD +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD import org.bdgenomics.avocado.AvocadoFunSuite import org.bdgenomics.formats.avro.{ AlignmentRecord, Variant } @@ -233,7 +233,7 @@ class DiscoverVariantsSuite extends AvocadoFunSuite { snpReadMCigar, snpReadEqCigar, insertRead, deleteRead)) - val readRdd = AlignedReadRDD(rdd, + val readRdd = AlignmentRecordRDD(rdd, SequenceDictionary( SequenceRecord("1", 50L), SequenceRecord("2", 40L), diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala new file mode 100644 index 00000000..b986577b --- /dev/null +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala @@ -0,0 +1,85 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.avocado.realigner + +import org.bdgenomics.adam.models.SequenceDictionary +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.formats.avro.Variant + +class ConsensusRealignerSuite extends SparkRealignerSuite { + + val allowLegacyCigars = true + + def realign(rdd: AlignmentRecordRDD, + kmerLength: Int): AlignmentRecordRDD = { + ConsensusRealigner.realign(rdd, kmerLength) + } + + val snv = Variant.newBuilder + .setStart(0L) + .setReferenceAllele("A") + .setAlternateAllele("G") + .build + val mnv = Variant.newBuilder + .setStart(1L) + .setReferenceAllele("AC") + .setAlternateAllele("GT") + .build + val ins = Variant.newBuilder + .setStart(2L) + .setReferenceAllele("A") + .setAlternateAllele("ACC") + .build + val del = Variant.newBuilder + .setStart(3L) + .setReferenceAllele("GCT") + .setAlternateAllele("G") + .build + + test("filter out a snv") { + assert(!ConsensusRealigner.discardSnvs(snv)) + } + + test("filter out a mnv") { + assert(!ConsensusRealigner.discardSnvs(mnv)) + } + + test("keep an insert") { + assert(ConsensusRealigner.discardSnvs(ins)) + } + + test("keep a deletion") { + assert(ConsensusRealigner.discardSnvs(del)) + } + + sparkTest("filter snv/mnv variants out") { + val vRdd = VariantRDD(sc.parallelize(Seq(snv, mnv, ins, del)), + SequenceDictionary.empty) + val filteredVariants = ConsensusRealigner.filterIndels(vRdd) + .rdd + .collect + + val variantIds = filteredVariants.map(_.getStart) + .toSet + + assert(variantIds.size === 2) + assert(variantIds(2L)) + assert(variantIds(3L)) + } +} diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala index 4c53d2fc..289dbf6d 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala @@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord class RealignerSuite extends SparkRealignerSuite { + val allowLegacyCigars = false + def realign(rdd: AlignmentRecordRDD, kmerLength: Int): AlignmentRecordRDD = { Realigner.realign(rdd, kmerLength) diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala index d035a413..171d91cb 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala @@ -24,18 +24,20 @@ import org.bdgenomics.adam.models.{ RecordGroupDictionary } import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD } +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD import org.bdgenomics.avocado.AvocadoFunSuite import org.bdgenomics.formats.avro.AlignmentRecord trait SparkRealignerSuite extends AvocadoFunSuite { + val allowLegacyCigars: Boolean + def realign(rdd: AlignmentRecordRDD, kmerLength: Int): AlignmentRecordRDD def makeAndRealignRdd(reads: Seq[AlignmentRecord], kmerLength: Int): Array[AlignmentRecord] = { - val gRdd = AlignedReadRDD(sc.parallelize(reads), + val gRdd = AlignmentRecordRDD(sc.parallelize(reads), SequenceDictionary(SequenceRecord("ctg", 50L)), RecordGroupDictionary(Seq(RecordGroup("rg", "rg")))) @@ -47,11 +49,16 @@ trait SparkRealignerSuite extends AvocadoFunSuite { } sparkTest("realign a set of reads around an insert") { - // insertion sequence: + // true insertion sequence: + // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA + // ref: AATGAGACTTACATCATT__AACCGTGTGGACACA + // + // test alignment: + // X: | // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA - // ref: AATGAGACTTACATCATTAA__CCGTGTGGACACA + // ref: AATGAGACTTACATCATTAAC__CGTGTGGACACA val sequence = "AATGAGACTTACATCATTAAAACCGTGTGGACACA" - val insertStart = 20 + val insertStart = 21 val readLength = insertStart + 6 + 2 // generate 7 reads with a 6bp flank @@ -65,10 +72,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite { .setRecordGroupName("rg") .setReadMapped(true) .setSequence(sequence.drop(rId).take(readLength)) + .setQual("*" * readLength) .setStart(rId.toLong) - .setEnd((rId + readLength - 2 + 1).toLong) + .setEnd((rId + insertStart + 6).toLong) .setCigar("%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) - .setMismatchingPositions((readLength - 2).toString) + .setMismatchingPositions("%dC%d".format(basesBeforeInsert - 1, + basesAfterInsert)) + .setMapq(50) .build() }) @@ -81,25 +91,34 @@ trait SparkRealignerSuite extends AvocadoFunSuite { // these values are different from above because original alignments were // not left justified - val basesBeforeInsert = insertStart - rId - 2 - val basesAfterInsert = 8 + rId - - assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) + val basesBeforeInsert = insertStart - rId - 3 + val basesAfterInsert = 9 + rId + + if (allowLegacyCigars) { + assert(r.getCigar === "%dM2I%dM".format(basesBeforeInsert, basesAfterInsert)) + } else { + assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert)) + } assert(r.getMismatchingPositions === (readLength - 2).toString) }) } sparkTest("realign a set of reads around a deletion") { - // deletion sequence: - // del: AGGTCTGAATGAGACTTA__TCATTAACCGTGTGGACACA + // true deletion sequence: + // del: AGGTCTGAATGAGACTT__ATCATTAACCGTGTGGACACA + // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA + // + // test alignment: + // X: | + // del: AGGTCTGAATGAGACT__TATCATTAACCGTGTGGACACA // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA val sequence = "AGGTCTGAATGAGACTTATCATTAACCGTGTGGACACA" - val deleteStart = 18 - val readLength = deleteStart + 8 + val deleteStart = 16 + val readLength = deleteStart + 7 // generate 10 reads with a 8bp flank val reads = (0 until 10).map(rId => { - val basesBeforeDelete = deleteStart - rId + val basesBeforeDelete = deleteStart - rId - 1 val basesAfterDelete = 8 + rId AlignmentRecord.newBuilder() @@ -108,10 +127,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite { .setRecordGroupName("rg") .setReadMapped(true) .setSequence(sequence.drop(rId).take(readLength)) + .setQual("*" * readLength) .setStart(rId.toLong) - .setEnd((rId + readLength + 2 + 1).toLong) + .setEnd((rId + readLength + 2).toLong) .setCigar("%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) - .setMismatchingPositions("%d^CA%d".format(basesBeforeDelete, basesAfterDelete)) + .setMismatchingPositions("%d^TA0C%d".format(basesBeforeDelete, + basesAfterDelete - 1)) + .setMapq(50) .build() }) @@ -124,23 +146,30 @@ trait SparkRealignerSuite extends AvocadoFunSuite { // these values are different from above because original alignments were // not left justified - val basesBeforeDelete = deleteStart - rId - 1 - val basesAfterDelete = 9 + rId + val basesBeforeDelete = deleteStart - rId + val basesAfterDelete = 7 + rId - assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) + if (allowLegacyCigars) { + assert(r.getCigar === "%dM2D%dM".format(basesBeforeDelete, basesAfterDelete)) + } else { + assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete)) + } assert(r.getMismatchingPositions === "%d^AC%d".format(basesBeforeDelete, basesAfterDelete)) }) } sparkTest("realigning a read with a repeat will return the original read") { val read = AlignmentRecord.newBuilder() + .setContigName("ctg") .setReadName("A_READ") .setReadMapped(true) .setStart(10L) .setEnd(17L) .setSequence("TCAAAAAAGG") + .setQual("**********") .setCigar("3M4I3M") .setMismatchingPositions("6") + .setMapq(50) .build() // make into a genomic rdd diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala index 69bb006d..700a6a9b 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala @@ -24,7 +24,7 @@ import org.bdgenomics.adam.models.{ SequenceRecord } import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.read.AlignedReadRDD +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD import org.bdgenomics.formats.avro.AlignmentRecord case class TestPrefilterReadsArgs(var isNotGrc: Boolean = false, @@ -239,7 +239,7 @@ class PrefilterReadsSuite extends AvocadoFunSuite { def testRdd(args: PrefilterReadsArgs, numReads: Int, numContigs: Int) { - val readRdd = AlignedReadRDD(sc.parallelize(reads), sequences, RecordGroupDictionary.empty) + val readRdd = AlignmentRecordRDD(sc.parallelize(reads), sequences, RecordGroupDictionary.empty) val filteredRdd = PrefilterReads(readRdd, args) assert(filteredRdd.rdd.count === numReads) diff --git a/pom.xml b/pom.xml index f7af4f26..ba734ac5 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ avocado: A Variant Caller, Distributed - 0.21.0 + 0.21.1-SNAPSHOT 1.8.0 1.8 2.10.4 From 45c974d009ef80faa3487d4870d61207fde82b22 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Wed, 25 Jan 2017 00:53:36 -0800 Subject: [PATCH 3/3] Clean up some INDEL bits. --- .../genotyping/BiallelicGenotyper.scala | 18 +++++++++++++++++- .../avocado/genotyping/DiscoverVariants.scala | 6 +++++- .../avocado/genotyping/Observer.scala | 10 +++++++--- .../avocado/models/Observation.scala | 18 ++++++++++++++++++ 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyper.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyper.scala index 625857eb..6aae8d7d 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyper.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyper.scala @@ -222,10 +222,26 @@ private[avocado] object BiallelicGenotyper extends Serializable with Logging { } else if (isInsertion(variant)) { val insAllele = variant.getAlternateAllele.tail val insObserved = observed.filter(_._2 == insAllele) + println("for %s observed %s in %s".format(insAllele, observed.mkString(","), read)) if (observed.size == 2 && insObserved.size == 1) { Some((variant, insObserved.head._3.duplicate(Some(false)))) - } else if (observed.forall(_._3.isRef)) { + } else if (!observed.forall(_._3.isRef)) { + val nonRef = observed.filter(!_._3.isRef) + if (nonRef.size == 1) { + val (_, allele, nonRefObs) = nonRef.head + if (allele.length == insAllele.length) { + val matchingBases = allele.zip(insAllele).count(p => p._1 == p._2) + Some((variant, nonRefObs.scale(matchingBases, + insAllele.length, + Some(false)))) + } else { + Some((variant, observed.head._3.nullOut)) + } + } else { + Some((variant, observed.head._3.nullOut)) + } + } else if (read.getEnd != variant.getEnd) { Some((variant, observed.head._3.invert)) } else { Some((variant, observed.head._3.nullOut)) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/DiscoverVariants.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/DiscoverVariants.scala index 11a46041..9b3acc4a 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/DiscoverVariants.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/DiscoverVariants.scala @@ -201,7 +201,11 @@ object DiscoverVariants extends Serializable with Logging { kv } case Insertion(length) => { - val insQuals = qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length + val insQuals = if (length > 0) { + qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length + } else { + 0 + } val newVar = if (insQuals >= phredThreshold) { Variant.newBuilder .setContigName(contigName) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/Observer.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/Observer.scala index 144e7839..72e2f92d 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/Observer.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/Observer.scala @@ -131,9 +131,13 @@ private[genotyping] object Observer extends Serializable { // get bases and quals val bases = readSequence.substring(oldReadIdx, readIdx) - val qual = readQualities.substring(oldReadIdx, readIdx) - .map(_.toInt - 33) - .sum / length + val qual = if (length > 0) { + readQualities.substring(oldReadIdx, readIdx) + .map(_.toInt - 33) + .sum / length + } else { + 0 + } // the key is the (site, allele, sampleId) // insertions associate to the site to their left, hence the -1 diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/models/Observation.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/models/Observation.scala index 3cfcd653..eb1be137 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/models/Observation.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/models/Observation.scala @@ -91,6 +91,24 @@ case class Observation(alleleForwardStrand: Int, isRef = setRef.getOrElse(isRef)) } + /** + * @return Makes a copy where underlying arrays are not shared. + */ + def scale(num: Int, + denum: Int, + setRef: Option[Boolean] = None): Observation = { + val scaleBy = num.toDouble / denum.toDouble + Observation(alleleForwardStrand, + otherForwardStrand, + squareMapQ, + alleleLogLikelihoods.map(v => v * scaleBy), + otherLogLikelihoods.map(v => v * scaleBy), + alleleCoverage, + otherCoverage, + totalCoverage = totalCoverage, + isRef = setRef.getOrElse(isRef)) + } + /** * @return Returns this observation, but with allele/other swapped. *