Skip to content

Commit

Permalink
Merge pull request twitter#529 from joshualande/joshualande/random_sa…
Browse files Browse the repository at this point in the history
…mple_aggregator

Add an Aggregator.randomSample aggregator
  • Loading branch information
johnynek authored Jun 14, 2016
2 parents 0ba730e + 725a236 commit b8fa8e0
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import scala.collection.generic.CanBuildFrom
object Aggregator extends java.io.Serializable {
implicit def applicative[I]: Applicative[({ type L[O] = Aggregator[I, _, O] })#L] = new AggregatorApplicative[I]

private val DefaultSeed = 471312384

/**
* This is a trivial aggregator that always returns a single value
*/
Expand Down Expand Up @@ -211,6 +213,30 @@ object Aggregator extends java.io.Serializable {
*/
def immutableSortedReverseTake[T: Ordering](count: Int): MonoidAggregator[T, TopK[T], Seq[T]] =
new TopKToListAggregator[T](count)(implicitly[Ordering[T]].reverse)
/**
* Randomly selects input items where each item has an independent probability 'prob' of being
* selected. This assumes that all sampled records can fit in memory, so use this only when the
* expected number of sampled values is small.
*/
def randomSample[T](prob: Double, seed: Int = DefaultSeed): MonoidAggregator[T, List[T], List[T]] = {
assert(prob >= 0 && prob <= 1, "randomSample.prob must lie in [0, 1]")
val rng = new java.util.Random(seed)
Preparer[T]
.filter(_ => rng.nextDouble() <= prob)
.monoidAggregate(toList)
}
/**
* Selects exactly 'count' of the input records randomly (or all of the records if there are less
* then 'count' total records). This assumes that all 'count' of the records can fit in memory,
* so use this only for small values of 'count'.
*/
def reservoirSample[T](count: Int, seed: Int = DefaultSeed): MonoidAggregator[T, PriorityQueue[(Double, T)], Seq[T]] = {
val rng = new java.util.Random(seed)
Preparer[T]
.map(rng.nextDouble() -> _)
.monoidAggregate(sortByTake(count)(_._1))
.andThenPresent(_.map(_._2))
}
/**
* Put everything in a List. Note, this could fill the memory if the List is very large.
*/
Expand Down

0 comments on commit b8fa8e0

Please sign in to comment.