Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Add aggregate to summingbird #562

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add tests for aggregate
  • Loading branch information
oscar-stripe committed Jul 21, 2016
commit a6a67d1fb2ef1293f821669e5b66bb19c710e64c
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ limitations under the License.

package com.twitter.summingbird.memory

import com.twitter.algebird.{ MapAlgebra, Monoid, Semigroup }
import com.twitter.algebird.{ Aggregator, MapAlgebra, Monoid, Semigroup }
import com.twitter.summingbird._
import com.twitter.summingbird.option.JobId
import org.scalacheck.{ Arbitrary, _ }
@@ -214,6 +214,32 @@ class MemoryLaws extends WordSpec {
assert(store1.toMap == ((0 to 100).groupBy(_ % 3).mapValues(_.sum)))
assert(store2.toMap == ((0 to 100).groupBy(_ % 3).mapValues(_.sum)))
}
"aggregate should work" in {
val source = Memory.toSource((0 to 100).reverse)
val store = MutableMap.empty[Int, Int]
val buf = MutableMap.empty[Int, List[(Option[Int], Int)]]
val prod = source.map { t => (t % 2, t) }
.aggregate(store, Aggregator.max[Int].andThenPresent(_ * 2).composePrepare(_ / 2))
.write { kv =>
val (k, vs) = kv
buf(k) = vs :: buf.getOrElse(k, Nil)
}
val mem = new Memory
mem.run(mem.plan(prod))

assert(store.keySet == Set(0, 1))
assert(store(0) == (0 to 100).filter(_ % 2 == 0).map(_ / 2).max)
assert(store(1) == (0 to 100).filter(_ % 2 == 1).map(_ / 2).max)
assert(buf.keySet == Set(0, 1))
assert(buf(0).map(_._2) ==
(0 to 100).reverse.filter(_ % 2 == 0).map { t => (t / 2) * 2 }.toList)
assert(buf(0).map(_._1) ==
(None :: ((0 to 100).reverse.filter(_ % 2 == 0).map { t => Some((t / 2)*2) }.toList)))
assert(buf(1).map(_._2) ==
(0 to 100).reverse.filter(_ % 2 == 1).map { t => (t / 2) * 2 }.toList)
assert(buf(1).map(_._1) ==
(None :: ((0 to 100).reverse.filter(_ % 2 == 1).map { t => Some((t / 2)*2) }.toList)))
}

"self also shouldn't duplicate work" in {
val platform = new Memory
Original file line number Diff line number Diff line change
@@ -258,8 +258,7 @@ sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] {
* and is not meaningful in the general case.
*/
def aggregate[V1, V2](store: P#Store[K, V1], agg: Aggregator[V, V1, V2]): KeyedProducer[P, K, (Option[V2], V2)] = {
// When the next version of algebird is added, use agg.semigroup
val sg = Semigroup.from[V1](agg.reduce)
val sg = agg.semigroup
mapValues(agg.prepare)
.sumByKey(store)(sg)
.mapValues {