From 4a602fd89749a586882d5af0739b8685199602cb Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Wed, 29 Jun 2016 11:16:35 -0700 Subject: [PATCH 1/7] Test to illustrate https://github.com/twitter/summingbird/issues/671 --- .../summingbird/storm/TopologyTests.scala | 45 ++++++++++--------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index 225849668..ed436709a 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -16,40 +16,24 @@ package com.twitter.summingbird.storm +import java.util.{Map => JMap} + import backtype.storm.generated.StormTopology -import com.twitter.algebird.{ MapAlgebra, Semigroup } -import com.twitter.storehaus.{ ReadableStore, JMapStore } -import com.twitter.storehaus.algebra.MergeableStore import com.twitter.summingbird._ -import com.twitter.summingbird.online._ +import com.twitter.summingbird.batch.Batcher import com.twitter.summingbird.online.option._ -import com.twitter.summingbird.storm.option._ -import com.twitter.summingbird.batch.{ BatchID, Batcher } import com.twitter.summingbird.storm.spout.TraversableSpout -import com.twitter.tormenta.spout.Spout -import com.twitter.util.Future -import java.util.{ Collections, HashMap, Map => JMap, UUID } -import java.util.concurrent.atomic.AtomicInteger -import org.scalatest.WordSpec import org.scalacheck._ -import org.scalacheck.Prop._ -import org.scalacheck.Properties +import org.scalatest.WordSpec import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ -import scala.collection.mutable.{ - ArrayBuffer, - HashMap => MutableHashMap, - Map => MutableMap, - SynchronizedBuffer, - SynchronizedMap -} +import scala.collection.mutable.{HashMap => MutableHashMap, Map => MutableMap} + /** * Tests for Summingbird's Storm planner. */ class TopologyTests extends WordSpec { - import MapAlgebra.sparseEquiv // This is dangerous, obviously. The Storm platform graphs tested // here use the UnitBatcher, so the actual time extraction isn't @@ -140,6 +124,23 @@ class TopologyTests extends WordSpec { assert(TDistMap(1).get_common.get_parallelism_hint == 50) } + "With same setting on multiple names we use the one for the node" in { + val fmNodeName = "flatMapper" + val smNodeName = "summer" + val p = Storm.source(TraversableSpout(sample[List[Int]])) + .flatMap(testFn).name(fmNodeName) + .sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName) + + val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)), + smNodeName -> Options().set(SummerParallelism(20))) + val storm = Storm.local(opts) + val stormTopo = storm.plan(p).topology + val bolts = stormTopo.get_bolts + + // Tail should use parallelism specified for the summer node + assert(bolts("Tail").get_common.get_parallelism_hint == 20) + } + "If the closes doesnt contain the option we keep going" in { val nodeName = "super dooper node" val otherNodeName = "super dooper node" From 86c24c38dc85adab2a8786bbf38befce7c6dc212 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Wed, 29 Jun 2016 12:48:22 -0700 Subject: [PATCH 2/7] Add back sparseEquiv import. --- .../scala/com/twitter/summingbird/storm/TopologyTests.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index ed436709a..92ffcc492 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -16,9 +16,10 @@ package com.twitter.summingbird.storm -import java.util.{Map => JMap} +import java.util.{ Map => JMap } import backtype.storm.generated.StormTopology +import com.twitter.algebird.MapAlgebra import com.twitter.summingbird._ import com.twitter.summingbird.batch.Batcher import com.twitter.summingbird.online.option._ @@ -27,13 +28,14 @@ import org.scalacheck._ import org.scalatest.WordSpec import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap => MutableHashMap, Map => MutableMap} +import scala.collection.mutable.{ HashMap => MutableHashMap, Map => MutableMap } /** * Tests for Summingbird's Storm planner. */ class TopologyTests extends WordSpec { + import MapAlgebra.sparseEquiv // This is dangerous, obviously. The Storm platform graphs tested // here use the UnitBatcher, so the actual time extraction isn't From 54ef23b87504bcf02b3b91a9cf085cfc3affb9c4 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Thu, 30 Jun 2016 14:14:54 -0700 Subject: [PATCH 3/7] Try to create scalding platform tests for the same issue. --- .../scalding/NamedOptionsSpec.scala | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala new file mode 100644 index 000000000..6559adaaa --- /dev/null +++ b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala @@ -0,0 +1,77 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package com.twitter.summingbird.scalding + +import com.twitter.algebird.{ MapAlgebra, Monoid, Group, Interval, Last } +import com.twitter.algebird.monad._ +import com.twitter.summingbird.{ Producer, TimeExtractor, TestGraphs } +import com.twitter.summingbird.batch._ +import com.twitter.summingbird.batch.state.HDFSState +import com.twitter.summingbird.option.JobId +import com.twitter.summingbird.SummingbirdRuntimeStats + +import java.util.TimeZone +import java.io.File + +import com.twitter.scalding.{ Source => ScaldingSource, Test => TestMode, _ } +import com.twitter.scalding.typed.TypedSink + +import org.scalacheck._ +import org.scalacheck.Prop._ +import org.scalacheck.Properties + +import org.apache.hadoop.conf.Configuration + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ ArrayBuffer, Buffer, HashMap => MutableHashMap, Map => MutableMap, SynchronizedBuffer, SynchronizedMap } +import scala.util.{ Try => ScalaTry } + +import cascading.scheme.local.{ TextDelimited => CLTextDelimited } +import cascading.tuple.{ Tuple, Fields, TupleEntry } +import cascading.flow.Flow +import cascading.stats.FlowStats +import cascading.tap.Tap +import cascading.scheme.NullScheme +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.RecordReader +import org.apache.hadoop.mapred.OutputCollector + +import org.scalatest.WordSpec + +/** + * Tests for Summingbird's Scalding planner. + */ + +class NamedOptionsSpec extends WordSpec { + import MapAlgebra.sparseEquiv + + implicit def timeExtractor[T <: (Long, _)] = TestUtil.simpleTimeExtractor[T] + + def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get + + "The ScaldingPlatform" should { + "named option should apply for the correct node even if same option defined for previous node" in { + val batchCoveredInput = TestUtil.pruneToBatchCoveredWithTime(inWithTime1, intr, batcher) + val fnAWithTime = toTime(fnA) + val storeAndService = TestStoreService[Int, Int](storeAndServiceStore) + val summer: Summer[P, K, JoinedU] = batchCoveredInput + .flatMap(fnAWithTime).name("fmNode") + .sumByKey(storeAndService).name("smNode") + } + + } +} From 5ab3053f02ca800d5bb46f4f22458e0089f836ef Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Fri, 1 Jul 2016 21:49:51 -0700 Subject: [PATCH 4/7] Add tests for named options for scalding platform. --- .../scalding/NamedOptionsSpec.scala | 168 +++++++++++++----- 1 file changed, 122 insertions(+), 46 deletions(-) diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala index 6559adaaa..a00de62b2 100644 --- a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala +++ b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala @@ -16,62 +16,138 @@ package com.twitter.summingbird.scalding -import com.twitter.algebird.{ MapAlgebra, Monoid, Group, Interval, Last } -import com.twitter.algebird.monad._ -import com.twitter.summingbird.{ Producer, TimeExtractor, TestGraphs } -import com.twitter.summingbird.batch._ -import com.twitter.summingbird.batch.state.HDFSState -import com.twitter.summingbird.option.JobId -import com.twitter.summingbird.SummingbirdRuntimeStats - -import java.util.TimeZone -import java.io.File - -import com.twitter.scalding.{ Source => ScaldingSource, Test => TestMode, _ } -import com.twitter.scalding.typed.TypedSink - -import org.scalacheck._ -import org.scalacheck.Prop._ -import org.scalacheck.Properties - -import org.apache.hadoop.conf.Configuration - -import scala.collection.JavaConverters._ -import scala.collection.mutable.{ ArrayBuffer, Buffer, HashMap => MutableHashMap, Map => MutableMap, SynchronizedBuffer, SynchronizedMap } -import scala.util.{ Try => ScalaTry } - -import cascading.scheme.local.{ TextDelimited => CLTextDelimited } -import cascading.tuple.{ Tuple, Fields, TupleEntry } -import cascading.flow.Flow -import cascading.stats.FlowStats -import cascading.tap.Tap -import cascading.scheme.NullScheme -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.RecordReader -import org.apache.hadoop.mapred.OutputCollector - +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.property.ConfigDef +import cascading.property.ConfigDef.Setter +import cascading.tuple.Fields +import com.twitter.scalding.{Test => TestMode, _} +import com.twitter.summingbird._ +import com.twitter.summingbird.batch.option.Reducers import org.scalatest.WordSpec /** - * Tests for Summingbird's Scalding planner. + * Tests for application of named options. */ - class NamedOptionsSpec extends WordSpec { - import MapAlgebra.sparseEquiv - implicit def timeExtractor[T <: (Long, _)] = TestUtil.simpleTimeExtractor[T] + private val ReducerKey = "mapred.reduce.tasks" + private val FlatMapNodeName1 = "FM1" + private val FlatMapNodeName2 = "FM2" + private val SummerNodeName1 = "SM1" + private val SummerNodeName2 = "SM2" + + private val IdentitySink = new Sink[Int] { + override def write(incoming: PipeFactory[Int]): PipeFactory[Int] = incoming + } + + implicit def timeExtractor[T <: (Int, _)] = + new TimeExtractor[T] { + override def apply(t: T) = t._1.toLong + } - def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get + def pipeConfig(pipe: Pipe): Map[String, String] = { + val configCollector = new Setter { + var config = Map.empty[String, String] + override def set(key: String, value: String): String = { config += key -> value; "" } + override def get(key: String): String = ??? + override def update(key: String, value: String): String = ??? + } + + def recurse(p: Pipe): Unit = { + val cfg = p.getStepConfigDef + if (!cfg.isEmpty) { + cfg.apply(ConfigDef.Mode.REPLACE, configCollector) + } + p.getPrevious.foreach(recurse(_)) + } + + recurse(pipe) + configCollector.config + } + + def verify[T]( + options: Map[String, Options], + expectedReducers: Int)( + jobGen: (Producer[Scalding, (Int, Int)], scalding.Store[Int, Int]) => TailProducer[Scalding, Any]) = { + + val src = Scalding.sourceFromMappable { dr => IterableSource(List.empty[(Int, Int)]) } + val store = TestStore[Int, Int]("store", TestUtil.simpleBatcher, Map.empty[Int, Int], Long.MaxValue) + val job = jobGen(src, store) + val interval = TestUtil.toTimeInterval(1L, Long.MaxValue) + + val scaldingPlatform = Scalding("named options test", options) + val mode: Mode = TestMode(t => (store.sourceToBuffer).get(t)) + + val flowToPipe = scaldingPlatform + .plan(job) + .apply((interval, mode)) + .right + .get + ._2 + + val fd = new FlowDef + val typedPipe = flowToPipe.apply((fd, mode)) + def tupleSetter[T] = new TupleSetter[T] { + override def apply(arg: T) = { + val tup = cascading.tuple.Tuple.size(1) + tup.set(0, arg) + tup + } + override def arity = 1 + } + val pipe = typedPipe.toPipe(new Fields("0"))(fd, mode, tupleSetter) + println(pipeConfig(pipe)) + val numReducers = pipeConfig(pipe)(ReducerKey).toInt + assert(numReducers === expectedReducers) + } "The ScaldingPlatform" should { - "named option should apply for the correct node even if same option defined for previous node" in { - val batchCoveredInput = TestUtil.pruneToBatchCoveredWithTime(inWithTime1, intr, batcher) - val fnAWithTime = toTime(fnA) - val storeAndService = TestStoreService[Int, Int](storeAndServiceStore) - val summer: Summer[P, K, JoinedU] = batchCoveredInput - .flatMap(fnAWithTime).name("fmNode") - .sumByKey(storeAndService).name("smNode") + "use named option for the correct node even if same option defined for previous node" in { + val fmReducers = 50 + val smReducers = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers)), + SummerNodeName1 -> Options().set(Reducers(smReducers))) + + verify(options, smReducers) { (source, store) => + source + .flatMap(Some(_)).name(FlatMapNodeName1) + .sumByKey(store).name(SummerNodeName1) + } } + "use named option from the closest node when two names defined one after the other" in { + val smReducers1 = 50 + val smReducers2 = 100 + + val options = Map( + SummerNodeName1 -> Options().set(Reducers(smReducers1)), + SummerNodeName2 -> Options().set(Reducers(smReducers2))) + + verify(options, smReducers1) { (source, store) => + source + .flatMap(Some(_)) + .sumByKey(store).name(SummerNodeName1).name(SummerNodeName2) + } + } + + "use named option from the closest upstream node if option not defined on current node" in { + val fmReducers1 = 50 + val fmReducers2 = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)), + FlatMapNodeName1 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers2) { (source, store) => + source + .flatMap(Some(_)).name(FlatMapNodeName1) + .sumByKey(store).name(SummerNodeName1) + .map { case (k, (optV, v)) => k }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } } } From 2cada35f0f01dd019fc61e7ab4642528e3a80bf4 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Fri, 1 Jul 2016 21:53:35 -0700 Subject: [PATCH 5/7] Fix scalding named option test. --- .../com/twitter/summingbird/scalding/NamedOptionsSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala index a00de62b2..834086d96 100644 --- a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala +++ b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala @@ -21,7 +21,7 @@ import cascading.pipe.Pipe import cascading.property.ConfigDef import cascading.property.ConfigDef.Setter import cascading.tuple.Fields -import com.twitter.scalding.{Test => TestMode, _} +import com.twitter.scalding.{ Test => TestMode, _ } import com.twitter.summingbird._ import com.twitter.summingbird.batch.option.Reducers import org.scalatest.WordSpec @@ -139,7 +139,7 @@ class NamedOptionsSpec extends WordSpec { val options = Map( FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)), - FlatMapNodeName1 -> Options().set(Reducers(fmReducers2))) + FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) verify(options, fmReducers2) { (source, store) => source From 7422550898e9bdc77c190db4db690ee46c07a28c Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Sat, 2 Jul 2016 13:06:32 -0700 Subject: [PATCH 6/7] More scalding platform named option tests. --- .../scalding/NamedOptionsSpec.scala | 80 +++++++++++++++++-- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala index 834086d96..e8dada573 100644 --- a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala +++ b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala @@ -21,9 +21,10 @@ import cascading.pipe.Pipe import cascading.property.ConfigDef import cascading.property.ConfigDef.Setter import cascading.tuple.Fields -import com.twitter.scalding.{ Test => TestMode, _ } +import com.twitter.scalding.{Test => TestMode, _} import com.twitter.summingbird._ import com.twitter.summingbird.batch.option.Reducers +import com.twitter.summingbird.option.MonoidIsCommutative import org.scalatest.WordSpec /** @@ -46,10 +47,17 @@ class NamedOptionsSpec extends WordSpec { override def apply(t: T) = t._1.toLong } - def pipeConfig(pipe: Pipe): Map[String, String] = { + def pipeConfig(pipe: Pipe): Map[String, List[String]] = { val configCollector = new Setter { - var config = Map.empty[String, String] - override def set(key: String, value: String): String = { config += key -> value; "" } + var config = Map.empty[String, List[String]] + override def set(key: String, value: String): String = { + if (config.contains(key)) { + config = config.updated(key, value :: config(key)) + } else { + config += key -> List(value) + } + "" + } override def get(key: String): String = ??? override def update(key: String, value: String): String = ??? } @@ -97,13 +105,12 @@ class NamedOptionsSpec extends WordSpec { override def arity = 1 } val pipe = typedPipe.toPipe(new Fields("0"))(fd, mode, tupleSetter) - println(pipeConfig(pipe)) - val numReducers = pipeConfig(pipe)(ReducerKey).toInt + val numReducers = pipeConfig(pipe)(ReducerKey).head.toInt assert(numReducers === expectedReducers) } "The ScaldingPlatform" should { - "use named option for the correct node even if same option defined for previous node" in { + "with same setting on multiple names use the one for the node" in { val fmReducers = 50 val smReducers = 100 @@ -133,7 +140,7 @@ class NamedOptionsSpec extends WordSpec { } } - "use named option from the closest upstream node if option not defined on current node" in { + "use named option from the upstream node if option not defined on current node" in { val fmReducers1 = 50 val fmReducers2 = 100 @@ -149,5 +156,62 @@ class NamedOptionsSpec extends WordSpec { .write(IdentitySink) } } + + "use named option from the upstream node if option not defined on current node, even if upstream node is more than a node apart" in { + val fmReducers1 = 50 + val fmReducers2 = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)), + FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers2) { (source, store) => + source + .flatMap(Some(_)).name(FlatMapNodeName1) + .sumByKey(store).name(SummerNodeName1) + .flatMap { case (k, (optV, v)) => Some(k) } + .flatMap { k => List(k, k) }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } + + "use named option from the closest upstream node if same option defined on two upstream nodes" in { + val fmReducers1 = 50 + val fmReducers2 = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)), + FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers1) { (source, store) => + source + .flatMap(Some(_)) + .sumByKey(store).name(SummerNodeName1) + .flatMap { case (k, (optV, v)) => Some(k) }.name(FlatMapNodeName1) + .flatMap { k => List(k, k) }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } + + "options propagate backwards" in { + val fmReducers2 = 1000 + + /** + * Here FlatMapNodeName1 is closer to the summer node but doesn't have Reducers property + * defined so it is picked from FlatMapNodeName2. + */ + val options = Map( + FlatMapNodeName1 -> Options().set(MonoidIsCommutative), + FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers2) { (source, store) => + source + .flatMap(Some(_)) + .sumByKey(store).name(SummerNodeName1) + .flatMap { case (k, (optV, v)) => Some(k) }.name(FlatMapNodeName1) + .flatMap { k => List(k, k) }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } } } From b502b7b0377e39f7bf398ae6be91de803f8a94c3 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Sat, 2 Jul 2016 14:25:30 -0700 Subject: [PATCH 7/7] Fix https://github.com/twitter/summingbird/issues/671 --- .../summingbird/storm/TopologyTests.scala | 35 ++++++++++--------- .../summingbird/storm/StormPlatform.scala | 2 +- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index 92ffcc492..9d3f3ab87 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -126,23 +126,6 @@ class TopologyTests extends WordSpec { assert(TDistMap(1).get_common.get_parallelism_hint == 50) } - "With same setting on multiple names we use the one for the node" in { - val fmNodeName = "flatMapper" - val smNodeName = "summer" - val p = Storm.source(TraversableSpout(sample[List[Int]])) - .flatMap(testFn).name(fmNodeName) - .sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName) - - val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)), - smNodeName -> Options().set(SummerParallelism(20))) - val storm = Storm.local(opts) - val stormTopo = storm.plan(p).topology - val bolts = stormTopo.get_bolts - - // Tail should use parallelism specified for the summer node - assert(bolts("Tail").get_common.get_parallelism_hint == 20) - } - "If the closes doesnt contain the option we keep going" in { val nodeName = "super dooper node" val otherNodeName = "super dooper node" @@ -199,4 +182,22 @@ class TopologyTests extends WordSpec { assert(TDistMap(0).get_common.get_parallelism_hint == 5) } + + "With same setting on multiple names we use the one for the node" in { + val fmNodeName = "flatMapper" + val smNodeName = "summer" + val p = Storm.source(TraversableSpout(sample[List[Int]])) + .flatMap(testFn).name(fmNodeName) + .sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName) + + val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)), + smNodeName -> Options().set(SummerParallelism(20))) + val storm = Storm.local(opts) + val stormTopo = storm.plan(p).topology + val bolts = stormTopo.get_bolts + + // Tail should use parallelism specified for the summer node + assert(bolts("Tail").get_common.get_parallelism_hint == 20) + } + } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index dedf2ef06..637dd59d9 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -124,7 +124,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird private type Prod[T] = Producer[Storm, T] private[storm] def get[T <: AnyRef: ClassTag](dag: Dag[Storm], node: StormNode): Option[(String, T)] = { - val producer = node.members.last + val producer = node.members.head Options.getFirst[T](options, dag.producerToPriorityNames(producer)) }