Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Refactor SummerBuilder to make it possible to use Counters on SummerBuilder creation #738

Merged
merged 4 commits into from
Jun 30, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package com.twitter.summingbird.online.option

import com.twitter.util.Duration
import com.twitter.algebird.Semigroup
import com.twitter.algebird.util.summer.AsyncSummer
import com.twitter.algebird.util.summer.{ AsyncSummer, Incrementor }
import com.twitter.summingbird.{ Counter, Name }

case class OnlineSuccessHandler(handlerFn: Unit => Unit)

Expand Down Expand Up @@ -98,8 +99,22 @@ trait SummerBuilder extends Serializable {

/**
* The SummerConstructor option, set this instead of CacheSize, AsyncPoolSize, etc.. to provide how to construct the aggregation for this bolt
* @see [[Summers]] for useful [[SummerConstructor]]s.
*/
case class SummerConstructor(get: SummerBuilder)
case class SummerConstructor(get: SummerConstructorSpec)

trait SummerConstructorSpec {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it called SummerConstructorSpec, there's no SummerConstructor in definition.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After offline discussion renamed it to SummerWithCountersBuilder.

def builder(counter: Name => Counter with Incrementor): SummerBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh, I think we should just change the definition of SummerBuilder to take the counterBuilder as argument and default to a no-op counterBuilder. SummerBuilder is used very rarely and I think it's ok to break it. There are only a couple places that we'll need to fix at Twitter. Seems better than adding a level of indirection here.
@johnynek wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this change in backward compatible manner - it's still possible to create SummerWithCountersBuilder out of plain SummerBuilder.

}

object SummerConstructor {
def apply(get: SummerBuilder): SummerConstructor =
SummerConstructor(DeprecatedSummerConstructorSpec(get))

private case class DeprecatedSummerConstructorSpec(get: SummerBuilder) extends SummerConstructorSpec {
override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = get
}
}

/**
* How many instances/tasks of this flatmap task should be spawned in the environment
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.twitter.summingbird.online.option

import com.twitter.algebird.Semigroup
import com.twitter.algebird.util.summer._
import com.twitter.summingbird.{ Counter, Name }
import com.twitter.summingbird.online.OnlineDefaultConstants._
import com.twitter.summingbird.option.CacheSize
import com.twitter.util.{ Future, FuturePool }
import java.util.concurrent.{ Executors, TimeUnit }

object Summers {
val MemoryCounterName = Name("memory")
val TimeoutCounterName = Name("timeout")
val SizeCounterName = Name("size")
val TuplesInCounterName = Name("tuplesIn")
val TuplesOutCounterName = Name("tuplesOut")
val InsertCounterName = Name("inserts")
val InsertFailCounterName = Name("insertFail")

val Null = new SummerConstructor(NullConstructor)

def sync(
cacheSize: CacheSize = DEFAULT_FM_CACHE,
flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY,
softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT
): SummerConstructor = new SummerConstructor(SyncConstructor(cacheSize, flushFrequency, softMemoryFlushPercent))

def async(
cacheSize: CacheSize = DEFAULT_FM_CACHE,
flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY,
softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT,
asyncPoolSize: AsyncPoolSize = DEFAULT_ASYNC_POOL_SIZE,
compactValues: CompactValues = CompactValues.default,
valueCombinerCacheSize: ValueCombinerCacheSize = DEFAULT_VALUE_COMBINER_CACHE_SIZE
): SummerConstructor = new SummerConstructor(AsyncConstructor(
cacheSize, flushFrequency, softMemoryFlushPercent, asyncPoolSize, compactValues, valueCombinerCacheSize
))

private case object NullConstructor extends SummerConstructorSpec {
override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = {
val tuplesIn = counter(TuplesInCounterName)
val tuplesOut = counter(TuplesOutCounterName)
new SummerBuilder {
override def getSummer[K, V: Semigroup]: AsyncSummer[(K, V), Map[K, V]] =
new com.twitter.algebird.util.summer.NullSummer[K, V](tuplesIn, tuplesOut)
}
}
}

private case class SyncConstructor(
cacheSize: CacheSize,
flushFrequency: FlushFrequency,
softMemoryFlushPercent: SoftMemoryFlushPercent
) extends SummerConstructorSpec {
override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = {
val memoryCounter = counter(MemoryCounterName)
val timeoutCounter = counter(TimeoutCounterName)
val sizeCounter = counter(SizeCounterName)
val tupleInCounter = counter(TuplesInCounterName)
val tupleOutCounter = counter(TuplesOutCounterName)
val insertCounter = counter(InsertCounterName)

new SummerBuilder {
def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
new SyncSummingQueue[K, V](
BufferSize(cacheSize.lowerBound),
com.twitter.algebird.util.summer.FlushFrequency(flushFrequency.get),
MemoryFlushPercent(softMemoryFlushPercent.get),
memoryCounter,
timeoutCounter,
sizeCounter,
insertCounter,
tupleInCounter,
tupleOutCounter)
}
}
}
}

private case class AsyncConstructor(
cacheSize: CacheSize,
flushFrequency: FlushFrequency,
softMemoryFlushPercent: SoftMemoryFlushPercent,
asyncPoolSize: AsyncPoolSize,
compactValues: CompactValues,
valueCombinerCacheSize: ValueCombinerCacheSize
) extends SummerConstructorSpec {
override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = {
val memoryCounter = counter(MemoryCounterName)
val timeoutCounter = counter(TimeoutCounterName)
val sizeCounter = counter(SizeCounterName)
val tupleInCounter = counter(TuplesInCounterName)
val tupleOutCounter = counter(TuplesOutCounterName)
val insertCounter = counter(InsertCounterName)
val insertFailCounter = counter(InsertFailCounterName)

new SummerBuilder {
def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
val executor = Executors.newFixedThreadPool(asyncPoolSize.get)
val futurePool = FuturePool(executor)
val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound),
com.twitter.algebird.util.summer.FlushFrequency(flushFrequency.get),
MemoryFlushPercent(softMemoryFlushPercent.get),
memoryCounter,
timeoutCounter,
insertCounter,
insertFailCounter,
sizeCounter,
tupleInCounter,
tupleOutCounter,
futurePool,
Compact(compactValues.toBoolean),
CompactionSize(valueCombinerCacheSize.get))
summer.withCleanup(() => {
Future {
executor.shutdown
executor.awaitTermination(10, TimeUnit.SECONDS)
}
})
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,120 +16,58 @@

package com.twitter.summingbird.storm

import com.twitter.algebird.Semigroup
import com.twitter.algebird.util.summer._
import com.twitter.summingbird.{ Counter, Group, Name }
import com.twitter.summingbird.online.option.{ CompactValues, SummerBuilder, SummerConstructor }
import com.twitter.summingbird.option.JobId
import com.twitter.summingbird.online.OnlineDefaultConstants._
import com.twitter.summingbird.{ Counter, Group }
import com.twitter.summingbird.online.option.{ CompactValues, SummerBuilder, SummerConstructor, Summers }
import com.twitter.summingbird.storm.planner.StormNode
import com.twitter.util.{ Future, FuturePool }
import java.util.concurrent.{ Executors, TimeUnit }
import org.slf4j.LoggerFactory
import Constants._
import scala.reflect.ClassTag

/*
* The BuildSummer class is responsible for decoding from the options what SummerBuilder to use when setting up bolts.
* It has two primary modes, reading a SummerConstructor setting directly and using its contents, or via the legacy route.
* Reading all the options internally.
*/
private[storm] object BuildSummer {
@transient private val logger = LoggerFactory.getLogger(BuildSummer.getClass)

def apply(builder: StormTopologyBuilder, node: StormNode) = {
val opSummerConstructor = builder.get[SummerConstructor](node).map(_._2)
logger.debug(s"Node (${builder.getNodeName(node)}): Queried for SummerConstructor, got $opSummerConstructor")

opSummerConstructor match {
case Some(cons) =>
logger.debug(s"Node (${builder.getNodeName(node)}): Using user supplied SummerConstructor: $cons")
cons.get
case None => legacyBuilder(builder, node)
@transient private[storm] val logger = LoggerFactory.getLogger(BuildSummer.getClass)

def apply(builder: StormTopologyBuilder, node: StormNode): SummerBuilder = {
val summerConstructorSpec = builder.get[SummerConstructor](node)
.map { case (_, constructor) => constructor }.getOrElse {
logger.info(s"[${builder.getNodeName(node)}] use legacy way of getting summer constructor")
legacySummerConstructor(builder, node)
}.get

logger.info(s"[${builder.getNodeName(node)}] summer constructor spec: $summerConstructorSpec")
summerConstructorSpec.builder { counterName =>
require(builder.jobId.get != null, "Unable to register metrics with no job id present in the config updater")
new Counter(Group("summingbird." + builder.getNodeName(node)), counterName)(builder.jobId) with Incrementor
}
}

private[this] final def legacyBuilder(builder: StormTopologyBuilder, node: StormNode) = {
val nodeName = builder.getNodeName(node)
val cacheSize = builder.getOrElse(node, DEFAULT_FM_CACHE)
val jobId = builder.jobId
require(jobId.get != null, "Unable to register metrics with no job id present in the config updater")
logger.info(s"[$nodeName] cacheSize lowerbound: ${cacheSize.lowerBound}")
private def legacySummerConstructor(builder: StormTopologyBuilder, node: StormNode): SummerConstructor = {
def option[T <: AnyRef: ClassTag](default: T): T = builder.getOrElse[T](node, default)

val memoryCounter = counter(jobId, Group(nodeName), Name("memory"))
val timeoutCounter = counter(jobId, Group(nodeName), Name("timeout"))
val sizeCounter = counter(jobId, Group(nodeName), Name("size"))
val tupleInCounter = counter(jobId, Group(nodeName), Name("tuplesIn"))
val tupleOutCounter = counter(jobId, Group(nodeName), Name("tuplesOut"))
val insertCounter = counter(jobId, Group(nodeName), Name("inserts"))
val insertFailCounter = counter(jobId, Group(nodeName), Name("insertFail"))
val cacheSize = option(DEFAULT_FM_CACHE)

if (cacheSize.lowerBound == 0) {
new SummerBuilder {
def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
new com.twitter.algebird.util.summer.NullSummer[K, V](tupleInCounter, tupleOutCounter)
}
}
Summers.Null
} else {
val softMemoryFlush = builder.getOrElse(node, DEFAULT_SOFT_MEMORY_FLUSH_PERCENT)
logger.info(s"[$nodeName] softMemoryFlush : ${softMemoryFlush.get}")

val flushFrequency = builder.getOrElse(node, DEFAULT_FLUSH_FREQUENCY)
logger.info(s"[$nodeName] maxWaiting: ${flushFrequency.get}")

val useAsyncCache = builder.getOrElse(node, DEFAULT_USE_ASYNC_CACHE)
logger.info(s"[$nodeName] useAsyncCache : ${useAsyncCache.get}")
val softMemoryFlush = option(DEFAULT_SOFT_MEMORY_FLUSH_PERCENT)
val flushFrequency = option(DEFAULT_FLUSH_FREQUENCY)
val useAsyncCache = option(DEFAULT_USE_ASYNC_CACHE)

if (!useAsyncCache.get) {
new SummerBuilder {
def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
new SyncSummingQueue[K, V](
BufferSize(cacheSize.lowerBound),
FlushFrequency(flushFrequency.get),
MemoryFlushPercent(softMemoryFlush.get),
memoryCounter,
timeoutCounter,
sizeCounter,
insertCounter,
tupleInCounter,
tupleOutCounter)
}
}
Summers.sync(cacheSize, flushFrequency, softMemoryFlush)
} else {
val asyncPoolSize = builder.getOrElse(node, DEFAULT_ASYNC_POOL_SIZE)
logger.info(s"[$nodeName] asyncPoolSize : ${asyncPoolSize.get}")

val valueCombinerCrushSize = builder.getOrElse(node, DEFAULT_VALUE_COMBINER_CACHE_SIZE)
logger.info(s"[$nodeName] valueCombinerCrushSize : ${valueCombinerCrushSize.get}")

val doCompact = builder.getOrElse(node, CompactValues.default)

new SummerBuilder {
def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
val executor = Executors.newFixedThreadPool(asyncPoolSize.get)
val futurePool = FuturePool(executor)
val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound),
FlushFrequency(flushFrequency.get),
MemoryFlushPercent(softMemoryFlush.get),
memoryCounter,
timeoutCounter,
insertCounter,
insertFailCounter,
sizeCounter,
tupleInCounter,
tupleOutCounter,
futurePool,
Compact(doCompact.toBoolean),
CompactionSize(valueCombinerCrushSize.get))
summer.withCleanup(() => {
Future {
executor.shutdown
executor.awaitTermination(10, TimeUnit.SECONDS)
}
})
}
}
val asyncPoolSize = option(DEFAULT_ASYNC_POOL_SIZE)
val valueCombinerCrushSize = option(DEFAULT_VALUE_COMBINER_CACHE_SIZE)
val doCompact = option(CompactValues.default)
Summers.async(
cacheSize, flushFrequency, softMemoryFlush, asyncPoolSize, doCompact, valueCombinerCrushSize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too long, move each param to separate line.

)
}
}
}

def counter(jobID: JobId, nodeName: Group, counterName: Name) = new Counter(Group("summingbird." + nodeName.getString), counterName)(jobID) with Incrementor
}