diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index 225849668..ed436709a 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -16,40 +16,24 @@ package com.twitter.summingbird.storm +import java.util.{Map => JMap} + import backtype.storm.generated.StormTopology -import com.twitter.algebird.{ MapAlgebra, Semigroup } -import com.twitter.storehaus.{ ReadableStore, JMapStore } -import com.twitter.storehaus.algebra.MergeableStore import com.twitter.summingbird._ -import com.twitter.summingbird.online._ +import com.twitter.summingbird.batch.Batcher import com.twitter.summingbird.online.option._ -import com.twitter.summingbird.storm.option._ -import com.twitter.summingbird.batch.{ BatchID, Batcher } import com.twitter.summingbird.storm.spout.TraversableSpout -import com.twitter.tormenta.spout.Spout -import com.twitter.util.Future -import java.util.{ Collections, HashMap, Map => JMap, UUID } -import java.util.concurrent.atomic.AtomicInteger -import org.scalatest.WordSpec import org.scalacheck._ -import org.scalacheck.Prop._ -import org.scalacheck.Properties +import org.scalatest.WordSpec import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ -import scala.collection.mutable.{ - ArrayBuffer, - HashMap => MutableHashMap, - Map => MutableMap, - SynchronizedBuffer, - SynchronizedMap -} +import scala.collection.mutable.{HashMap => MutableHashMap, Map => MutableMap} + /** * Tests for Summingbird's Storm planner. */ class TopologyTests extends WordSpec { - import MapAlgebra.sparseEquiv // This is dangerous, obviously. The Storm platform graphs tested // here use the UnitBatcher, so the actual time extraction isn't @@ -140,6 +124,23 @@ class TopologyTests extends WordSpec { assert(TDistMap(1).get_common.get_parallelism_hint == 50) } + "With same setting on multiple names we use the one for the node" in { + val fmNodeName = "flatMapper" + val smNodeName = "summer" + val p = Storm.source(TraversableSpout(sample[List[Int]])) + .flatMap(testFn).name(fmNodeName) + .sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName) + + val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)), + smNodeName -> Options().set(SummerParallelism(20))) + val storm = Storm.local(opts) + val stormTopo = storm.plan(p).topology + val bolts = stormTopo.get_bolts + + // Tail should use parallelism specified for the summer node + assert(bolts("Tail").get_common.get_parallelism_hint == 20) + } + "If the closes doesnt contain the option we keep going" in { val nodeName = "super dooper node" val otherNodeName = "super dooper node"