Patch summary (reviewer note; this text precedes the first "diff --git" header):
  * AllOpts.scala: SummerConstructor now wraps a SummerWithCountersBuilder; a companion
    apply(SummerBuilder) overload keeps the old call shape by wrapping the builder.
  * Summers.scala (new file): Null, Sync and Async SummerWithCountersBuilder implementations.
  * BuildSummer.scala: the option-driven fallback now returns one of the Summers builders,
    and counters are created through the callback passed to create().
NOTE(review): line breaks were restored and checked against the hunk line counts; leading
indentation was not recoverable and follows two-space Scala convention. Confirm against the
repository before applying.

diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala
index f27f7c3a0..a490407b8 100644
--- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala
+++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala
@@ -2,7 +2,8 @@ package com.twitter.summingbird.online.option
 
 import com.twitter.util.Duration
 import com.twitter.algebird.Semigroup
-import com.twitter.algebird.util.summer.AsyncSummer
+import com.twitter.algebird.util.summer.{ AsyncSummer, Incrementor }
+import com.twitter.summingbird.{ Counter, Name }
 
 case class OnlineSuccessHandler(handlerFn: Unit => Unit)
 
@@ -98,8 +99,26 @@ trait SummerBuilder extends Serializable {
 
 /**
  * The SummerConstructor option, set this instead of CacheSize, AsyncPoolSize, etc.. to provide how to construct the aggregation for this bolt
+ * @see [[Summers]] for useful [[SummerWithCountersBuilder]]s.
  */
-case class SummerConstructor(get: SummerBuilder)
+case class SummerConstructor(get: SummerWithCountersBuilder)
+
+/**
+ * Returned [[SummerBuilder]] should be [[Serializable]], while [[SummerWithCountersBuilder]]
+ * should be used only on submitter node.
+ */
+trait SummerWithCountersBuilder {
+  def create(counter: Name => Incrementor): SummerBuilder
+}
+
+object SummerConstructor {
+  def apply(get: SummerBuilder): SummerConstructor =
+    SummerConstructor(DeprecatedSummerConstructorSpec(get))
+
+  private case class DeprecatedSummerConstructorSpec(get: SummerBuilder) extends SummerWithCountersBuilder {
+    override def create(counter: (Name) => Incrementor): SummerBuilder = get
+  }
+}
 
 /**
  * How many instances/tasks of this flatmap task should be spawned in the environment
diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala
new file mode 100644
index 000000000..54f2453b8
--- /dev/null
+++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala
@@ -0,0 +1,105 @@
+package com.twitter.summingbird.online.option
+
+import com.twitter.algebird.Semigroup
+import com.twitter.algebird.util.summer._
+import com.twitter.summingbird.Name
+import com.twitter.summingbird.online.OnlineDefaultConstants._
+import com.twitter.summingbird.option.CacheSize
+import com.twitter.util.{ Future, FuturePool }
+import java.util.concurrent.{ Executors, TimeUnit }
+
+object Summers {
+  val MemoryCounterName = Name("memory")
+  val TimeoutCounterName = Name("timeout")
+  val SizeCounterName = Name("size")
+  val TuplesInCounterName = Name("tuplesIn")
+  val TuplesOutCounterName = Name("tuplesOut")
+  val InsertCounterName = Name("inserts")
+  val InsertFailCounterName = Name("insertFail")
+
+  case object Null extends SummerWithCountersBuilder {
+    override def create(counter: (Name) => Incrementor): SummerBuilder = {
+      val tuplesIn = counter(TuplesInCounterName)
+      val tuplesOut = counter(TuplesOutCounterName)
+      new SummerBuilder {
+        override def getSummer[K, V: Semigroup]: AsyncSummer[(K, V), Map[K, V]] =
+          new com.twitter.algebird.util.summer.NullSummer[K, V](tuplesIn, tuplesOut)
+      }
+    }
+  }
+
+  case class Sync(
+    cacheSize: CacheSize = DEFAULT_FM_CACHE,
+    flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY,
+    softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT
+  ) extends SummerWithCountersBuilder {
+    override def create(counter: (Name) => Incrementor): SummerBuilder = {
+      val memoryCounter = counter(MemoryCounterName)
+      val timeoutCounter = counter(TimeoutCounterName)
+      val sizeCounter = counter(SizeCounterName)
+      val tupleInCounter = counter(TuplesInCounterName)
+      val tupleOutCounter = counter(TuplesOutCounterName)
+      val insertCounter = counter(InsertCounterName)
+
+      new SummerBuilder {
+        def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
+          new SyncSummingQueue[K, V](
+            BufferSize(cacheSize.lowerBound),
+            com.twitter.algebird.util.summer.FlushFrequency(flushFrequency.get),
+            MemoryFlushPercent(softMemoryFlushPercent.get),
+            memoryCounter,
+            timeoutCounter,
+            sizeCounter,
+            insertCounter,
+            tupleInCounter,
+            tupleOutCounter)
+        }
+      }
+    }
+  }
+
+  case class Async(
+    cacheSize: CacheSize = DEFAULT_FM_CACHE,
+    flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY,
+    softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT,
+    asyncPoolSize: AsyncPoolSize = DEFAULT_ASYNC_POOL_SIZE,
+    compactValues: CompactValues = CompactValues.default,
+    valueCombinerCacheSize: ValueCombinerCacheSize = DEFAULT_VALUE_COMBINER_CACHE_SIZE
+  ) extends SummerWithCountersBuilder {
+    override def create(counter: (Name) => Incrementor): SummerBuilder = {
+      val memoryCounter = counter(MemoryCounterName)
+      val timeoutCounter = counter(TimeoutCounterName)
+      val sizeCounter = counter(SizeCounterName)
+      val tupleInCounter = counter(TuplesInCounterName)
+      val tupleOutCounter = counter(TuplesOutCounterName)
+      val insertCounter = counter(InsertCounterName)
+      val insertFailCounter = counter(InsertFailCounterName)
+
+      new SummerBuilder {
+        def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
+          val executor = Executors.newFixedThreadPool(asyncPoolSize.get)
+          val futurePool = FuturePool(executor)
+          val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound),
+            com.twitter.algebird.util.summer.FlushFrequency(flushFrequency.get),
+            MemoryFlushPercent(softMemoryFlushPercent.get),
+            memoryCounter,
+            timeoutCounter,
+            insertCounter,
+            insertFailCounter,
+            sizeCounter,
+            tupleInCounter,
+            tupleOutCounter,
+            futurePool,
+            Compact(compactValues.toBoolean),
+            CompactionSize(valueCombinerCacheSize.get))
+          summer.withCleanup(() => {
+            Future {
+              executor.shutdown
+              executor.awaitTermination(10, TimeUnit.SECONDS)
+            }
+          })
+        }
+      }
+    }
+  }
+}
diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala
index 37ef02bb4..f9def39c4 100644
--- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala
+++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala
@@ -16,16 +16,13 @@
 
 package com.twitter.summingbird.storm
 
-import com.twitter.algebird.Semigroup
-import com.twitter.algebird.util.summer._
-import com.twitter.summingbird.{ Counter, Group, Name }
-import com.twitter.summingbird.online.option.{ CompactValues, SummerBuilder, SummerConstructor }
-import com.twitter.summingbird.option.JobId
+import com.twitter.algebird.util.summer.Incrementor
+import com.twitter.summingbird.online.OnlineDefaultConstants._
+import com.twitter.summingbird.{ Counter, Group }
+import com.twitter.summingbird.online.option._
 import com.twitter.summingbird.storm.planner.StormNode
-import com.twitter.util.{ Future, FuturePool }
-import java.util.concurrent.{ Executors, TimeUnit }
 import org.slf4j.LoggerFactory
-import Constants._
+import scala.reflect.ClassTag
 
 /*
  * The BuildSummer class is responsible for decoding from the options what SummerBuilder to use when setting up bolts.
@@ -33,103 +30,50 @@ import Constants._
  * Reading all the options internally.
  */
 private[storm] object BuildSummer {
-  @transient private val logger = LoggerFactory.getLogger(BuildSummer.getClass)
-
-  def apply(builder: StormTopologyBuilder, node: StormNode) = {
-    val opSummerConstructor = builder.get[SummerConstructor](node).map(_._2)
-    logger.debug(s"Node (${builder.getNodeName(node)}): Queried for SummerConstructor, got $opSummerConstructor")
+  @transient private[storm] val logger = LoggerFactory.getLogger(BuildSummer.getClass)
+
+  def apply(builder: StormTopologyBuilder, node: StormNode): SummerBuilder = {
+    val summerBuilder = builder.get[SummerConstructor](node)
+      .map { case (_, constructor) => constructor.get }
+      .getOrElse {
+        logger.info(s"[${builder.getNodeName(node)}] use legacy way of getting summer builder")
+        legacySummerBuilder(builder, node)
+      }
 
-    opSummerConstructor match {
-      case Some(cons) =>
-        logger.debug(s"Node (${builder.getNodeName(node)}): Using user supplied SummerConstructor: $cons")
-        cons.get
-      case None => legacyBuilder(builder, node)
+    logger.info(s"[${builder.getNodeName(node)}] summer builder: $summerBuilder")
+    summerBuilder.create { counterName =>
+      require(builder.jobId.get != null, "Unable to register metrics with no job id present in the config updater")
+      new Counter(Group("summingbird." + builder.getNodeName(node)), counterName)(builder.jobId) with Incrementor
     }
   }
 
-  private[this] final def legacyBuilder(builder: StormTopologyBuilder, node: StormNode) = {
-    val nodeName = builder.getNodeName(node)
-    val cacheSize = builder.getOrElse(node, DEFAULT_FM_CACHE)
-    val jobId = builder.jobId
-    require(jobId.get != null, "Unable to register metrics with no job id present in the config updater")
-    logger.info(s"[$nodeName] cacheSize lowerbound: ${cacheSize.lowerBound}")
+  private def legacySummerBuilder(builder: StormTopologyBuilder, node: StormNode): SummerWithCountersBuilder = {
+    def option[T <: AnyRef: ClassTag](default: T): T = builder.getOrElse[T](node, default)
 
-    val memoryCounter = counter(jobId, Group(nodeName), Name("memory"))
-    val timeoutCounter = counter(jobId, Group(nodeName), Name("timeout"))
-    val sizeCounter = counter(jobId, Group(nodeName), Name("size"))
-    val tupleInCounter = counter(jobId, Group(nodeName), Name("tuplesIn"))
-    val tupleOutCounter = counter(jobId, Group(nodeName), Name("tuplesOut"))
-    val insertCounter = counter(jobId, Group(nodeName), Name("inserts"))
-    val insertFailCounter = counter(jobId, Group(nodeName), Name("insertFail"))
+    val cacheSize = option(DEFAULT_FM_CACHE)
 
     if (cacheSize.lowerBound == 0) {
-      new SummerBuilder {
-        def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
-          new com.twitter.algebird.util.summer.NullSummer[K, V](tupleInCounter, tupleOutCounter)
-        }
-      }
+      Summers.Null
     } else {
-      val softMemoryFlush = builder.getOrElse(node, DEFAULT_SOFT_MEMORY_FLUSH_PERCENT)
-      logger.info(s"[$nodeName] softMemoryFlush : ${softMemoryFlush.get}")
-
-      val flushFrequency = builder.getOrElse(node, DEFAULT_FLUSH_FREQUENCY)
-      logger.info(s"[$nodeName] maxWaiting: ${flushFrequency.get}")
-
-      val useAsyncCache = builder.getOrElse(node, DEFAULT_USE_ASYNC_CACHE)
-      logger.info(s"[$nodeName] useAsyncCache : ${useAsyncCache.get}")
+      val softMemoryFlush = option(DEFAULT_SOFT_MEMORY_FLUSH_PERCENT)
+      val flushFrequency = option(DEFAULT_FLUSH_FREQUENCY)
+      val useAsyncCache = option(DEFAULT_USE_ASYNC_CACHE)
 
       if (!useAsyncCache.get) {
-        new SummerBuilder {
-          def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
-            new SyncSummingQueue[K, V](
-              BufferSize(cacheSize.lowerBound),
-              FlushFrequency(flushFrequency.get),
-              MemoryFlushPercent(softMemoryFlush.get),
-              memoryCounter,
-              timeoutCounter,
-              sizeCounter,
-              insertCounter,
-              tupleInCounter,
-              tupleOutCounter)
-          }
-        }
+        Summers.Sync(cacheSize, flushFrequency, softMemoryFlush)
       } else {
-        val asyncPoolSize = builder.getOrElse(node, DEFAULT_ASYNC_POOL_SIZE)
-        logger.info(s"[$nodeName] asyncPoolSize : ${asyncPoolSize.get}")
-
-        val valueCombinerCrushSize = builder.getOrElse(node, DEFAULT_VALUE_COMBINER_CACHE_SIZE)
-        logger.info(s"[$nodeName] valueCombinerCrushSize : ${valueCombinerCrushSize.get}")
-
-        val doCompact = builder.getOrElse(node, CompactValues.default)
-
-        new SummerBuilder {
-          def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
-            val executor = Executors.newFixedThreadPool(asyncPoolSize.get)
-            val futurePool = FuturePool(executor)
-            val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound),
-              FlushFrequency(flushFrequency.get),
-              MemoryFlushPercent(softMemoryFlush.get),
-              memoryCounter,
-              timeoutCounter,
-              insertCounter,
-              insertFailCounter,
-              sizeCounter,
-              tupleInCounter,
-              tupleOutCounter,
-              futurePool,
-              Compact(doCompact.toBoolean),
-              CompactionSize(valueCombinerCrushSize.get))
-            summer.withCleanup(() => {
-              Future {
-                executor.shutdown
-                executor.awaitTermination(10, TimeUnit.SECONDS)
-              }
-            })
-          }
-        }
+        val asyncPoolSize = option(DEFAULT_ASYNC_POOL_SIZE)
+        val valueCombinerCrushSize = option(DEFAULT_VALUE_COMBINER_CACHE_SIZE)
+        val doCompact = option(CompactValues.default)
+        Summers.Async(
+          cacheSize,
+          flushFrequency,
+          softMemoryFlush,
+          asyncPoolSize,
+          doCompact,
+          valueCombinerCrushSize
+        )
       }
     }
   }
-
-  def counter(jobID: JobId, nodeName: Group, counterName: Name) = new Counter(Group("summingbird." + nodeName.getString), counterName)(jobID) with Incrementor
 }