This repository has been archived by the owner on Jan 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 267
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
11adbcb
commit 4a602fd
Showing
1 changed file
with
23 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,40 +16,24 @@ | |
|
||
package com.twitter.summingbird.storm | ||
|
||
import java.util.{Map => JMap} | ||
|
||
import backtype.storm.generated.StormTopology | ||
import com.twitter.algebird.{ MapAlgebra, Semigroup } | ||
import com.twitter.storehaus.{ ReadableStore, JMapStore } | ||
import com.twitter.storehaus.algebra.MergeableStore | ||
import com.twitter.summingbird._ | ||
import com.twitter.summingbird.online._ | ||
import com.twitter.summingbird.batch.Batcher | ||
import com.twitter.summingbird.online.option._ | ||
import com.twitter.summingbird.storm.option._ | ||
import com.twitter.summingbird.batch.{ BatchID, Batcher } | ||
import com.twitter.summingbird.storm.spout.TraversableSpout | ||
import com.twitter.tormenta.spout.Spout | ||
import com.twitter.util.Future | ||
import java.util.{ Collections, HashMap, Map => JMap, UUID } | ||
import java.util.concurrent.atomic.AtomicInteger | ||
import org.scalatest.WordSpec | ||
import org.scalacheck._ | ||
import org.scalacheck.Prop._ | ||
import org.scalacheck.Properties | ||
import org.scalatest.WordSpec | ||
|
||
import scala.collection.JavaConversions._ | ||
import scala.collection.JavaConverters._ | ||
import scala.collection.mutable.{ | ||
ArrayBuffer, | ||
HashMap => MutableHashMap, | ||
Map => MutableMap, | ||
SynchronizedBuffer, | ||
SynchronizedMap | ||
} | ||
import scala.collection.mutable.{HashMap => MutableHashMap, Map => MutableMap} | ||
|
||
/** | ||
* Tests for Summingbird's Storm planner. | ||
*/ | ||
|
||
class TopologyTests extends WordSpec { | ||
import MapAlgebra.sparseEquiv | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
pankajroark
Author
Contributor
|
||
|
||
// This is dangerous, obviously. The Storm platform graphs tested | ||
// here use the UnitBatcher, so the actual time extraction isn't | ||
|
@@ -140,6 +124,23 @@ class TopologyTests extends WordSpec { | |
assert(TDistMap(1).get_common.get_parallelism_hint == 50) | ||
} | ||
|
||
"With same setting on multiple names we use the one for the node" in { | ||
val fmNodeName = "flatMapper" | ||
val smNodeName = "summer" | ||
val p = Storm.source(TraversableSpout(sample[List[Int]])) | ||
.flatMap(testFn).name(fmNodeName) | ||
.sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName) | ||
|
||
val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)), | ||
smNodeName -> Options().set(SummerParallelism(20))) | ||
val storm = Storm.local(opts) | ||
val stormTopo = storm.plan(p).topology | ||
val bolts = stormTopo.get_bolts | ||
|
||
// Tail should use parallelism specified for the summer node | ||
assert(bolts("Tail").get_common.get_parallelism_hint == 20) | ||
} | ||
|
||
"If the closes doesnt contain the option we keep going" in { | ||
val nodeName = "super dooper node" | ||
val otherNodeName = "super dooper node" | ||
|
why is this being removed? i believe it changes how map comparisons occur?