diff --git a/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala b/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala
index 534cd5a1..23eafb2d 100644
--- a/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala
+++ b/avocado-cli/src/main/scala/org/bdgenomics/avocado/cli/Reassemble.scala
@@ -20,8 +20,7 @@ package org.bdgenomics.avocado.cli
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
-import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD, MDTagging }
-import org.bdgenomics.avocado.realigner.Realigner
+import org.bdgenomics.avocado.realigner.{ ConsensusRealigner, Realigner }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -58,6 +57,10 @@ class ReassembleArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
name = "-defer_merging",
usage = "Defers merging single file output")
var deferMerging: Boolean = false
+ @Args4jOption(required = false,
+ name = "-use_consensus_realigner",
+ usage = "If provided, uses consensus-mode realigner.")
+ var useConsensusRealigner: Boolean = false
// required by ADAMSaveAnyArgs
var sortFastqOutput: Boolean = false
@@ -79,7 +82,11 @@ class Reassemble(
val reads = sc.loadAlignments(args.inputPath)
// realign the reads
- val reassembledReads = Realigner.realign(reads, args.kmerLength)
+ val reassembledReads = if (args.useConsensusRealigner) {
+ ConsensusRealigner.realign(reads, args.kmerLength)
+ } else {
+ Realigner.realign(reads, args.kmerLength)
+ }
// save the reads
reassembledReads.save(args)
diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala
new file mode 100644
index 00000000..0ba9d487
--- /dev/null
+++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/realigner/ConsensusRealigner.scala
@@ -0,0 +1,64 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.bdgenomics.avocado.realigner
+
+import org.bdgenomics.adam.algorithms.consensus.ConsensusGenerator
+import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
+import org.bdgenomics.adam.rdd.variant.VariantRDD
+import org.bdgenomics.avocado.genotyping.DiscoverVariants
+import org.bdgenomics.formats.avro.Variant
+
+object ConsensusRealigner {
+
+ /**
+ * Realigns a set of reads against the reference genome.
+ *
+ * @param reads Reads to realign.
+ * @param kmerLength The length k of the k-mers.
+ * @return Returns the realigned reads.
+ */
+ def realign(reads: AlignmentRecordRDD,
+ kmerLength: Int): AlignmentRecordRDD = {
+
+ // use the realigner to make a first pass over the reads
+ val realignedReads = Realigner.realign(reads, kmerLength)
+
+ // from here, we'll discover any potential variants
+ val variants = filterIndels(DiscoverVariants(realignedReads))
+
+ // we'll pass these discovered variants to ADAM's indel realigner
+ reads.realignIndels(ConsensusGenerator.fromKnownIndels(variants))
+ }
+
+ /**
+ * @param v The variant to filter.
+ * @return Returns false if the variant is a SNV/MNV.
+ */
+ private[realigner] def discardSnvs(v: Variant): Boolean = {
+ v.getReferenceAllele.length != v.getAlternateAllele.length
+ }
+
+ /**
+ * @param rdd The RDD of variants to filter.
+ * @return Returns a new RDD of variants where the SNVs & MNVs have been
+ * removed and only INDEL variants remain.
+ */
+ private[realigner] def filterIndels(rdd: VariantRDD): VariantRDD = {
+ rdd.transform(r => r.filter(discardSnvs))
+ }
+}
diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala
index 017d32ac..e578f135 100644
--- a/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala
+++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/util/PrefilterReads.scala
@@ -18,7 +18,7 @@
package org.bdgenomics.avocado.util
import org.bdgenomics.adam.models.SequenceDictionary
-import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD }
+import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.formats.avro.AlignmentRecord
private[avocado] trait PrefilterReadsArgs extends Serializable {
@@ -85,7 +85,7 @@ private[avocado] object PrefilterReads extends Serializable {
val readRdd = rdd.rdd.filter(readFn)
// construct a new aligned read rdd
- AlignedReadRDD(readRdd, sequences, rdd.recordGroups)
+ AlignmentRecordRDD(readRdd, sequences, rdd.recordGroups)
}
/**
diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala
index 5a6503c6..cd63a9e6 100644
--- a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala
+++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/BiallelicGenotyperSuite.scala
@@ -20,6 +20,7 @@ package org.bdgenomics.avocado.genotyping
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.avocado.AvocadoFunSuite
import org.bdgenomics.avocado.models.Observation
+import org.bdgenomics.avocado.realigner.ConsensusRealigner
import org.bdgenomics.avocado.util.{
HardFilterGenotypes,
TestHardFilterGenotypesArgs
@@ -437,14 +438,16 @@ class BiallelicGenotyperSuite extends AvocadoFunSuite {
})
}
- ignore("call hom alt C->CCCCT insertion at 1/866511") {
+ sparkTest("call hom alt C->CCCCT insertion at 1/866511") {
val readPath = resourceUrl("NA12878.chr1.866511.sam")
val reads = sc.loadAlignments(readPath.toString)
.transform(rdd => {
rdd.filter(_.getMapq > 0)
})
- val gts = BiallelicGenotyper.discoverAndCall(reads,
+ val realignedReads = ConsensusRealigner.realign(reads, 20)
+
+ val gts = BiallelicGenotyper.discoverAndCall(realignedReads,
2,
optPhredThreshold = Some(30)).transform(rdd => {
rdd.filter(gt => gt.getAlleles.forall(_ == GenotypeAllele.ALT))
diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala
index e28ebecc..82dbce6c 100644
--- a/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala
+++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/genotyping/DiscoverVariantsSuite.scala
@@ -22,7 +22,7 @@ import org.bdgenomics.adam.models.{
SequenceDictionary,
SequenceRecord
}
-import org.bdgenomics.adam.rdd.read.AlignedReadRDD
+import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.avocado.AvocadoFunSuite
import org.bdgenomics.formats.avro.{ AlignmentRecord, Variant }
@@ -233,7 +233,7 @@ class DiscoverVariantsSuite extends AvocadoFunSuite {
snpReadMCigar, snpReadEqCigar,
insertRead,
deleteRead))
- val readRdd = AlignedReadRDD(rdd,
+ val readRdd = AlignmentRecordRDD(rdd,
SequenceDictionary(
SequenceRecord("1", 50L),
SequenceRecord("2", 40L),
diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala
new file mode 100644
index 00000000..b986577b
--- /dev/null
+++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/ConsensusRealignerSuite.scala
@@ -0,0 +1,85 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.bdgenomics.avocado.realigner
+
+import org.bdgenomics.adam.models.SequenceDictionary
+import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
+import org.bdgenomics.adam.rdd.variant.VariantRDD
+import org.bdgenomics.formats.avro.Variant
+
+class ConsensusRealignerSuite extends SparkRealignerSuite {
+
+ val allowLegacyCigars = true
+
+ def realign(rdd: AlignmentRecordRDD,
+ kmerLength: Int): AlignmentRecordRDD = {
+ ConsensusRealigner.realign(rdd, kmerLength)
+ }
+
+ val snv = Variant.newBuilder
+ .setStart(0L)
+ .setReferenceAllele("A")
+ .setAlternateAllele("G")
+ .build
+ val mnv = Variant.newBuilder
+ .setStart(1L)
+ .setReferenceAllele("AC")
+ .setAlternateAllele("GT")
+ .build
+ val ins = Variant.newBuilder
+ .setStart(2L)
+ .setReferenceAllele("A")
+ .setAlternateAllele("ACC")
+ .build
+ val del = Variant.newBuilder
+ .setStart(3L)
+ .setReferenceAllele("GCT")
+ .setAlternateAllele("G")
+ .build
+
+ test("filter out a snv") {
+ assert(!ConsensusRealigner.discardSnvs(snv))
+ }
+
+ test("filter out a mnv") {
+ assert(!ConsensusRealigner.discardSnvs(mnv))
+ }
+
+ test("keep an insert") {
+ assert(ConsensusRealigner.discardSnvs(ins))
+ }
+
+ test("keep a deletion") {
+ assert(ConsensusRealigner.discardSnvs(del))
+ }
+
+ sparkTest("filter snv/mnv variants out") {
+ val vRdd = VariantRDD(sc.parallelize(Seq(snv, mnv, ins, del)),
+ SequenceDictionary.empty)
+ val filteredVariants = ConsensusRealigner.filterIndels(vRdd)
+ .rdd
+ .collect
+
+ val variantIds = filteredVariants.map(_.getStart)
+ .toSet
+
+ assert(variantIds.size === 2)
+ assert(variantIds(2L))
+ assert(variantIds(3L))
+ }
+}
diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala
index 4c53d2fc..289dbf6d 100644
--- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala
+++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/RealignerSuite.scala
@@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord
class RealignerSuite extends SparkRealignerSuite {
+ val allowLegacyCigars = false
+
def realign(rdd: AlignmentRecordRDD,
kmerLength: Int): AlignmentRecordRDD = {
Realigner.realign(rdd, kmerLength)
diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala
index d035a413..171d91cb 100644
--- a/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala
+++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/realigner/SparkRealignerSuite.scala
@@ -24,18 +24,20 @@ import org.bdgenomics.adam.models.{
RecordGroupDictionary
}
import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD }
+import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.avocado.AvocadoFunSuite
import org.bdgenomics.formats.avro.AlignmentRecord
trait SparkRealignerSuite extends AvocadoFunSuite {
+ val allowLegacyCigars: Boolean
+
def realign(rdd: AlignmentRecordRDD,
kmerLength: Int): AlignmentRecordRDD
def makeAndRealignRdd(reads: Seq[AlignmentRecord],
kmerLength: Int): Array[AlignmentRecord] = {
- val gRdd = AlignedReadRDD(sc.parallelize(reads),
+ val gRdd = AlignmentRecordRDD(sc.parallelize(reads),
SequenceDictionary(SequenceRecord("ctg", 50L)),
RecordGroupDictionary(Seq(RecordGroup("rg", "rg"))))
@@ -47,11 +49,16 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
}
sparkTest("realign a set of reads around an insert") {
- // insertion sequence:
+ // true insertion sequence:
+ // ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA
+ // ref: AATGAGACTTACATCATT__AACCGTGTGGACACA
+ //
+ // test alignment:
+ // X: |
// ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA
- // ref: AATGAGACTTACATCATTAA__CCGTGTGGACACA
+ // ref: AATGAGACTTACATCATTAAC__CGTGTGGACACA
val sequence = "AATGAGACTTACATCATTAAAACCGTGTGGACACA"
- val insertStart = 20
+ val insertStart = 21
val readLength = insertStart + 6 + 2
// generate 7 reads with a 6bp flank
@@ -65,10 +72,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
.setRecordGroupName("rg")
.setReadMapped(true)
.setSequence(sequence.drop(rId).take(readLength))
+ .setQual("*" * readLength)
.setStart(rId.toLong)
- .setEnd((rId + readLength - 2 + 1).toLong)
+ .setEnd((rId + insertStart + 6).toLong)
.setCigar("%dM2I%dM".format(basesBeforeInsert, basesAfterInsert))
- .setMismatchingPositions((readLength - 2).toString)
+ .setMismatchingPositions("%dC%d".format(basesBeforeInsert - 1,
+ basesAfterInsert))
+ .setMapq(50)
.build()
})
@@ -81,25 +91,34 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
// these values are different from above because original alignments were
// not left justified
- val basesBeforeInsert = insertStart - rId - 2
- val basesAfterInsert = 8 + rId
-
- assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert))
+ val basesBeforeInsert = insertStart - rId - 3
+ val basesAfterInsert = 9 + rId
+
+ if (allowLegacyCigars) {
+ assert(r.getCigar === "%dM2I%dM".format(basesBeforeInsert, basesAfterInsert))
+ } else {
+ assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert))
+ }
assert(r.getMismatchingPositions === (readLength - 2).toString)
})
}
sparkTest("realign a set of reads around a deletion") {
- // deletion sequence:
- // del: AGGTCTGAATGAGACTTA__TCATTAACCGTGTGGACACA
+ // true deletion sequence:
+ // del: AGGTCTGAATGAGACTT__ATCATTAACCGTGTGGACACA
+ // ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA
+ //
+ // test alignment:
+ // X: |
+ // del: AGGTCTGAATGAGACT__TATCATTAACCGTGTGGACACA
// ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA
val sequence = "AGGTCTGAATGAGACTTATCATTAACCGTGTGGACACA"
- val deleteStart = 18
- val readLength = deleteStart + 8
+ val deleteStart = 16
+ val readLength = deleteStart + 7
// generate 10 reads with a 8bp flank
val reads = (0 until 10).map(rId => {
- val basesBeforeDelete = deleteStart - rId
+ val basesBeforeDelete = deleteStart - rId - 1
val basesAfterDelete = 8 + rId
AlignmentRecord.newBuilder()
@@ -108,10 +127,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
.setRecordGroupName("rg")
.setReadMapped(true)
.setSequence(sequence.drop(rId).take(readLength))
+ .setQual("*" * readLength)
.setStart(rId.toLong)
- .setEnd((rId + readLength + 2 + 1).toLong)
+ .setEnd((rId + readLength + 2).toLong)
.setCigar("%dM2D%dM".format(basesBeforeDelete, basesAfterDelete))
- .setMismatchingPositions("%d^CA%d".format(basesBeforeDelete, basesAfterDelete))
+ .setMismatchingPositions("%d^TA0C%d".format(basesBeforeDelete,
+ basesAfterDelete - 1))
+ .setMapq(50)
.build()
})
@@ -124,23 +146,30 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
// these values are different from above because original alignments were
// not left justified
- val basesBeforeDelete = deleteStart - rId - 1
- val basesAfterDelete = 9 + rId
+ val basesBeforeDelete = deleteStart - rId
+ val basesAfterDelete = 7 + rId
- assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete))
+ if (allowLegacyCigars) {
+ assert(r.getCigar === "%dM2D%dM".format(basesBeforeDelete, basesAfterDelete))
+ } else {
+ assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete))
+ }
assert(r.getMismatchingPositions === "%d^AC%d".format(basesBeforeDelete, basesAfterDelete))
})
}
sparkTest("realigning a read with a repeat will return the original read") {
val read = AlignmentRecord.newBuilder()
+ .setContigName("ctg")
.setReadName("A_READ")
.setReadMapped(true)
.setStart(10L)
.setEnd(17L)
.setSequence("TCAAAAAAGG")
+ .setQual("**********")
.setCigar("3M4I3M")
.setMismatchingPositions("6")
+ .setMapq(50)
.build()
// make into a genomic rdd
diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala
index 69bb006d..700a6a9b 100644
--- a/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala
+++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/util/PrefilterReadsSuite.scala
@@ -24,7 +24,7 @@ import org.bdgenomics.adam.models.{
SequenceRecord
}
import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rdd.read.AlignedReadRDD
+import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.formats.avro.AlignmentRecord
case class TestPrefilterReadsArgs(var isNotGrc: Boolean = false,
@@ -239,7 +239,7 @@ class PrefilterReadsSuite extends AvocadoFunSuite {
def testRdd(args: PrefilterReadsArgs, numReads: Int, numContigs: Int) {
- val readRdd = AlignedReadRDD(sc.parallelize(reads), sequences, RecordGroupDictionary.empty)
+ val readRdd = AlignmentRecordRDD(sc.parallelize(reads), sequences, RecordGroupDictionary.empty)
val filteredRdd = PrefilterReads(readRdd, args)
assert(filteredRdd.rdd.count === numReads)
diff --git a/pom.xml b/pom.xml
index f7af4f26..ba734ac5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
avocado: A Variant Caller, Distributed
- 0.21.0
+ 0.21.1-SNAPSHOT
1.8.0
1.8
2.10.4