Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Refactor Storm platform to introduce Edges #728

Merged
merged 7 commits into from
Jun 8, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ object InjectionLaws extends Properties("InjectionTests") {
implicit def ts: Arbitrary[Timestamp] =
Arbitrary(Arbitrary.arbitrary[Long].map(Timestamp(_)))

property("Single injection works") = forAll { in: String =>
val inj = new SingleItemInjection[String]
property("Item injection works") = forAll { in: String =>
val inj = EdgeInjections.forItem[String]
inj.invert(inj(in)).get == in
}
property("KV injection works") = forAll { in: (String, List[Int]) =>
val inj = new KeyValueInjection[String, List[Int]]
val inj = EdgeInjections.forKeyValue[String, List[Int]]
inj.invert(inj(in)).get == in
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package com.twitter.summingbird.storm

import scala.util.{ Failure, Success }
import com.twitter.bijection.Injection
import java.util.{ Map => JMap }
import com.twitter.summingbird.storm.option.{ AckOnEntry, AnchorTuples, MaxExecutePerSecond }
import com.twitter.summingbird.online.executor.OperationContainer
Expand All @@ -27,28 +26,38 @@ import com.twitter.summingbird.{ Group, JobCounters, Name, SummingbirdRuntimeSta
import com.twitter.summingbird.online.Externalizer
import chain.Chain
import scala.collection.JavaConverters._
import java.util.{ List => JList }
import org.apache.storm.task.{ OutputCollector, TopologyContext }
import org.apache.storm.topology.{ IRichBolt, OutputFieldsDeclarer }
import org.apache.storm.tuple.{ Fields, Tuple, TupleImpl }
import org.apache.storm.topology.{ BoltDeclarer, IRichBolt, OutputFieldsDeclarer }
import org.apache.storm.tuple.{ Tuple, TupleImpl }
import org.slf4j.{ Logger, LoggerFactory }

/**
*
* @author Oscar Boykin
* @author Ian O Connell
* @author Sam Ritchie
* @author Ashu Singhal
*/
* This class is used as an implementation for Storm's `Bolt`s.
*
* @param jobID is an id for current topology, used for metrics.
* @param metrics represents metrics we want to collect on this node.
* @param anchorTuples should be equal to true if you want to utilize Storm's anchoring of tuples,
* false otherwise.
* @param hasDependants does this node have any downstream nodes?
* @param ackOnEntry ack tuples in the beginning of processing.
* @param maxExecutePerSec limits number of executes per second, will block processing thread after.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add: "Used for rate limiting."

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param inputEdges is a map from name of downstream node to `Edge` from it.
* @param outputEdge is an edge from this node. To be precise there are number of output edges,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document this design somewhere. The package object is a common place for documenting the desing.

Copy link
Collaborator Author

@ttim ttim Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it until subsequent PR, there will be clearer place for this later.

* but we expect them to have same format and we don't use their grouping information here,
* therefore it's ok to have only one instance of `Edge` here.
* @param executor is `OperationContainer` which represents operation for this `Bolt`,
* for example it can be summing or flat mapping.
* @tparam I is a type of input tuples for this `Bolt`s executor.
* @tparam O is a type of output tuples for this `Bolt`s executor.
*/
case class BaseBolt[I, O](jobID: JobId,
metrics: () => TraversableOnce[StormMetric[_]],
anchorTuples: AnchorTuples,
hasDependants: Boolean,
outputFields: Fields,
ackOnEntry: AckOnEntry,
maxExecutePerSec: MaxExecutePerSecond,
decoder: Injection[I, JList[AnyRef]],
encoder: Injection[O, JList[AnyRef]],
inputEdges: Map[String, Edge[I]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docs for all the params, I know existing code doesn't have any, but it's a good time to add them. What does the string represent. Why are there multiple incoming edges but only one outgoing edge?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the comment looks very useful.

outputEdge: Edge[O],
executor: OperationContainer[I, O, InputState[Tuple]]) extends IRichBolt {

@transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -130,7 +139,10 @@ case class BaseBolt[I, O](jobID: JobId,
* System ticks come with a fixed stream id
*/
val curResults = if (!tuple.getSourceStreamId.equals("__tick")) {
val tsIn = decoder.invert(tuple.getValues).get // Failing to decode here is an ERROR
val tsIn = inputEdges.get(tuple.getSourceComponent) match {
case Some(inputEdge) => inputEdge.injection.invert(tuple.getValues).get
case None => throw new Exception("Unrecognized source component: " + tuple.getSourceComponent)
}
// Don't hold on to the input values
clearValues(tuple)
if (earlyAck) { collector.ack(tuple) }
Expand All @@ -155,12 +167,12 @@ case class BaseBolt[I, O](jobID: JobId,
if (anchorTuples.anchor) {
val states = inputs.iterator.map(_.state).toList.asJava
results.foreach { result =>
collector.emit(states, encoder(result))
collector.emit(states, outputEdge.injection(result))
emitCount += 1
}
} else { // don't anchor
results.foreach { result =>
collector.emit(encoder(result))
collector.emit(outputEdge.injection(result))
emitCount += 1
}
}
Expand All @@ -183,7 +195,7 @@ case class BaseBolt[I, O](jobID: JobId,
}

override def declareOutputFields(declarer: OutputFieldsDeclarer) {
if (hasDependants) { declarer.declare(outputFields) }
if (hasDependants) { declarer.declare(outputEdge.fields) }
}

override val getComponentConfiguration = null
Expand All @@ -192,6 +204,15 @@ case class BaseBolt[I, O](jobID: JobId,
executor.cleanup
}

/**
* Apply groupings defined by input edges of this `Bolt` to Storm's topology.
*/
def applyGroupings(declarer: BoltDeclarer): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

inputEdges.foreach { case (parentName, inputEdge) =>
inputEdge.grouping.apply(declarer, parentName)
}
}

/**
* This is clearly not safe, but done to deal with GC issues since
* storm keeps references to values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ import com.twitter.summingbird.online.OnlineDefaultConstants
* Here we can override ones from online, or add more that are more Storm specific
*/
object Constants extends OnlineDefaultConstants {
val AGG_KEY = "aggKey"
val AGG_VALUE = "aggValue"
val AGG_BATCH = "aggBatchID"
val RETURN_INFO = "return-info"

val VALUE_FIELD = "value"
val GROUP_BY_SUM = "groupBySum"

val DEFAULT_SPOUT_STORM_METRICS = SpoutStormMetrics(None)
val DEFAULT_FM_STORM_METRICS = FlatMapStormMetrics(None)
val DEFAULT_SUMMER_STORM_METRICS = SummerStormMetrics(None)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.twitter.summingbird.storm

import com.twitter.bijection.{ Injection, Inversion }
import org.apache.storm.tuple.Fields
import java.util.{ ArrayList => JAList, List => JList }

import com.twitter.summingbird.online.executor.KeyValueShards

import scala.collection.{ Map => CMap }
import scala.util.Try

/**
* This trait represents an edge in Storm topology DAG between two nodes.
* @tparam T represents type of tuples sent over this `Edge`.
*/
sealed trait Edge[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc comments for trait and every public field and method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Storm's `Fields` are going to be sent over this `Edge`.
*/
val fields: Fields

/**
* Injection from values (which are going to be sent) to Storm's values.
* Each element in returned array corresponds to element in `Fields` at the same index.
*/
val injection: Injection[T, JList[AnyRef]]

/**
* Grouping for this `Edge`.
*/
val grouping: EdgeGrouping
}

object Edge {
/**
* Simplest possible type of `Edge`, without any assumptions about content inside.
*/
case class Item[T] private[storm] (edgeGrouping: EdgeGrouping) extends Edge[T] {
override val fields: Fields = new Fields("value")
override val injection: Injection[T, JList[AnyRef]] = EdgeInjections.forItem
override val grouping: EdgeGrouping = edgeGrouping
}

/**
* This `Edge` type used for aggregated key value pairs emitted by partial aggregation.
* @param shards is a number which was used for partial aggregation.
*/
case class AggregatedKeyValues[K, V](shards: KeyValueShards) extends Edge[(Int, CMap[K, V])] {
override val fields: Fields = new Fields("aggKey", "aggValue")
override val injection: Injection[(Int, CMap[K, V]), JList[AnyRef]] = EdgeInjections.forKeyValue
override val grouping: EdgeGrouping = EdgeGrouping.Fields(new Fields("aggKey"))
}

def itemWithShuffleGrouping[T]: Item[T] = Item[T](EdgeGrouping.Shuffle)
def itemWithLocalOrShuffleGrouping[T]: Item[T] = Item[T](EdgeGrouping.LocalOrShuffle)
}

private object EdgeInjections {
def forItem[T]: Injection[T, JList[AnyRef]] = new Injection[T, JList[AnyRef]] {
override def apply(t: T): JAList[AnyRef] = {
val list = new JAList[AnyRef](1)
list.add(t.asInstanceOf[AnyRef])
list
}

override def invert(vin: JList[AnyRef]): Try[T] = Inversion.attempt(vin) { v =>
v.get(0).asInstanceOf[T]
}
}

def forKeyValue[K, V]: Injection[(K, V), JList[AnyRef]] = new Injection[(K, V), JList[AnyRef]] {
override def apply(item: (K, V)): JAList[AnyRef] = {
val (key, v) = item
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why key expanded and not value? Better to be consistent.

val list = new JAList[AnyRef](2)
list.add(key.asInstanceOf[AnyRef])
list.add(v.asInstanceOf[AnyRef])
list
}

override def invert(vin: JList[AnyRef]): Try[(K, V)] = Inversion.attempt(vin) { v =>
val key = v.get(0).asInstanceOf[K]
val value = v.get(1).asInstanceOf[V]
(key, value)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.twitter.summingbird.storm

import org.apache.storm.topology.BoltDeclarer
import org.apache.storm.tuple.{ Fields => StormFields }

/**
* This trait is used to represent different grouping strategies in `Storm`.
*/
sealed trait EdgeGrouping {
def apply(declarer: BoltDeclarer, parentName: String): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment what this does? Like what do the implementors of this trait need to do to confirm with the protocol.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

}

object EdgeGrouping {
case object Shuffle extends EdgeGrouping {
override def apply(declarer: BoltDeclarer, parentName: String): Unit =
declarer.shuffleGrouping(parentName)
}
case object LocalOrShuffle extends EdgeGrouping {
override def apply(declarer: BoltDeclarer, parentName: String): Unit =
declarer.localOrShuffleGrouping(parentName)
}
case class Fields(fields: StormFields) extends EdgeGrouping {
override def apply(declarer: BoltDeclarer, parentName: String): Unit =
declarer.fieldsGrouping(parentName, fields)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import com.twitter.summingbird.online.executor
import com.twitter.summingbird.online.FlatMapOperation
import com.twitter.summingbird.storm.planner._
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields
import org.slf4j.LoggerFactory
import scala.collection.{ Map => CMap }

/**
* These are helper functions for building a bolt from a Node[Storm] element.
Expand Down Expand Up @@ -83,24 +81,21 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
private val maxEmitPerExecute = getOrElse(DEFAULT_MAX_EMIT_PER_EXECUTE)
logger.info(s"[$nodeName] maxEmitPerExecute : ${maxEmitPerExecute.get}")

private val usePreferLocalDependency = getOrElse(DEFAULT_FM_PREFER_LOCAL_DEPENDENCY)
logger.info(s"[$nodeName] usePreferLocalDependency: ${usePreferLocalDependency.get}")

private def getFFMBolt[T, K, V](summer: SummerNode[Storm]) = {
type ExecutorInput = (Timestamp, T)
type ExecutorKey = Int
type InnerValue = (Timestamp, V)
type ExecutorValue = CMap[(K, BatchID), InnerValue]
type Input = (Timestamp, T)
type OutputKey = (K, BatchID)
type OutputValue = (Timestamp, V)
val summerProducer = summer.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]]
// When emitting tuples between the Final Flat Map and the summer we encode the timestamp in the value
// The monoid we use in aggregation is timestamp max.
val batcher = summerProducer.store.mergeableBatcher
implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup

// Query to get the summer paralellism of the summer down stream of us we are emitting to
// to ensure no edge case between what we might see for its parallelism and what it would see/pass to storm.
val summerParalellism = getOrElse(DEFAULT_SUMMER_PARALLELISM, summer)
val summerBatchMultiplier = getOrElse(DEFAULT_SUMMER_BATCH_MULTIPLIER, summer)

// This option we report its value here, but its not user settable.
val keyValueShards = executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get)
val keyValueShards = storm.getSummerKeyValueShards(stormDag, summer)
logger.info(s"[$nodeName] keyValueShards : ${keyValueShards.get}")

val operation = foldOperations[T, (K, V)](node.members.reverse)
Expand All @@ -113,19 +108,18 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
metrics.metrics,
anchorTuples,
true,
new Fields(AGG_KEY, AGG_VALUE),
ackOnEntry,
maxExecutePerSec,
new SingleItemInjection[ExecutorInput],
new KeyValueInjection[ExecutorKey, ExecutorValue],
inputEdges[Input],
Edge.AggregatedKeyValues[OutputKey, OutputValue](keyValueShards),
new executor.FinalFlatMap(
wrappedOperation,
builder,
maxWaiting,
maxWaitTime,
maxEmitPerExecute,
keyValueShards
)(implicitly[Semigroup[InnerValue]])
)(implicitly[Semigroup[OutputValue]])
)
}

Expand All @@ -141,11 +135,11 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
metrics.metrics,
anchorTuples,
stormDag.dependantsOf(node).size > 0,
new Fields(VALUE_FIELD),
ackOnEntry,
maxExecutePerSec,
new SingleItemInjection[ExecutorInput],
new SingleItemInjection[ExecutorOutput],
inputEdges[ExecutorInput],
// Output edge's grouping isn't important for now.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment why it odes not matter what the output grouping is?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It explained in declaration site, but what is more important - I expect that to change in subsequent review.

Edge.itemWithLocalOrShuffleGrouping[ExecutorOutput],
new executor.IntermediateFlatMap(
wrappedOperation,
maxWaiting,
Expand All @@ -162,4 +156,15 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
case None => getIntermediateFMBolt[Any, Any].asInstanceOf[BaseBolt[Any, Any]]
}
}

private def inputEdges[Input]: Map[String, Edge[Input]] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this should be passed in on construction from the nodes it depends on. Is that something we will get to? A win, seems to me, would be we share the same Edge instances between the output at one level and then input at the next, and it could reduce the chance for error there.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I want to make this logic in a way you said - use same Edge instances for input and output with double check on edges compatibility at topology construction time (that's why I left shardsCount for example in grouping declaration).

val edge = if (usePreferLocalDependency.get) {
Edge.itemWithLocalOrShuffleGrouping[Input]
} else {
Edge.itemWithShuffleGrouping[Input]
}

val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) }
dependenciesNames.map((_, edge)).toMap
}
}
Loading