diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala index 91e6f2f2b..68ba1f010 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala @@ -26,12 +26,12 @@ object InjectionLaws extends Properties("InjectionTests") { implicit def ts: Arbitrary[Timestamp] = Arbitrary(Arbitrary.arbitrary[Long].map(Timestamp(_))) - property("Single injection works") = forAll { in: String => - val inj = new SingleItemInjection[String] + property("Item injection works") = forAll { in: String => + val inj = EdgeTypeInjections.forItem[String] inj.invert(inj(in)).get == in } property("KV injection works") = forAll { in: (String, List[Int]) => - val inj = new KeyValueInjection[String, List[Int]] + val inj = EdgeTypeInjections.forKeyValue[String, List[Int]] inj.invert(inj(in)).get == in } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index 38b6d04f4..ad7fc3f09 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -17,7 +17,6 @@ limitations under the License. package com.twitter.summingbird.storm import scala.util.{ Failure, Success } -import com.twitter.bijection.Injection import java.util.{ Map => JMap } import com.twitter.summingbird.storm.option.{ AckOnEntry, AnchorTuples, MaxExecutePerSecond } import com.twitter.summingbird.online.executor.OperationContainer @@ -27,28 +26,39 @@ import com.twitter.summingbird.{ Group, JobCounters, Name, SummingbirdRuntimeSta import com.twitter.summingbird.online.Externalizer import chain.Chain import scala.collection.JavaConverters._ -import java.util.{ List => JList } import org.apache.storm.task.{ OutputCollector, TopologyContext } -import org.apache.storm.topology.{ IRichBolt, OutputFieldsDeclarer } -import org.apache.storm.tuple.{ Fields, Tuple, TupleImpl } +import org.apache.storm.topology.{ BoltDeclarer, IRichBolt, OutputFieldsDeclarer } +import org.apache.storm.tuple.{ Tuple, TupleImpl } import org.slf4j.{ Logger, LoggerFactory } /** - * - * @author Oscar Boykin - * @author Ian O Connell - * @author Sam Ritchie - * @author Ashu Singhal - */ + * This class is used as an implementation for Storm's `Bolt`s. + * + * @param jobID is an id for current topology, used for metrics. + * @param metrics represents metrics we want to collect on this node. + * @param anchorTuples should be equal to true if you want to utilize Storm's anchoring of tuples, + * false otherwise. + * @param hasDependants does this node have any downstream nodes? + * @param ackOnEntry ack tuples in the beginning of processing. + * @param maxExecutePerSec limits number of executes per second, will block processing thread after. + * Used for rate limiting. + * @param inputEdgeTypes is a map from name of downstream node to `Edge` from it. + * @param outputEdgeType is an edge from this node. To be precise there are number of output edges, + * but we expect them to have same format and we don't use their grouping + * information here, therefore it's ok to have only one instance of `Edge` here. + * @param executor is `OperationContainer` which represents operation for this `Bolt`, + * for example it can be summing or flat mapping. + * @tparam I is a type of input tuples for this `Bolt`s executor. + * @tparam O is a type of output tuples for this `Bolt`s executor. + */ case class BaseBolt[I, O](jobID: JobId, metrics: () => TraversableOnce[StormMetric[_]], anchorTuples: AnchorTuples, hasDependants: Boolean, - outputFields: Fields, ackOnEntry: AckOnEntry, maxExecutePerSec: MaxExecutePerSecond, - decoder: Injection[I, JList[AnyRef]], - encoder: Injection[O, JList[AnyRef]], + inputEdgeTypes: Map[String, EdgeType[I]], + outputEdgeType: EdgeType[O], executor: OperationContainer[I, O, InputState[Tuple]]) extends IRichBolt { @transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass) @@ -130,7 +140,10 @@ case class BaseBolt[I, O](jobID: JobId, * System ticks come with a fixed stream id */ val curResults = if (!tuple.getSourceStreamId.equals("__tick")) { - val tsIn = decoder.invert(tuple.getValues).get // Failing to decode here is an ERROR + val tsIn = inputEdgeTypes.get(tuple.getSourceComponent) match { + case Some(inputEdgeType) => inputEdgeType.injection.invert(tuple.getValues).get + case None => throw new Exception("Unrecognized source component: " + tuple.getSourceComponent) + } // Don't hold on to the input values clearValues(tuple) if (earlyAck) { collector.ack(tuple) } @@ -155,12 +168,12 @@ case class BaseBolt[I, O](jobID: JobId, if (anchorTuples.anchor) { val states = inputs.iterator.map(_.state).toList.asJava results.foreach { result => - collector.emit(states, encoder(result)) + collector.emit(states, outputEdgeType.injection(result)) emitCount += 1 } } else { // don't anchor results.foreach { result => - collector.emit(encoder(result)) + collector.emit(outputEdgeType.injection(result)) emitCount += 1 } } @@ -183,7 +196,7 @@ case class BaseBolt[I, O](jobID: JobId, } override def declareOutputFields(declarer: OutputFieldsDeclarer) { - if (hasDependants) { declarer.declare(outputFields) } + if (hasDependants) { declarer.declare(outputEdgeType.fields) } } override val getComponentConfiguration = null @@ -192,6 +205,15 @@ case class BaseBolt[I, O](jobID: JobId, executor.cleanup } + /** + * Apply groupings defined by input edges of this `Bolt` to Storm's topology. + */ + def applyGroupings(declarer: BoltDeclarer): Unit = { + inputEdgeTypes.foreach { case (parentName, inputEdgeType) => + inputEdgeType.grouping.apply(declarer, parentName) + } + } + /** * This is clearly not safe, but done to deal with GC issues since * storm keeps references to values diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala index abffc9b73..f0d17d94c 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala @@ -24,14 +24,6 @@ import com.twitter.summingbird.online.OnlineDefaultConstants * Here we can override ones from online, or add more that are more Storm specific */ object Constants extends OnlineDefaultConstants { - val AGG_KEY = "aggKey" - val AGG_VALUE = "aggValue" - val AGG_BATCH = "aggBatchID" - val RETURN_INFO = "return-info" - - val VALUE_FIELD = "value" - val GROUP_BY_SUM = "groupBySum" - val DEFAULT_SPOUT_STORM_METRICS = SpoutStormMetrics(None) val DEFAULT_FM_STORM_METRICS = FlatMapStormMetrics(None) val DEFAULT_SUMMER_STORM_METRICS = SummerStormMetrics(None) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/EdgeGrouping.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/EdgeGrouping.scala new file mode 100644 index 000000000..a7127db39 --- /dev/null +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/EdgeGrouping.scala @@ -0,0 +1,29 @@ +package com.twitter.summingbird.storm + +import org.apache.storm.topology.BoltDeclarer +import org.apache.storm.tuple.{ Fields => StormFields } + +/** + * This trait is used to represent different grouping strategies in `Storm`. + */ +sealed trait EdgeGrouping { + /** + * How to apply this `EdgeGrouping` to edge between `parentName` node and bolt declared by `declarer`. + */ + def apply(declarer: BoltDeclarer, parentName: String): Unit +} + +object EdgeGrouping { + case object Shuffle extends EdgeGrouping { + override def apply(declarer: BoltDeclarer, parentName: String): Unit = + declarer.shuffleGrouping(parentName) + } + case object LocalOrShuffle extends EdgeGrouping { + override def apply(declarer: BoltDeclarer, parentName: String): Unit = + declarer.localOrShuffleGrouping(parentName) + } + case class Fields(fields: StormFields) extends EdgeGrouping { + override def apply(declarer: BoltDeclarer, parentName: String): Unit = + declarer.fieldsGrouping(parentName, fields) + } +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/EdgeType.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/EdgeType.scala new file mode 100644 index 000000000..ff577b476 --- /dev/null +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/EdgeType.scala @@ -0,0 +1,86 @@ +package com.twitter.summingbird.storm + +import com.twitter.bijection.{ Injection, Inversion } +import org.apache.storm.tuple.Fields +import java.util.{ ArrayList => JAList, List => JList } + +import com.twitter.summingbird.online.executor.KeyValueShards + +import scala.collection.{ Map => CMap } +import scala.util.Try + +/** + * This trait represents an edge in Storm topology DAG between two nodes. + * @tparam T represents type of tuples sent over this `Edge`. + */ +sealed trait EdgeType[T] { + /** + * Storm's `Fields` are going to be sent over this `Edge`. + */ + val fields: Fields + + /** + * Injection from values (which are going to be sent) to Storm's values. + * Each element in returned array corresponds to element in `Fields` at the same index. + */ + val injection: Injection[T, JList[AnyRef]] + + /** + * Grouping for this `Edge`. + */ + val grouping: EdgeGrouping +} + +object EdgeType { + /** + * Simplest possible type of `Edge`, without any assumptions about content inside. + */ + case class Item[T] private[storm] (edgeGrouping: EdgeGrouping) extends EdgeType[T] { + override val fields: Fields = new Fields("value") + override val injection: Injection[T, JList[AnyRef]] = EdgeTypeInjections.forItem + override val grouping: EdgeGrouping = edgeGrouping + } + + /** + * This `Edge` type used for aggregated key value pairs emitted by partial aggregation. + * @param shards is a number which was used for partial aggregation. + */ + case class AggregatedKeyValues[K, V](shards: KeyValueShards) extends EdgeType[(Int, CMap[K, V])] { + override val fields: Fields = new Fields("aggKey", "aggValue") + override val injection: Injection[(Int, CMap[K, V]), JList[AnyRef]] = EdgeTypeInjections.forKeyValue + override val grouping: EdgeGrouping = EdgeGrouping.Fields(new Fields("aggKey")) + } + + def itemWithShuffleGrouping[T]: Item[T] = Item[T](EdgeGrouping.Shuffle) + def itemWithLocalOrShuffleGrouping[T]: Item[T] = Item[T](EdgeGrouping.LocalOrShuffle) +} + +private object EdgeTypeInjections { + def forItem[T]: Injection[T, JList[AnyRef]] = new Injection[T, JList[AnyRef]] { + override def apply(tuple: T): JAList[AnyRef] = { + val list = new JAList[AnyRef](1) + list.add(tuple.asInstanceOf[AnyRef]) + list + } + + override def invert(valueIn: JList[AnyRef]): Try[T] = Inversion.attempt(valueIn) { input => + input.get(0).asInstanceOf[T] + } + } + + def forKeyValue[K, V]: Injection[(K, V), JList[AnyRef]] = new Injection[(K, V), JList[AnyRef]] { + override def apply(tuple: (K, V)): JAList[AnyRef] = { + val (key, value) = tuple + val list = new JAList[AnyRef](2) + list.add(key.asInstanceOf[AnyRef]) + list.add(value.asInstanceOf[AnyRef]) + list + } + + override def invert(valueIn: JList[AnyRef]): Try[(K, V)] = Inversion.attempt(valueIn) { input => + val key = input.get(0).asInstanceOf[K] + val value = input.get(1).asInstanceOf[V] + (key, value) + } + } +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala index fb7991430..f27bfcccb 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala @@ -27,9 +27,7 @@ import com.twitter.summingbird.online.executor import com.twitter.summingbird.online.FlatMapOperation import com.twitter.summingbird.storm.planner._ import org.apache.storm.topology.TopologyBuilder -import org.apache.storm.tuple.Fields import org.slf4j.LoggerFactory -import scala.collection.{ Map => CMap } /** * These are helper functions for building a bolt from a Node[Storm] element. @@ -83,24 +81,21 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm], private val maxEmitPerExecute = getOrElse(DEFAULT_MAX_EMIT_PER_EXECUTE) logger.info(s"[$nodeName] maxEmitPerExecute : ${maxEmitPerExecute.get}") + private val usePreferLocalDependency = getOrElse(DEFAULT_FM_PREFER_LOCAL_DEPENDENCY) + logger.info(s"[$nodeName] usePreferLocalDependency: ${usePreferLocalDependency.get}") + private def getFFMBolt[T, K, V](summer: SummerNode[Storm]) = { - type ExecutorInput = (Timestamp, T) - type ExecutorKey = Int - type InnerValue = (Timestamp, V) - type ExecutorValue = CMap[(K, BatchID), InnerValue] + type Input = (Timestamp, T) + type OutputKey = (K, BatchID) + type OutputValue = (Timestamp, V) val summerProducer = summer.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]] // When emitting tuples between the Final Flat Map and the summer we encode the timestamp in the value // The monoid we use in aggregation is timestamp max. val batcher = summerProducer.store.mergeableBatcher implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup - // Query to get the summer paralellism of the summer down stream of us we are emitting to - // to ensure no edge case between what we might see for its parallelism and what it would see/pass to storm. - val summerParalellism = getOrElse(DEFAULT_SUMMER_PARALLELISM, summer) - val summerBatchMultiplier = getOrElse(DEFAULT_SUMMER_BATCH_MULTIPLIER, summer) - // This option we report its value here, but its not user settable. - val keyValueShards = executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get) + val keyValueShards = storm.getSummerKeyValueShards(stormDag, summer) logger.info(s"[$nodeName] keyValueShards : ${keyValueShards.get}") val operation = foldOperations[T, (K, V)](node.members.reverse) @@ -113,11 +108,10 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm], metrics.metrics, anchorTuples, true, - new Fields(AGG_KEY, AGG_VALUE), ackOnEntry, maxExecutePerSec, - new SingleItemInjection[ExecutorInput], - new KeyValueInjection[ExecutorKey, ExecutorValue], + inputEdgeTypes[Input], + EdgeType.AggregatedKeyValues[OutputKey, OutputValue](keyValueShards), new executor.FinalFlatMap( wrappedOperation, builder, @@ -125,7 +119,7 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm], maxWaitTime, maxEmitPerExecute, keyValueShards - )(implicitly[Semigroup[InnerValue]]) + )(implicitly[Semigroup[OutputValue]]) ) } @@ -141,11 +135,11 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm], metrics.metrics, anchorTuples, stormDag.dependantsOf(node).size > 0, - new Fields(VALUE_FIELD), ackOnEntry, maxExecutePerSec, - new SingleItemInjection[ExecutorInput], - new SingleItemInjection[ExecutorOutput], + inputEdgeTypes[ExecutorInput], + // Output edge's grouping isn't important for now. + EdgeType.itemWithLocalOrShuffleGrouping[ExecutorOutput], new executor.IntermediateFlatMap( wrappedOperation, maxWaiting, @@ -162,4 +156,15 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm], case None => getIntermediateFMBolt[Any, Any].asInstanceOf[BaseBolt[Any, Any]] } } + + private def inputEdgeTypes[Input]: Map[String, EdgeType[Input]] = { + val edge = if (usePreferLocalDependency.get) { + EdgeType.itemWithLocalOrShuffleGrouping[Input] + } else { + EdgeType.itemWithShuffleGrouping[Input] + } + + val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } + dependenciesNames.map((_, edge)).toMap + } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 692c66d49..2b0c57f6f 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -36,9 +36,7 @@ import com.twitter.tormenta.spout.Spout import com.twitter.util.Future import org.apache.storm.generated.StormTopology import org.apache.storm.topology.TopologyBuilder -import org.apache.storm.tuple.Fields import org.slf4j.LoggerFactory -import scala.collection.{ Map => CMap } import scala.reflect.ClassTag /* @@ -134,6 +132,15 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird option } + private[storm] def getSummerKeyValueShards(dag: Dag[Storm], summer: SummerNode[Storm]): executor.KeyValueShards = { + // Query to get the summer paralellism of the summer down stream of us we are emitting to + // to ensure no edge case between what we might see for its parallelism and what it would see/pass to storm. + val summerParalellism = getOrElse(dag, summer, DEFAULT_SUMMER_PARALLELISM) + val summerBatchMultiplier = getOrElse(dag, summer, DEFAULT_SUMMER_BATCH_MULTIPLIER) + + executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get) + } + /** * Set storm to tick our nodes every second to clean up finished futures */ @@ -143,31 +150,23 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird boltConfig } - private def scheduleFlatMapper(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleFlatMapper(jobID: JobId, stormDag: Dag[Storm], node: FlatMapNode[Storm])(implicit topologyBuilder: TopologyBuilder) = { val nodeName = stormDag.getNodeName(node) - val usePreferLocalDependency = getOrElse(stormDag, node, DEFAULT_FM_PREFER_LOCAL_DEPENDENCY) - logger.info(s"[$nodeName] usePreferLocalDependency: ${usePreferLocalDependency.get}") val bolt: BaseBolt[Any, Any] = FlatMapBoltProvider(this, jobID, stormDag, node).apply val parallelism = getOrElse(stormDag, node, DEFAULT_FM_PARALLELISM).parHint val declarer = topologyBuilder.setBolt(nodeName, bolt, parallelism).addConfigurations(tickConfig) - - val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } - if (usePreferLocalDependency.get) { - dependenciesNames.foreach { declarer.localOrShuffleGrouping(_) } - } else { - dependenciesNames.foreach { declarer.shuffleGrouping(_) } - } + bolt.applyGroupings(declarer) } - private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: SourceNode[Storm])(implicit topologyBuilder: TopologyBuilder) = { val nodeName = stormDag.getNodeName(node) val (sourceParalleism, stormSpout) = SpoutProvider(this, stormDag, node, jobID).apply topologyBuilder.setSpout(nodeName, stormSpout, sourceParalleism) } - private def scheduleSummerBolt(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder): Unit = { + private def scheduleSummerBolt(jobID: JobId, stormDag: Dag[Storm], node: SummerNode[Storm])(implicit topologyBuilder: TopologyBuilder): Unit = { val nodeName = stormDag.getNodeName(node) @@ -205,16 +204,21 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val supplier: MergeableStoreFactory[ExecutorKeyType, V] = summer.store + val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } + val inputEdges = dependenciesNames.map(parent => + (parent, EdgeType.AggregatedKeyValues[ExecutorKeyType, ExecutorValueType](getSummerKeyValueShards(stormDag, node))) + ).toMap + BaseBolt( jobID, metrics.metrics, anchorTuples, shouldEmit, - new Fields(VALUE_FIELD), ackOnEntry, maxExecutePerSec, - new KeyValueInjection[Int, CMap[ExecutorKeyType, ExecutorValueType]], - new SingleItemInjection[ExecutorOutputType], + inputEdges, + // Output edge's grouping isn't important for now. + EdgeType.itemWithLocalOrShuffleGrouping[ExecutorOutputType], new executor.Summer( () => new WrappedTSInMergeable(supplier.mergeableStore(semigroup)), flatmapOp, @@ -230,16 +234,14 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val summerUntyped = node.members.collect { case c@Summer(_, _, _) => c }.head val parallelism = getOrElse(stormDag, node, DEFAULT_SUMMER_PARALLELISM).parHint + val bolt = sinkBolt(summerUntyped) val declarer = topologyBuilder.setBolt( nodeName, - sinkBolt(summerUntyped), + bolt, parallelism ).addConfigurations(tickConfig) - val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } - dependenciesNames.foreach { parentName => - declarer.fieldsGrouping(parentName, new Fields(AGG_KEY)) - } + bolt.applyGroupings(declarer) } private def dumpOptions: String = { @@ -310,12 +312,10 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird } } - stormDag.nodes.foreach { node => - node match { - case _: SummerNode[_] => scheduleSummerBolt(jobID, stormDag, node) - case _: FlatMapNode[_] => scheduleFlatMapper(jobID, stormDag, node) - case _: SourceNode[_] => scheduleSpout(jobID, stormDag, node) - } + stormDag.nodes.foreach { + case summerNode: SummerNode[Storm] => scheduleSummerBolt(jobID, stormDag, summerNode) + case flatMapNode: FlatMapNode[Storm] => scheduleFlatMapper(jobID, stormDag, flatMapNode) + case sourceNode: SourceNode[Storm] => scheduleSpout(jobID, stormDag, sourceNode) } PlannedTopology(config, topologyBuilder.createTopology) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/TupleInjections.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/TupleInjections.scala deleted file mode 100644 index 5041228a7..000000000 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/TupleInjections.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright 2013 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.twitter.summingbird.storm - -import com.twitter.bijection.{ Injection, Inversion } -import java.util.{ List => JList, ArrayList => JAList } - -class SingleItemInjection[T] extends Injection[T, JList[AnyRef]] { - - override def apply(t: T) = { - val list = new JAList[AnyRef](1) - list.add(t.asInstanceOf[AnyRef]) - list - } - - override def invert(vin: JList[AnyRef]) = Inversion.attempt(vin) { v => - v.get(0).asInstanceOf[T] - } -} - -class KeyValueInjection[K, V] - extends Injection[(K, V), JList[AnyRef]] { - - override def apply(item: (K, V)) = { - val (key, v) = item - val list = new JAList[AnyRef](2) - list.add(key.asInstanceOf[AnyRef]) - list.add(v.asInstanceOf[AnyRef]) - list - } - - override def invert(vin: JList[AnyRef]) = Inversion.attempt(vin) { v => - val key = v.get(0).asInstanceOf[K] - val value = v.get(1).asInstanceOf[V] - (key, value) - } -} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala index 28a7633fd..79aa665aa 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala @@ -4,16 +4,16 @@ import com.twitter.algebird.Semigroup import com.twitter.algebird.util.summer.Incrementor import com.twitter.summingbird.online.executor.KeyValueShards import com.twitter.summingbird.online.option.{ SummerBuilder, MaxEmitPerExecute } -import com.twitter.summingbird.storm.Constants._ import com.twitter.tormenta.spout.SpoutProxy import com.twitter.summingbird.storm.collector.AggregatorOutputCollector import com.twitter.util.{ Duration, Time } import org.apache.storm.spout.SpoutOutputCollector import org.apache.storm.task.TopologyContext import org.apache.storm.topology.{ IRichSpout, OutputFieldsDeclarer } -import org.apache.storm.tuple.Fields import java.util.{ Map => JMap } +import com.twitter.summingbird.storm.EdgeType + /** * This is a spout used when the spout is being followed by summer. * It uses a AggregatorOutputCollector on open. @@ -26,12 +26,14 @@ class KeyValueSpout[K, V: Semigroup]( flushExecTimeCounter: Incrementor, executeTimeCounter: Incrementor) extends SpoutProxy { - private final val tickFrequency = Duration.fromMilliseconds(1000) + private val tickFrequency = Duration.fromMilliseconds(1000) + private val outputEdgeType = EdgeType.AggregatedKeyValues(summerShards) + private var adapterCollector: AggregatorOutputCollector[K, V] = _ var lastDump = Time.now override def declareOutputFields(declarer: OutputFieldsDeclarer) = { - declarer.declare(new Fields(AGG_KEY, AGG_VALUE)) + declarer.declare(outputEdgeType.fields) } /**