Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #495 from twitter/feature/stat_store
Browse files Browse the repository at this point in the history
Feature/stat store
  • Loading branch information
jcoveney committed Apr 16, 2014
2 parents dfd49bb + 6b5813e commit f64ec6b
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 24 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object SummingbirdBuild extends Build {
val dfsDatastoresVersion = "1.3.4"
val bijectionVersion = "0.6.2"
val algebirdVersion = "0.5.0"
val scaldingVersion = "0.9.0rc17"
val scaldingVersion = "0.9.1"
val storehausVersion = "0.9.0"
val utilVersion = "6.3.8"
val chillVersion = "0.3.6"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.{Try, Success, Failure}
import java.util.concurrent.TimeoutException
import org.slf4j.{LoggerFactory, Logger}

abstract class AsyncBase[I,O,S,D](maxWaitingFutures: MaxWaitingFutures, maxWaitingTime: MaxFutureWaitTime, maxEmitPerExec: MaxEmitPerExecute) extends Serializable with OperationContainer[I,O,S,D] {
abstract class AsyncBase[I, O, S, D, RC](maxWaitingFutures: MaxWaitingFutures, maxWaitingTime: MaxFutureWaitTime, maxEmitPerExec: MaxEmitPerExecute) extends Serializable with OperationContainer[I,O,S,D, RC] {

@transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[summingbird] case class KeyValueShards(get: Int) {
def summerIdFor[K](k: K): Int = k.hashCode % get
}

class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_], D](
class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_], D, RC](
@transient flatMapOp: FlatMapOperation[Event, (Key, Value)],
cacheBuilder: CacheBuilder[Int, (List[S], Map[Key, Value])],
maxWaitingFutures: MaxWaitingFutures,
Expand All @@ -56,7 +56,7 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_], D](
pDecoder: Injection[Event, D],
pEncoder: Injection[(Int, Map[Key, Value]), D]
)
extends AsyncBase[Event, (Int, Map[Key, Value]), S, D](maxWaitingFutures,
extends AsyncBase[Event, (Int, Map[Key, Value]), S, D, RC](maxWaitingFutures,
maxWaitingTime,
maxEmitPerExec) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import com.twitter.summingbird.online.option.{
}


class IntermediateFlatMap[T,U,S,D](
class IntermediateFlatMap[T, U, S, D, RC](
@transient flatMapOp: FlatMapOperation[T, U],
maxWaitingFutures: MaxWaitingFutures,
maxWaitingTime: MaxFutureWaitTime,
maxEmitPerExec: MaxEmitPerExecute,
pDecoder: Injection[T, D],
pEncoder: Injection[U, D]
) extends AsyncBase[T, U, S, D](maxWaitingFutures, maxWaitingTime, maxEmitPerExec) {
) extends AsyncBase[T, U, S, D, RC](maxWaitingFutures, maxWaitingTime, maxEmitPerExec) {

val encoder = pEncoder
val decoder = pDecoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package com.twitter.summingbird.online.executor
import scala.util.Try
import com.twitter.bijection.Injection

trait OperationContainer[Input, Output, State, WireFmt] {
trait OperationContainer[Input, Output, State, WireFmt, RuntimeContext] {
def decoder: Injection[Input, WireFmt]
def encoder: Injection[Output, WireFmt]
def executeTick: TraversableOnce[(List[State], Try[TraversableOnce[Output]])]
def execute(state: State,
data: Input):
TraversableOnce[(List[State], Try[TraversableOnce[Output]])]
def init {}
def init(ctx: RuntimeContext) {}
def cleanup {}
def notifyFailure(inputs: List[State], e: Throwable) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.twitter.summingbird.online.executor

import com.twitter.util.{Await, Future}
import com.twitter.util.{Await, Future, Promise}
import com.twitter.algebird.{Semigroup, SummingQueue}
import com.twitter.storehaus.algebra.Mergeable
import com.twitter.bijection.Injection
Expand All @@ -43,12 +43,13 @@ import com.twitter.summingbird.option.CacheSize
* (MaxWaitingFutures * execute latency).
*
* @author Oscar Boykin
* @author Ian O Connell
* @author Sam Ritchie
* @author Ashu Singhal
*/

class Summer[Key, Value: Semigroup, Event, S, D](
@transient storeSupplier: () => Mergeable[Key, Value],
class Summer[Key, Value: Semigroup, Event, S, D, RC](
@transient storeSupplier: RC => Mergeable[Key, Value],
@transient flatMapOp: FlatMapOperation[(Key, (Option[Value], Value)), Event],
@transient successHandler: OnlineSuccessHandler,
@transient exceptionHandler: OnlineExceptionHandler,
Expand All @@ -59,7 +60,7 @@ class Summer[Key, Value: Semigroup, Event, S, D](
includeSuccessHandler: IncludeSuccessHandler,
pDecoder: Injection[(Int, Map[Key, Value]), D],
pEncoder: Injection[Event, D]) extends
AsyncBase[(Int, Map[Key, Value]), Event, InputState[S], D](
AsyncBase[(Int, Map[Key, Value]), Event, InputState[S], D, RC](
maxWaitingFutures,
maxWaitingTime,
maxEmitPerExec) {
Expand All @@ -69,17 +70,20 @@ class Summer[Key, Value: Semigroup, Event, S, D](
val decoder = pDecoder

val storeBox = Externalizer(storeSupplier)
lazy val store = storeBox.get.apply
lazy val storePromise = Promise[Mergeable[Key, Value]]
lazy val store = Await.result(storePromise)

lazy val sCache: AsyncCache[Key, (List[InputState[S]], Value)] = cacheBuilder(implicitly[Semigroup[(List[InputState[S]], Value)]])

val exceptionHandlerBox = Externalizer(exceptionHandler.handlerFn.lift)
val successHandlerBox = Externalizer(successHandler)
var successHandlerOpt: Option[OnlineSuccessHandler] = null

override def init {
super.init
override def init(runtimeContext: RC) {
super.init(runtimeContext)
storePromise.setValue(storeBox.get(runtimeContext))
store.toString // Do the lazy evaluation now so we can connect before tuples arrive.

successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case class BaseBolt[I,O](metrics: () => TraversableOnce[StormMetric[_]],
hasDependants: Boolean,
outputFields: Fields,
ackOnEntry: AckOnEntry,
executor: OperationContainer[I, O, InputState[Tuple], JList[AnyRef]]
executor: OperationContainer[I, O, InputState[Tuple], JList[AnyRef], TopologyContext]
) extends IRichBolt {


Expand Down Expand Up @@ -119,7 +119,7 @@ case class BaseBolt[I,O](metrics: () => TraversableOnce[StormMetric[_]],
override def prepare(conf: JMap[_,_], context: TopologyContext, oc: OutputCollector) {
collector = oc
metrics().foreach { _.register(context) }
executor.init
executor.init(context)
}

override def declareOutputFields(declarer: OutputFieldsDeclarer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright 2013 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.storm

import backtype.storm.task.TopologyContext
import backtype.storm.metric.api.CountMetric
import org.slf4j.LoggerFactory
import com.twitter.storehaus.algebra.reporting.{StoreReporter, MergeableReporter}
import com.twitter.storehaus.algebra.{Mergeable, MergeableProxy}
import com.twitter.storehaus.{Store, StoreProxy}
import com.twitter.util.{Promise, Future}

/**
*
* @author Ian O Connell
*/

class MergeableStatReporter[K, V](context: TopologyContext, val self: Mergeable[K, V]) extends MergeableProxy[K, V] with MergeableReporter[Mergeable[K, V], K, V] {
private def buildMetric(s: String) = context.registerMetric("store/%s".format(s), new CountMetric, 10)
val mergeMetric = buildMetric("merge")
val multiMergeMetric = buildMetric("multiMerge")
val multiMergeTupleFailedMetric = buildMetric("multiMergeTupleFailed")
val mergeFailedMetric = buildMetric("mergeFailed")
val multiMergeTuplesMetric = buildMetric("multiMergeTuples")


override def traceMerge(kv: (K, V), request: Future[Option[V]]) = {
mergeMetric.incr()
request.onFailure { _ =>
mergeFailedMetric.incr()
}.unit
}

override def traceMultiMerge[K1 <: K](kvs: Map[K1, V], request: Map[K1, Future[Option[V]]]) = {
multiMergeMetric.incr()
multiMergeTuplesMetric.incrBy(request.size)
request.map { case (k, v) =>
val failureWrapV = v.onFailure { _ =>
multiMergeTupleFailedMetric.incr()
}.unit
(k, failureWrapV)
}
}
}



class StoreStatReporter[K, V](context: TopologyContext, val self: Store[K, V]) extends StoreProxy[K, V] with StoreReporter[Store[K, V], K, V] {
private def buildMetric(s: String) = context.registerMetric("store/%s".format(s), new CountMetric, 10)
val putMetric = buildMetric("put")
val multiPutMetric = buildMetric("multiPut")
val multiPutTuplesMetric = buildMetric("multiPutTuples")
val putFailedMetric = buildMetric("putFailed")
val multiPutTupleFailedMetric = buildMetric("multiPutTupleFailed")

val getMetric = buildMetric("get")
val multiGetMetric = buildMetric("multiGet")
val multiGetTuplesMetric = buildMetric("multiGetTuples")
val getFailedMetric = buildMetric("getFailed")
val multiGetTupleFailedMetric = buildMetric("multiGetTupleFailed")

override def traceMultiGet[K1 <: K](ks: Set[K1], request: Map[K1, Future[Option[V]]]) = {
multiGetMetric.incr()
multiGetTuplesMetric.incrBy(request.size)

request.map { case (k, v) =>
val failureWrapV = v.onFailure { _ =>
multiGetTupleFailedMetric.incr()
}.unit
(k, failureWrapV)
}
}

override def traceGet(k: K, request: Future[Option[V]]) = {
getMetric.incr()
request.onFailure { _ =>
getFailedMetric.incr()
}.unit
}

override def tracePut(kv: (K, Option[V]), request: Future[Unit]) = {
putMetric.incr()
request.onFailure { _ =>
putFailedMetric.incr()
}.unit
}

override def traceMultiPut[K1 <: K](kvs: Map[K1, Option[V]], request: Map[K1, Future[Unit]]) = {
multiPutMetric.incr()
multiPutTuplesMetric.incrBy(request.size)

request.map { case (k, v) =>
val failureWrapV = v.onFailure { _ =>
multiPutTupleFailedMetric.incr()
}.unit
(k, failureWrapV)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package com.twitter.summingbird.storm
import Constants._
import backtype.storm.{Config => BacktypeStormConfig, LocalCluster, StormSubmitter}
import backtype.storm.generated.StormTopology
import backtype.storm.task.TopologyContext
import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
import backtype.storm.tuple.Fields
import backtype.storm.tuple.Tuple

import com.twitter.bijection.{Base64String, Injection}
import com.twitter.algebird.{Monoid, Semigroup}
import com.twitter.chill.IKryoRegistrar
import com.twitter.storehaus.{ReadableStore, WritableStore}
import com.twitter.storehaus.algebra.{MergeableStore, Mergeable}
import com.twitter.storehaus.{ReadableStore, WritableStore, Store}
import com.twitter.storehaus.algebra.{MergeableStore, Mergeable, StoreAlgebra}
import com.twitter.storehaus.algebra.MergeableStore.enrich
import com.twitter.summingbird._
import com.twitter.summingbird.viz.VizGraph
Expand Down Expand Up @@ -59,7 +60,25 @@ sealed trait StormStore[-K, V] {

object MergeableStoreSupplier {
def from[K, V](store: => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreSupplier[K, V] =
MergeableStoreSupplier(() => store, batcher)
MergeableStoreSupplier((TopologyContext) => store, batcher)

def from[K, V](store: TopologyContext => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreSupplier[K, V] =
MergeableStoreSupplier(store, batcher)

def instrumentedStoreFrom[K, V](store: => Store[(K, BatchID), V])(implicit batcher: Batcher, sg: Monoid[V]): MergeableStoreSupplier[K, V] = {
import StoreAlgebra.enrich
val supplier = {(context: TopologyContext) =>
val instrumentedStore = new StoreStatReporter(context, store)
new MergeableStatReporter(context, instrumentedStore.toMergeable)
}
MergeableStoreSupplier(supplier, batcher)
}

def instrumentedMergeableFrom[K, V](store: => MergeableStore[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreSupplier[K, V] = {
val supplier = {(context: TopologyContext) => new MergeableStatReporter(context, store)}
MergeableStoreSupplier(supplier, batcher)
}


def fromOnlineOnly[K, V](store: => MergeableStore[K, V]): MergeableStoreSupplier[K, V] = {
implicit val batcher = Batcher.unit
Expand All @@ -68,7 +87,7 @@ object MergeableStoreSupplier {
}


case class MergeableStoreSupplier[K, V](store: () => Mergeable[(K, BatchID), V], batcher: Batcher) extends StormStore[K, V]
case class MergeableStoreSupplier[K, V] private[summingbird] (store: TopologyContext => Mergeable[(K, BatchID), V], batcher: Batcher) extends StormStore[K, V]

trait StormService[-K, +V] {
def store: StoreFactory[K, V]
Expand Down Expand Up @@ -102,6 +121,13 @@ object Storm {
def store[K, V](store: => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): StormStore[K, V] =
MergeableStoreSupplier.from(store)

def instrumentedStore[K, V](store: => MergeableStore[(K, BatchID), V])(implicit batcher: Batcher, sg: Monoid[V]): StormStore[K, V] =
MergeableStoreSupplier.instrumentedStoreFrom(store)

def store[K, V](store: (TopologyContext) => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): StormStore[K, V] =
MergeableStoreSupplier.from(store)


def service[K, V](serv: => ReadableStore[K, V]): StormService[K, V] = StoreWrapper(() => serv)

def toStormSource[T](spout: Spout[T],
Expand Down Expand Up @@ -216,10 +242,10 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
case MergeableStoreSupplier(contained, _) => contained
}

def wrapMergable(supplier: () => Mergeable[ExecutorKeyType, V]) =
() => {
def wrapMergable(supplier: TopologyContext => Mergeable[ExecutorKeyType, V]) =
(context: TopologyContext) => {
new Mergeable[ExecutorKeyType, ExecutorValueType] {
val innerMergable: Mergeable[ExecutorKeyType, V] = supplier()
val innerMergable: Mergeable[ExecutorKeyType, V] = supplier(context)
implicit val innerSG = innerMergable.semigroup

// Since we don't keep a timestamp in the store
Expand Down

0 comments on commit f64ec6b

Please sign in to comment.