Skip to content

Commit

Permalink
Merge pull request twitter#501 from erikerlandson/feature/append_monoid
Browse files Browse the repository at this point in the history
Implement an appendMonoid Aggregator factory which yields aggregators…
  • Loading branch information
avibryant committed Dec 4, 2015
2 parents 3a4c927 + f0dc4f2 commit 938798e
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 4 deletions.
85 changes: 85 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ object Aggregator extends java.io.Serializable {
// Builds a MonoidAggregator from an implicitly supplied Monoid and an implicitly
// supplied preparation function, by handing both to prepareMonoid.
def fromMonoid[F, T](implicit mon: Monoid[T], prep: F => T): MonoidAggregator[F, T, T] =
  prepareMonoid[F, T](prep)(mon)

/**
 * Obtain an [[Aggregator]] whose intermediate type and output type are both T.
 * Inputs are converted with `prep`, `sg` is exposed as the aggregator's semigroup,
 * and the reduced value is presented unchanged.
 * @tparam F Data input type
 * @tparam T Aggregating [[Semigroup]] type (also the output type)
 * @param prep The preparation function, applied to each input element
 * @param sg The [[Semigroup]] type class
 */
def prepareSemigroup[F, T](prep: F => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] =
  new Aggregator[F, T, T] {
    def semigroup: Semigroup[T] = sg
    def prepare(input: F): T = prep(input)
    def present(reduction: T): T = reduction
  }
def prepareMonoid[F, T](prep: F => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = new MonoidAggregator[F, T, T] {
def prepare(input: F) = prep(input)
def monoid = m
Expand All @@ -47,6 +52,86 @@ object Aggregator extends java.io.Serializable {
def present(reduction: T) = reduction
}

/**
 * Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation
 * and presents the aggregated value unchanged.
 * Shorthand for {{{ appendSemigroup(prep, appnd, identity[T]_)(sg) }}}
 */
def appendSemigroup[F, T](prep: F => T, appnd: (T, F) => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] = {
  // The presentation step is the identity, so the output type equals the aggregating type.
  val unchanged: T => T = identity[T] _
  appendSemigroup[F, T, T](prep, appnd, unchanged)(sg)
}

/**
 * Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation
 * @tparam F Data input type
 * @tparam T Aggregating [[Semigroup]] type
 * @tparam P Presentation (output) type
 * @param prep The preparation function. Expected to construct an instance of type T from a single data element.
 * @param appnd Function that appends a data element to a [[Semigroup]] value. Defines the [[append]] method
 * for this aggregator. Analogous to the 'seqop' function in Scala's sequence 'aggregate' method
 * @param pres The presentation function
 * @param sg The [[Semigroup]] type class
 * @note The functions 'appnd' and 'prep' are expected to obey the law: {{{ appnd(t, f) == sg.plus(t, prep(f)) }}}
 */
def appendSemigroup[F, T, P](prep: F => T, appnd: (T, F) => T, pres: T => P)(implicit sg: Semigroup[T]): Aggregator[F, T, P] =
  new Aggregator[F, T, P] {
    def prepare(input: F): T = prep(input)
    def semigroup: Semigroup[T] = sg
    def present(reduction: T): P = pres(reduction)

    override def append(l: T, r: F): T = appnd(l, r)

    // Prepares the first element and appends every remaining element onto it;
    // yields None when there is nothing to aggregate.
    private def foldInputs(inputs: TraversableOnce[F]): Option[T] =
      if (inputs.isEmpty) None
      else {
        val remaining = inputs.toIterator
        val seed = prepare(remaining.next())
        Some(remaining.foldLeft(seed)(appnd))
      }

    override def applyOption(inputs: TraversableOnce[F]): Option[P] =
      foldInputs(inputs).map(pres)

    // Fails on empty input, because applyOption is None in that case.
    override def apply(inputs: TraversableOnce[F]): P = applyOption(inputs).get

    override def appendAll(old: T, items: TraversableOnce[F]): T =
      foldInputs(items) match {
        case Some(appended) => reduce(old, appended)
        case None => old
      }
  }

/**
 * Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation
 * and presents the aggregated value unchanged.
 * Shorthand for {{{ appendMonoid(appnd, identity[T]_)(m) }}}
 */
def appendMonoid[F, T](appnd: (T, F) => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = {
  // The presentation step is the identity, so the output type equals the aggregating type.
  val unchanged: T => T = identity[T] _
  appendMonoid[F, T, T](appnd, unchanged)(m)
}

/**
 * Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation
 * @tparam F Data input type
 * @tparam T Aggregating [[Monoid]] type
 * @tparam P Presentation (output) type
 * @param appnd Function that appends a data element to a [[Monoid]] value. Defines the [[append]] method
 * for this aggregator. Analogous to the 'seqop' function in Scala's sequence 'aggregate' method
 * @param pres The presentation function
 * @param m The [[Monoid]] type class
 * @note The function 'appnd' is expected to obey the law: {{{ appnd(t, f) == m.plus(t, appnd(m.zero, f)) }}}
 */
def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(implicit m: Monoid[T]): MonoidAggregator[F, T, P] =
  new MonoidAggregator[F, T, P] {
    def monoid: Monoid[T] = m
    def present(reduction: T): P = pres(reduction)

    // A single element is prepared by appending it onto the monoid's zero.
    def prepare(input: F): T = appnd(m.zero, input)

    override def append(l: T, r: F): T = appnd(l, r)

    // Aggregates the inputs starting from the monoid's zero, with appnd as the
    // sequential operator and m.plus as the combining operator.
    private def foldFromZero(inputs: TraversableOnce[F]): T =
      inputs.aggregate(m.zero)(appnd, m.plus)

    override def appendAll(items: TraversableOnce[F]): T = foldFromZero(items)

    override def appendAll(old: T, items: TraversableOnce[F]): T =
      reduce(old, foldFromZero(items))

    override def apply(inputs: TraversableOnce[F]): P = present(foldFromZero(inputs))

    override def applyOption(inputs: TraversableOnce[F]): Option[P] =
      if (inputs.nonEmpty) Some(apply(inputs)) else None
  }

/**
* How many items satisfy a predicate
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
* requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for
* T.
*/
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = {
  // Prepare every element, sum the prepared values with the aggregator's semigroup,
  // then present the sum if there is one.
  val prepared = new AlgebirdRDD(rdd.map(agg.prepare))
  val summed = prepared.sumOption(agg.semigroup, implicitly)
  summed.map(agg.present)
}
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = {
  // Collapse each non-empty partition into one partial aggregate: the first element is
  // prepared and the remaining elements are appended onto it. Empty partitions emit nothing.
  val partials = rdd.mapPartitions({ elems =>
    if (elems.isEmpty) Iterator.empty
    else {
      val seed = agg.prepare(elems.next())
      Iterator(agg.appendAll(seed, elems))
    }
  }, preservesPartitioning = true)

  // Move the partial aggregates into a single partition and sum them with the semigroup.
  val combined = partials
    .coalesce(1, shuffle = true)
    .mapPartitions(parts => Iterator(agg.semigroup.sumOption(parts)))

  // The single remaining partition emits exactly one Option; present its content if any.
  combined.collect.head.map(agg.present)
}

/**
* This will throw if you use a non-MonoidAggregator with an empty RDD
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.twitter.algebird

import org.scalatest._

/**
 * Checks that aggregators built by `Aggregator.appendMonoid` and `Aggregator.appendSemigroup`
 * give the same results as reference aggregators built by the non-append factories, for every
 * method that the append factories define or override.
 */
class AppendAggregatorTest extends WordSpec with Matchers {
  // 100 random integers in [0, 100); the random generator is not seeded, so the data differ between runs.
  val data: Vector[Int] = Vector.fill(100) { scala.util.Random.nextInt(100) }
  val mpty: Vector[Int] = Vector.empty[Int]

  /**
   * Test the methods that the appendSemigroup factory defines or overrides.
   * @param agg1 First aggregator to compare
   * @param agg2 Second aggregator to compare
   * @param data Non-empty input with at least two elements, so that its first half has a head
   * @param empty An empty input, used to compare `applyOption` on no data
   */
  def testMethodsSemigroup[E, M, P](
    agg1: Aggregator[E, M, P],
    agg2: Aggregator[E, M, P],
    data: Seq[E],
    empty: Seq[E]): Unit = {

    val n = data.length
    val (half1, half2) = data.splitAt(n / 2)
    // Intermediate value for the first half, built with agg1; reused below as the
    // left-hand side for the append/appendAll comparisons.
    val lhs = agg1.appendAll(agg1.prepare(half1.head), half1.tail)

    data.foreach { e =>
      agg1.prepare(e) should be(agg2.prepare(e))
    }

    agg1.present(lhs) should be(agg2.present(lhs))

    agg1(data) should be(agg2(data))

    agg1.applyOption(data) should be(agg2.applyOption(data))
    agg1.applyOption(empty) should be(agg2.applyOption(empty))

    half2.foreach { e =>
      agg1.append(lhs, e) should be(agg2.append(lhs, e))
    }

    agg1.appendAll(lhs, half2) should be(agg2.appendAll(lhs, half2))
  }

  /**
   * Test the methods that the appendMonoid factory defines or overrides. Runs all of the
   * semigroup checks, then compares `apply` on empty input and the one-argument `appendAll`.
   * @param agg1 First aggregator to compare
   * @param agg2 Second aggregator to compare
   * @param data Non-empty input with at least two elements
   * @param empty An empty input
   */
  def testMethodsMonoid[E, M, P](
    agg1: MonoidAggregator[E, M, P],
    agg2: MonoidAggregator[E, M, P],
    data: Seq[E],
    empty: Seq[E]): Unit = {

    testMethodsSemigroup(agg1, agg2, data, empty)

    agg1(empty) should be(agg2(empty))
    agg1.appendAll(data) should be(agg2.appendAll(data))
  }

  "appendMonoid" should {
    "be equivalent to integer monoid aggregator" in {
      val agg1 = Aggregator.fromMonoid[Int]
      val agg2 = Aggregator.appendMonoid((m: Int, e: Int) => m + e)
      testMethodsMonoid(agg1, agg2, data, mpty)
    }

    "be equivalent to set monoid aggregator" in {
      object setMonoid extends Monoid[Set[Int]] {
        val zero = Set.empty[Int]
        def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
      }

      val agg1 = Aggregator.prepareMonoid((e: Int) => Set(e))(setMonoid)
      val agg2 = Aggregator.appendMonoid((m: Set[Int], e: Int) => m + e)(setMonoid)

      testMethodsMonoid(agg1, agg2, data, mpty)
    }
  }

  "appendSemigroup" should {
    "be equivalent to integer semigroup aggregator" in {
      val agg1 = Aggregator.fromSemigroup[Int]
      val agg2 = Aggregator.appendSemigroup(identity[Int]_, (m: Int, e: Int) => m + e)
      testMethodsSemigroup(agg1, agg2, data, mpty)
    }

    "be equivalent to set semigroup aggregator" in {
      object setSemigroup extends Semigroup[Set[Int]] {
        def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
      }

      val agg1 = Aggregator.prepareSemigroup((e: Int) => Set(e))(setSemigroup)
      val agg2 = Aggregator.appendSemigroup((e: Int) => Set(e), (m: Set[Int], e: Int) => m + e)(setSemigroup)

      testMethodsSemigroup(agg1, agg2, data, mpty)
    }
  }
}

0 comments on commit 938798e

Please sign in to comment.