Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Refactor Storm platform to introduce Edges #728

Merged
merged 7 commits into from
Jun 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ object InjectionLaws extends Properties("InjectionTests") {
implicit def ts: Arbitrary[Timestamp] =
Arbitrary(Arbitrary.arbitrary[Long].map(Timestamp(_)))

property("Single injection works") = forAll { in: String =>
val inj = new SingleItemInjection[String]
property("Item injection works") = forAll { in: String =>
val inj = EdgeTypeInjections.forItem[String]
inj.invert(inj(in)).get == in
}
property("KV injection works") = forAll { in: (String, List[Int]) =>
val inj = new KeyValueInjection[String, List[Int]]
val inj = EdgeTypeInjections.forKeyValue[String, List[Int]]
inj.invert(inj(in)).get == in
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package com.twitter.summingbird.storm

import scala.util.{ Failure, Success }
import com.twitter.bijection.Injection
import java.util.{ Map => JMap }
import com.twitter.summingbird.storm.option.{ AckOnEntry, AnchorTuples, MaxExecutePerSecond }
import com.twitter.summingbird.online.executor.OperationContainer
Expand All @@ -27,28 +26,39 @@ import com.twitter.summingbird.{ Group, JobCounters, Name, SummingbirdRuntimeSta
import com.twitter.summingbird.online.Externalizer
import chain.Chain
import scala.collection.JavaConverters._
import java.util.{ List => JList }
import org.apache.storm.task.{ OutputCollector, TopologyContext }
import org.apache.storm.topology.{ IRichBolt, OutputFieldsDeclarer }
import org.apache.storm.tuple.{ Fields, Tuple, TupleImpl }
import org.apache.storm.topology.{ BoltDeclarer, IRichBolt, OutputFieldsDeclarer }
import org.apache.storm.tuple.{ Tuple, TupleImpl }
import org.slf4j.{ Logger, LoggerFactory }

/**
*
* @author Oscar Boykin
* @author Ian O Connell
* @author Sam Ritchie
* @author Ashu Singhal
*/
* This class is used as an implementation for Storm's `Bolt`s.
*
* @param jobID is an id for current topology, used for metrics.
* @param metrics represents metrics we want to collect on this node.
* @param anchorTuples should be equal to true if you want to utilize Storm's anchoring of tuples,
* false otherwise.
* @param hasDependants does this node have any downstream nodes?
* @param ackOnEntry ack tuples in the beginning of processing.
* @param maxExecutePerSec limits number of executes per second, will block processing thread after.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add: "Used for rate limiting."

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* Used for rate limiting.
* @param inputEdgeTypes is a map from name of downstream node to `Edge` from it.
* @param outputEdgeType is an edge from this node. To be precise there are number of output edges,
* but we expect them to have same format and we don't use their grouping
* information here, therefore it's ok to have only one instance of `Edge` here.
* @param executor is `OperationContainer` which represents operation for this `Bolt`,
* for example it can be summing or flat mapping.
* @tparam I is a type of input tuples for this `Bolt`s executor.
* @tparam O is a type of output tuples for this `Bolt`s executor.
*/
case class BaseBolt[I, O](jobID: JobId,
metrics: () => TraversableOnce[StormMetric[_]],
anchorTuples: AnchorTuples,
hasDependants: Boolean,
outputFields: Fields,
ackOnEntry: AckOnEntry,
maxExecutePerSec: MaxExecutePerSecond,
decoder: Injection[I, JList[AnyRef]],
encoder: Injection[O, JList[AnyRef]],
inputEdgeTypes: Map[String, EdgeType[I]],
outputEdgeType: EdgeType[O],
executor: OperationContainer[I, O, InputState[Tuple]]) extends IRichBolt {

@transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -130,7 +140,10 @@ case class BaseBolt[I, O](jobID: JobId,
* System ticks come with a fixed stream id
*/
val curResults = if (!tuple.getSourceStreamId.equals("__tick")) {
val tsIn = decoder.invert(tuple.getValues).get // Failing to decode here is an ERROR
val tsIn = inputEdgeTypes.get(tuple.getSourceComponent) match {
case Some(inputEdgeType) => inputEdgeType.injection.invert(tuple.getValues).get
case None => throw new Exception("Unrecognized source component: " + tuple.getSourceComponent)
}
// Don't hold on to the input values
clearValues(tuple)
if (earlyAck) { collector.ack(tuple) }
Expand All @@ -155,12 +168,12 @@ case class BaseBolt[I, O](jobID: JobId,
if (anchorTuples.anchor) {
val states = inputs.iterator.map(_.state).toList.asJava
results.foreach { result =>
collector.emit(states, encoder(result))
collector.emit(states, outputEdgeType.injection(result))
emitCount += 1
}
} else { // don't anchor
results.foreach { result =>
collector.emit(encoder(result))
collector.emit(outputEdgeType.injection(result))
emitCount += 1
}
}
Expand All @@ -183,7 +196,7 @@ case class BaseBolt[I, O](jobID: JobId,
}

override def declareOutputFields(declarer: OutputFieldsDeclarer) {
if (hasDependants) { declarer.declare(outputFields) }
if (hasDependants) { declarer.declare(outputEdgeType.fields) }
}

override val getComponentConfiguration = null
Expand All @@ -192,6 +205,15 @@ case class BaseBolt[I, O](jobID: JobId,
executor.cleanup
}

/**
* Apply groupings defined by input edges of this `Bolt` to Storm's topology.
*/
def applyGroupings(declarer: BoltDeclarer): Unit = {
inputEdgeTypes.foreach { case (parentName, inputEdgeType) =>
inputEdgeType.grouping.apply(declarer, parentName)
}
}

/**
* This is clearly not safe, but done to deal with GC issues since
* storm keeps references to values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ import com.twitter.summingbird.online.OnlineDefaultConstants
* Here we can override ones from online, or add more that are more Storm specific
*/
object Constants extends OnlineDefaultConstants {
val AGG_KEY = "aggKey"
val AGG_VALUE = "aggValue"
val AGG_BATCH = "aggBatchID"
val RETURN_INFO = "return-info"

val VALUE_FIELD = "value"
val GROUP_BY_SUM = "groupBySum"

val DEFAULT_SPOUT_STORM_METRICS = SpoutStormMetrics(None)
val DEFAULT_FM_STORM_METRICS = FlatMapStormMetrics(None)
val DEFAULT_SUMMER_STORM_METRICS = SummerStormMetrics(None)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.twitter.summingbird.storm

import org.apache.storm.topology.BoltDeclarer
import org.apache.storm.tuple.{ Fields => StormFields }

/**
* This trait is used to represent different grouping strategies in `Storm`.
*/
sealed trait EdgeGrouping {
/**
* How to apply this `EdgeGrouping` to edge between `parentName` node and bolt declared by `declarer`.
*/
def apply(declarer: BoltDeclarer, parentName: String): Unit
}

object EdgeGrouping {
case object Shuffle extends EdgeGrouping {
override def apply(declarer: BoltDeclarer, parentName: String): Unit =
declarer.shuffleGrouping(parentName)
}
case object LocalOrShuffle extends EdgeGrouping {
override def apply(declarer: BoltDeclarer, parentName: String): Unit =
declarer.localOrShuffleGrouping(parentName)
}
case class Fields(fields: StormFields) extends EdgeGrouping {
override def apply(declarer: BoltDeclarer, parentName: String): Unit =
declarer.fieldsGrouping(parentName, fields)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.twitter.summingbird.storm

import com.twitter.bijection.{ Injection, Inversion }
import org.apache.storm.tuple.Fields
import java.util.{ ArrayList => JAList, List => JList }

import com.twitter.summingbird.online.executor.KeyValueShards

import scala.collection.{ Map => CMap }
import scala.util.Try

/**
* This trait represents an edge in Storm topology DAG between two nodes.
* @tparam T represents type of tuples sent over this `Edge`.
*/
sealed trait EdgeType[T] {
/**
* Storm's `Fields` are going to be sent over this `Edge`.
*/
val fields: Fields

/**
* Injection from values (which are going to be sent) to Storm's values.
* Each element in returned array corresponds to element in `Fields` at the same index.
*/
val injection: Injection[T, JList[AnyRef]]

/**
* Grouping for this `Edge`.
*/
val grouping: EdgeGrouping
}

object EdgeType {
/**
* Simplest possible type of `Edge`, without any assumptions about content inside.
*/
case class Item[T] private[storm] (edgeGrouping: EdgeGrouping) extends EdgeType[T] {
override val fields: Fields = new Fields("value")
override val injection: Injection[T, JList[AnyRef]] = EdgeTypeInjections.forItem
override val grouping: EdgeGrouping = edgeGrouping
}

/**
* This `Edge` type used for aggregated key value pairs emitted by partial aggregation.
* @param shards is a number which was used for partial aggregation.
*/
case class AggregatedKeyValues[K, V](shards: KeyValueShards) extends EdgeType[(Int, CMap[K, V])] {
override val fields: Fields = new Fields("aggKey", "aggValue")
override val injection: Injection[(Int, CMap[K, V]), JList[AnyRef]] = EdgeTypeInjections.forKeyValue
override val grouping: EdgeGrouping = EdgeGrouping.Fields(new Fields("aggKey"))
}

def itemWithShuffleGrouping[T]: Item[T] = Item[T](EdgeGrouping.Shuffle)
def itemWithLocalOrShuffleGrouping[T]: Item[T] = Item[T](EdgeGrouping.LocalOrShuffle)
}

private object EdgeTypeInjections {
def forItem[T]: Injection[T, JList[AnyRef]] = new Injection[T, JList[AnyRef]] {
override def apply(tuple: T): JAList[AnyRef] = {
val list = new JAList[AnyRef](1)
list.add(tuple.asInstanceOf[AnyRef])
list
}

override def invert(valueIn: JList[AnyRef]): Try[T] = Inversion.attempt(valueIn) { input =>
input.get(0).asInstanceOf[T]
}
}

def forKeyValue[K, V]: Injection[(K, V), JList[AnyRef]] = new Injection[(K, V), JList[AnyRef]] {
override def apply(tuple: (K, V)): JAList[AnyRef] = {
val (key, value) = tuple
val list = new JAList[AnyRef](2)
list.add(key.asInstanceOf[AnyRef])
list.add(value.asInstanceOf[AnyRef])
list
}

override def invert(valueIn: JList[AnyRef]): Try[(K, V)] = Inversion.attempt(valueIn) { input =>
val key = input.get(0).asInstanceOf[K]
val value = input.get(1).asInstanceOf[V]
(key, value)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import com.twitter.summingbird.online.executor
import com.twitter.summingbird.online.FlatMapOperation
import com.twitter.summingbird.storm.planner._
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields
import org.slf4j.LoggerFactory
import scala.collection.{ Map => CMap }

/**
* These are helper functions for building a bolt from a Node[Storm] element.
Expand Down Expand Up @@ -83,24 +81,21 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
private val maxEmitPerExecute = getOrElse(DEFAULT_MAX_EMIT_PER_EXECUTE)
logger.info(s"[$nodeName] maxEmitPerExecute : ${maxEmitPerExecute.get}")

private val usePreferLocalDependency = getOrElse(DEFAULT_FM_PREFER_LOCAL_DEPENDENCY)
logger.info(s"[$nodeName] usePreferLocalDependency: ${usePreferLocalDependency.get}")

private def getFFMBolt[T, K, V](summer: SummerNode[Storm]) = {
type ExecutorInput = (Timestamp, T)
type ExecutorKey = Int
type InnerValue = (Timestamp, V)
type ExecutorValue = CMap[(K, BatchID), InnerValue]
type Input = (Timestamp, T)
type OutputKey = (K, BatchID)
type OutputValue = (Timestamp, V)
val summerProducer = summer.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]]
// When emitting tuples between the Final Flat Map and the summer we encode the timestamp in the value
// The monoid we use in aggregation is timestamp max.
val batcher = summerProducer.store.mergeableBatcher
implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup

// Query to get the summer paralellism of the summer down stream of us we are emitting to
// to ensure no edge case between what we might see for its parallelism and what it would see/pass to storm.
val summerParalellism = getOrElse(DEFAULT_SUMMER_PARALLELISM, summer)
val summerBatchMultiplier = getOrElse(DEFAULT_SUMMER_BATCH_MULTIPLIER, summer)

// This option we report its value here, but its not user settable.
val keyValueShards = executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get)
val keyValueShards = storm.getSummerKeyValueShards(stormDag, summer)
logger.info(s"[$nodeName] keyValueShards : ${keyValueShards.get}")

val operation = foldOperations[T, (K, V)](node.members.reverse)
Expand All @@ -113,19 +108,18 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
metrics.metrics,
anchorTuples,
true,
new Fields(AGG_KEY, AGG_VALUE),
ackOnEntry,
maxExecutePerSec,
new SingleItemInjection[ExecutorInput],
new KeyValueInjection[ExecutorKey, ExecutorValue],
inputEdgeTypes[Input],
EdgeType.AggregatedKeyValues[OutputKey, OutputValue](keyValueShards),
new executor.FinalFlatMap(
wrappedOperation,
builder,
maxWaiting,
maxWaitTime,
maxEmitPerExecute,
keyValueShards
)(implicitly[Semigroup[InnerValue]])
)(implicitly[Semigroup[OutputValue]])
)
}

Expand All @@ -141,11 +135,11 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
metrics.metrics,
anchorTuples,
stormDag.dependantsOf(node).size > 0,
new Fields(VALUE_FIELD),
ackOnEntry,
maxExecutePerSec,
new SingleItemInjection[ExecutorInput],
new SingleItemInjection[ExecutorOutput],
inputEdgeTypes[ExecutorInput],
// Output edge's grouping isn't important for now.
EdgeType.itemWithLocalOrShuffleGrouping[ExecutorOutput],
new executor.IntermediateFlatMap(
wrappedOperation,
maxWaiting,
Expand All @@ -162,4 +156,15 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
case None => getIntermediateFMBolt[Any, Any].asInstanceOf[BaseBolt[Any, Any]]
}
}

private def inputEdgeTypes[Input]: Map[String, EdgeType[Input]] = {
val edge = if (usePreferLocalDependency.get) {
EdgeType.itemWithLocalOrShuffleGrouping[Input]
} else {
EdgeType.itemWithShuffleGrouping[Input]
}

val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) }
dependenciesNames.map((_, edge)).toMap
}
}
Loading