Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Test to illustrate https://github.com/twitter/summingbird/issues/671 #672

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,24 @@

package com.twitter.summingbird.storm

import java.util.{Map => JMap}

import backtype.storm.generated.StormTopology
import com.twitter.algebird.{ MapAlgebra, Semigroup }
import com.twitter.storehaus.{ ReadableStore, JMapStore }
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird._
import com.twitter.summingbird.online._
import com.twitter.summingbird.batch.Batcher
import com.twitter.summingbird.online.option._
import com.twitter.summingbird.storm.option._
import com.twitter.summingbird.batch.{ BatchID, Batcher }
import com.twitter.summingbird.storm.spout.TraversableSpout
import com.twitter.tormenta.spout.Spout
import com.twitter.util.Future
import java.util.{ Collections, HashMap, Map => JMap, UUID }
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.WordSpec
import org.scalacheck._
import org.scalacheck.Prop._
import org.scalacheck.Properties
import org.scalatest.WordSpec

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.{
ArrayBuffer,
HashMap => MutableHashMap,
Map => MutableMap,
SynchronizedBuffer,
SynchronizedMap
}
import scala.collection.mutable.{HashMap => MutableHashMap, Map => MutableMap}

/**
* Tests for Summingbird's Storm planner.
*/

class TopologyTests extends WordSpec {
import MapAlgebra.sparseEquiv

// This is dangerous, obviously. The Storm platform graphs tested
// here use the UnitBatcher, so the actual time extraction isn't
Expand Down Expand Up @@ -140,6 +124,23 @@ class TopologyTests extends WordSpec {
assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}

"With same setting on multiple names we use the one for the node" in {
val fmNodeName = "flatMapper"
val smNodeName = "summer"
val p = Storm.source(TraversableSpout(sample[List[Int]]))
.flatMap(testFn).name(fmNodeName)
.sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName)

val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)),
smNodeName -> Options().set(SummerParallelism(20)))
val storm = Storm.local(opts)
val stormTopo = storm.plan(p).topology
val bolts = stormTopo.get_bolts

// Tail should use parallelism specified for the summer node
assert(bolts("Tail").get_common.get_parallelism_hint == 20)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's assert both settings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean:
assert(bolts("Tail").get_common.get_parallelism_hint != 10)
assert(bolts("Tail").get_common.get_parallelism_hint == 20)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to write the unit test for scalding platform. I'm trying to use the Reducers option for this. For the online platform tests we could inspect the created topology to check the number of summer nodes created, how do I do the corresponding thing for the scalding platform. i.e. how do I inspect the PipeFactory created after planning to check how many reducers it's going to use for a node? I'm new to the scalding platform of summingbird, so may be missing obvious things, will appreciate any help. Thanks

}

"If the closes doesnt contain the option we keep going" in {
val nodeName = "super dooper node"
val otherNodeName = "super dooper node"
Expand Down