Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Refactor Storm platform to introduce Edges #728

Merged
merged 7 commits into from
Jun 8, 2017

Conversation

ttim
Copy link
Collaborator

@ttim ttim commented Jun 5, 2017

In order to introduce grouped leftJoin I would like to make tuples we send between nodes to be more precise. In particular I want to send (K, V) tuples as tuples with key and value fields.

In this PR I did a small refactoring to introduce Edge trait which correspond to edges in Storms topology DAG - it contains a way how to serialize/deserialize data into Fields and how to group data over this edge.

Copy link
Contributor

@pankajroark pankajroark left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick first pass is all about comments. Can you explain the reason for the change as well, i.e. how it helps with implementing grouped join.

import scala.collection.{ Map => CMap }
import scala.util.Try

sealed trait Edge[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc comments for trait and every public field and method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

ackOnEntry: AckOnEntry,
maxExecutePerSec: MaxExecutePerSecond,
decoder: Injection[I, JList[AnyRef]],
encoder: Injection[O, JList[AnyRef]],
inputEdges: Map[String, Edge[I]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docs for all the params, I know existing code doesn't have any, but it's a good time to add them. What does the string represent. Why are there multiple incoming edges but only one outgoing edge?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the comment looks very useful.

@@ -192,6 +192,12 @@ case class BaseBolt[I, O](jobID: JobId,
executor.cleanup
}

def applyGroupings(declarer: BoltDeclarer): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@ttim
Copy link
Collaborator Author

ttim commented Jun 5, 2017

Currently when we send tuples over the wire we send them only in two formats: as an item (then it's just one value Field) or as something partially aggregated (aggKey and aggValue with (Int, Map[K, V]) as a content).

This breaks if we want to implement grouped join because we want to have fields we are able to group by (therefore first format is out) but also we don't want to have things partially aggregated (therefore second is out).

Also right now we have runtime issues in topologies where we do flatMap and sumByKey in parallel branches from same flatMap node (see #725). This happens because of the same reason - flatMap node should emit aggregated and non aggregated values at the same time.

I can see three ways to fix this issues:

  1. simplest - send all key value pairs in aggregated format, even if it's not suppose to be aggregated.
    Pros: all bolts have simple contract with one input and output format, simplest implementation. Cons: on each (K, V) pair we will have some size overhead (shardId Int, two tuple objects) and some performance overhead (but I think it's not really substantial)
  2. medium - all bolts send only one type of tuples (regardless to downstream), but on receive side you can distinguish based on input component. We need second part for next use case: imagine you have flatMap which emits to both sumByKey and another flatMap. In this case the only way to make topology correct is to emit single element aggregated (K, V) on source flatMap, with treating them as (K, V) on flatMap downstream node and as aggregated (K, V) on sumByKey downstream node.
    Pros: contract is more or less simple, no size runtime overhead for cases which works today
    Cons: still overhead in some cases, not that simple to implement (I chase this one, that's why I have Map[String, Edge[]] as an input type for Bolt)
  3. most complex and precise - implement different strategies which depends on both sending and receiving sides.
    Pros: smallest possible runtime overhead (especially if we will implement customizable OperationContainer which will be able to do and not to do partial aggregation at the same time on the same node based on downstream)
    Cons: involves biggest change, especially in BaseBolt to support customized emits based on target componentId

Which one is the best, what do you think @johnynek @pankajroark ?


sealed trait EdgeGrouping {
def apply(declarer: BoltDeclarer, parentName: String): Unit
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put this in a separate file: EdgeGrouping.scala?

And can we use the style to put inner classes to not clutter the name space:

object EdgeGrouping {
  case object Shuffle extends EdgeGrouping
  case object LocalOrShuffle extends EdgeGrouping 
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, looks way better!

declarer.fieldsGrouping(parentName, fields)
}

case class ItemEdge[T] private (edgeGrouping: EdgeGrouping) extends Edge[T] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly, can we put instances of Edge in the object:

object Edge {
  case class Item[T]...
  case class AggregatedKeyValues[K, V]...
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -162,4 +156,15 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
case None => getIntermediateFMBolt[Any, Any].asInstanceOf[BaseBolt[Any, Any]]
}
}

private def inputEdges[Input](): Map[String, Edge[Input]] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala style is to use unary methods foo(): when there is a side effect. These don't have that so you should do:

def inputEdges[Input]: Map[String, Edge[Input]] =

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}

def forKeyValue[K, V](): Injection[(K, V), JList[AnyRef]] = new Injection[(K, V), JList[AnyRef]] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just def forKeyValue[K, V]: Injection...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

object EdgeInjections {
def forItem[T](): Injection[T, JList[AnyRef]] = new Injection[T, JList[AnyRef]] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just forItem[T]: Injection... no ()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@ttim
Copy link
Collaborator Author

ttim commented Jun 6, 2017

@pankajroark added docs and explained motivation above.

I think it makes sense to use second approach for now, because I realized we need some customization of input side anyway - for flatMap node doing emits to both flatMap and sumByKey for one case we want to use shuffle grouping and for another we want to use fields grouping.

@codecov-io
Copy link

codecov-io commented Jun 6, 2017

Codecov Report

Merging #728 into develop will increase coverage by 0.02%.
The diff coverage is 93.75%.

Impacted file tree graph

@@            Coverage Diff             @@
##           develop    #728      +/-   ##
==========================================
+ Coverage    71.38%   71.4%   +0.02%     
==========================================
  Files          141     142       +1     
  Lines         3491    3504      +13     
  Branches       195     197       +2     
==========================================
+ Hits          2492    2502      +10     
- Misses         999    1002       +3
Impacted Files Coverage Δ
...cala/com/twitter/summingbird/storm/Constants.scala 100% <ø> (ø) ⬆️
.../com/twitter/summingbird/storm/StormPlatform.scala 82.9% <100%> (+0.55%) ⬆️
...witter/summingbird/storm/spout/KeyValueSpout.scala 78.94% <100%> (+1.16%) ⬆️
...scala/com/twitter/summingbird/storm/EdgeType.scala 100% <100%> (ø)
...a/com/twitter/summingbird/storm/EdgeGrouping.scala 66.66% <66.66%> (ø)
...scala/com/twitter/summingbird/storm/BaseBolt.scala 77.01% <75%> (-0.1%) ⬇️
...witter/summingbird/storm/FlatMapBoltProvider.scala 98.55% <92.3%> (-1.45%) ⬇️
.../main/scala/com/twitter/summingbird/Producer.scala 75.75% <0%> (-1.52%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 731da7f...50ac628. Read the comment docs.

Copy link
Collaborator

@johnynek johnynek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm really positive about this change. Just a couple of questions for clarification.

new SingleItemInjection[ExecutorInput],
new SingleItemInjection[ExecutorOutput],
inputEdges[ExecutorInput],
// Output edge's grouping isn't important for now.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment why it odes not matter what the output grouping is?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It explained in declaration site, but what is more important - I expect that to change in subsequent review.

@@ -162,4 +156,15 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
case None => getIntermediateFMBolt[Any, Any].asInstanceOf[BaseBolt[Any, Any]]
}
}

private def inputEdges[Input]: Map[String, Edge[Input]] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this should be passed in on construction from the nodes it depends on. Is that something we will get to? A win, seems to me, would be we share the same Edge instances between the output at one level and then input at the next, and it could reduce the chance for error there.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I want to make this logic in a way you said - use same Edge instances for input and output with double check on edges compatibility at topology construction time (that's why I left shardsCount for example in grouping declaration).

new KeyValueInjection[Int, CMap[ExecutorKeyType, ExecutorValueType]],
new SingleItemInjection[ExecutorOutputType],
inputEdges,
// Output edge's grouping isn't important for now.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment why?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

@pankajroark
Copy link
Contributor

On strategies: the second strategy, the one you're pursuing, does seem like the right one to me. I'm still reviewing the code.

Copy link
Contributor

@pankajroark pankajroark left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks like a great refactoring that simplifies code. I really like that we pin down Edges and EdgeGrouping as concrete concepts.

* false otherwise.
* @param hasDependants does this node have any downstream nodes?
* @param ackOnEntry ack tuples in the beginning of processing.
* @param maxExecutePerSec limits number of executes per second, will block processing thread after.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add: "Used for rate limiting."

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param ackOnEntry ack tuples in the beginning of processing.
* @param maxExecutePerSec limits number of executes per second, will block processing thread after.
* @param inputEdges is a map from name of downstream node to `Edge` from it.
* @param outputEdge is an edge from this node. To be precise there are number of output edges,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document this design somewhere. The package object is a common place for documenting the desing.

Copy link
Collaborator Author

@ttim ttim Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it until subsequent PR, there will be clearer place for this later.

ackOnEntry: AckOnEntry,
maxExecutePerSec: MaxExecutePerSecond,
decoder: Injection[I, JList[AnyRef]],
encoder: Injection[O, JList[AnyRef]],
inputEdges: Map[String, Edge[I]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the comment looks very useful.


def forKeyValue[K, V]: Injection[(K, V), JList[AnyRef]] = new Injection[(K, V), JList[AnyRef]] {
override def apply(item: (K, V)): JAList[AnyRef] = {
val (key, v) = item
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why key expanded and not value? Better to be consistent.

* This trait is used to represent different grouping strategies in `Storm`.
*/
sealed trait EdgeGrouping {
def apply(declarer: BoltDeclarer, parentName: String): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment what this does? Like what do the implementors of this trait need to do to confirm with the protocol.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@pankajroark
Copy link
Contributor

lgtm

@ttim
Copy link
Collaborator Author

ttim commented Jun 7, 2017

@johnynek are you good with this change? I'm preparing next review where I did this outputEdge thing unnecessary.

Copy link
Collaborator

@johnynek johnynek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shipit!

@ttim ttim merged commit 6f7fcdd into twitter:develop Jun 8, 2017
@ttim ttim deleted the introduce_edges branch June 8, 2017 05:04
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants