From 725a236bab2b5d15857e49d0baf0443411ec80ad Mon Sep 17 00:00:00 2001
From: joshualande
Date: Sat, 11 Jun 2016 02:10:24 -0700
Subject: [PATCH] Add Aggregator.randomSample aggregator

---
 .../com/twitter/algebird/Aggregator.scala     | 41 +++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala b/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
index 09977aedc..674136871 100644
--- a/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
+++ b/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
@@ -13,6 +13,8 @@ import scala.collection.generic.CanBuildFrom
 object Aggregator extends java.io.Serializable {
   implicit def applicative[I]: Applicative[({ type L[O] = Aggregator[I, _, O] })#L] = new AggregatorApplicative[I]
 
+  private val DefaultSeed = 471312384
+
   /**
    * This is a trivial aggregator that always returns a single value
    */
@@ -211,6 +213,45 @@ object Aggregator extends java.io.Serializable {
    */
   def immutableSortedReverseTake[T: Ordering](count: Int): MonoidAggregator[T, TopK[T], Seq[T]] =
     new TopKToListAggregator[T](count)(implicitly[Ordering[T]].reverse)
+  /**
+   * Randomly selects input items where each item has an independent probability 'prob' of being
+   * selected. This assumes that all sampled records can fit in memory, so use this only when the
+   * expected number of sampled values is small.
+   *
+   * A single java.util.Random seeded with 'seed' is created per call and captured by the returned
+   * aggregator, so which items are kept depends on the order in which they are prepared.
+   *
+   * @param prob selection probability; must lie in [0, 1] or an IllegalArgumentException is thrown
+   * @param seed seed for the random number generator
+   */
+  def randomSample[T](prob: Double, seed: Int = DefaultSeed): MonoidAggregator[T, List[T], List[T]] = {
+    require(prob >= 0 && prob <= 1, "randomSample.prob must lie in [0, 1]")
+    val rng = new java.util.Random(seed)
+    // nextDouble() is uniform on [0, 1), so a strict '<' keeps each item with probability 'prob':
+    // nothing is kept when prob == 0 and everything is kept when prob == 1.
+    Preparer[T]
+      .filter(_ => rng.nextDouble() < prob)
+      .monoidAggregate(toList)
+  }
+  /**
+   * Selects exactly 'count' of the input records randomly (or all of the records if there are fewer
+   * than 'count' total records). This assumes that all 'count' of the records can fit in memory,
+   * so use this only for small values of 'count'.
+   *
+   * Each record is paired with a random key drawn from a java.util.Random seeded with 'seed', the
+   * 'count' records selected by sortByTake on that key are retained, and the keys are dropped
+   * from the presented result.
+   *
+   * @param count number of records to sample
+   * @param seed seed for the random number generator
+   */
+  def reservoirSample[T](count: Int, seed: Int = DefaultSeed): MonoidAggregator[T, PriorityQueue[(Double, T)], Seq[T]] = {
+    val rng = new java.util.Random(seed)
+    Preparer[T]
+      .map(item => rng.nextDouble() -> item)
+      .monoidAggregate(sortByTake(count)(_._1))
+      .andThenPresent(_.map(_._2))
+  }
   /**
    * Put everything in a List. Note, this could fill the memory if the List is very large.
    */