diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/LookupJoinTest.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/LookupJoinTest.scala deleted file mode 100644 index a1f222282..000000000 --- a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/LookupJoinTest.scala +++ /dev/null @@ -1,198 +0,0 @@ -/* -Copyright 2013 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.summingbird.scalding - -import org.specs2.mutable._ -import java.lang.{ Integer => JInt } -import com.twitter.scalding._ - -import com.twitter.algebird.{ Monoid, Semigroup, Group } - -object LookupJoinedTest { - // Not defined if there is a collision in K and T, so make those unique: - def genList(maxTime: Int, maxKey: Int, sz: Int): List[(Int, Int, Int)] = { - val rng = new java.util.Random - (0 until sz).view.map { _ => - (rng.nextInt(maxTime), rng.nextInt(maxKey), rng.nextInt) - } - .groupBy { case (t, k, v) => (t, k) } - .mapValues(_.headOption.toList) - .values - .flatten - .toList - } -} - -class LookupJoinerJob(args: Args) extends Job(args) { - import TDsl._ - - val in0 = TypedTsv[(Int, Int, Int)]("input0") - val in1 = TypedTsv[(Int, Int, Int)]("input1") - - LookupJoin(TypedPipe.from(in0).map { case (t, k, v) => (t, (k, v)) }, - TypedPipe.from(in1).map { case (t, k, v) => (t, (k, v)) }) - .map { - case (t, (k, (v, opt))) => - (t.toString, k.toString, v.toString, opt.toString) - } - .write(TypedTsv[(String, String, String, String)]("output")) - - LookupJoin.rightSumming(TypedPipe.from(in0).map { case (t, k, v) => (t, (k, v)) }, - TypedPipe.from(in1).map { case (t, k, v) => (t, (k, v)) }) - .map { - case (t, (k, (v, opt))) => - (t.toString, k.toString, v.toString, opt.toString) - } - .write(TypedTsv[(String, String, String, String)]("output2")) -} - -class LookupJoinedTest extends Specification { - - import Dsl._ - import LookupJoinedTest.genList - - def lookupJoin[T: Ordering, K, V, W](in0: Iterable[(T, K, V)], in1: Iterable[(T, K, W)]) = { - val serv = in1.groupBy(_._2) - def lookup(t: T, k: K): Option[W] = { - val ord = Ordering.by { tkw: (T, K, W) => tkw._1 } - serv.get(k).flatMap { in1s => - in1s.filter { case (t1, _, _) => Ordering[T].lt(t1, t) } - .reduceOption(ord.max(_, _)) - .map { _._3 } - } - } - in0.map { case (t, k, v) => (t.toString, k.toString, v.toString, lookup(t, k).toString) } - } - def lookupSumJoin[T: Ordering, K, V, W: Semigroup](in0: Iterable[(T, K, V)], in1: Iterable[(T, K, W)]) = { - implicit val ord: Ordering[(T, K, W)] = Ordering.by { _._1 } - val serv = in1.groupBy(_._2).mapValues { - _.toList - .sorted - .scanLeft(None: Option[(T, K, W)]) { (old, newer) => - old.map { case (_, _, w) => (newer._1, newer._2, Semigroup.plus(w, newer._3)) } - .orElse(Some(newer)) - } - .filter { _.isDefined } - .map { _.get } - }.toMap // Force the map - - def lookup(t: T, k: K): Option[W] = { - val ord = Ordering.by { tkw: (T, K, W) => tkw._1 } - serv.get(k).flatMap { in1s => - in1s.filter { case (t1, _, _) => Ordering[T].lt(t1, t) } - .reduceOption(ord.max(_, _)) - .map { _._3 } - } - } - in0.map { case (t, k, v) => (t.toString, k.toString, v.toString, lookup(t, k).toString) } - } - "A LookupJoinerJob" should { - //Set up the job: - "correctly lookup" in { - val MAX_KEY = 100 - val VAL_COUNT = 10000 - val in0 = genList(Int.MaxValue, MAX_KEY, VAL_COUNT) - val in1 = genList(Int.MaxValue, MAX_KEY, VAL_COUNT) - JobTest(new LookupJoinerJob(_)) - .source(TypedTsv[(Int, Int, Int)]("input0"), in0) - .source(TypedTsv[(Int, Int, Int)]("input1"), in1) - .sink[(String, String, String, String)]( - TypedTsv[(String, String, String, String)]("output")) { outBuf => - outBuf.toSet must be_==(lookupJoin(in0, in1).toSet) - in0.size must be_==(outBuf.size) - } - .sink[(String, String, String, String)]( - TypedTsv[(String, String, String, String)]("output2")) { outBuf => - outBuf.toSet must be_==(lookupSumJoin(in0, in1).toSet) - in0.size must be_==(outBuf.size) - } - .run - //.runHadoop - .finish - } - } -} - -class WindowLookupJoinerJob(args: Args) extends Job(args) { - import TDsl._ - - val in0 = TypedTsv[(Int, Int, Int)]("input0") - val in1 = TypedTsv[(Int, Int, Int)]("input1") - val window = args("window").toInt - - def gate(left: Int, right: Int) = - (left.toLong - right.toLong) < window - - LookupJoin.withWindow(TypedPipe.from(in0).map { case (t, k, v) => (t, (k, v)) }, - TypedPipe.from(in1).map { case (t, k, v) => (t, (k, v)) })(gate _) - .map { - case (t, (k, (v, opt))) => - (t.toString, k.toString, v.toString, opt.toString) - } - .write(TypedTsv[(String, String, String, String)]("output")) -} - -class WindowLookupJoinedTest extends Specification { - - import Dsl._ - import LookupJoinedTest.genList - def windowLookupJoin[K, V, W](in0: Iterable[(Int, K, V)], in1: Iterable[(Int, K, W)], win: Int) = { - val serv = in1.groupBy(_._2) - // super inefficient, but easy to verify: - def lookup(t: Int, k: K): Option[W] = { - val ord = Ordering.by { tkw: (Int, K, W) => tkw._1 } - serv.get(k).flatMap { in1s => - in1s.filter { - case (t1, _, _) => - (t1 < t) && ((t.toLong - t1.toLong) < win) - } - .reduceOption(ord.max(_, _)) - .map { _._3 } - } - } - in0.map { case (t, k, v) => (t.toString, k.toString, v.toString, lookup(t, k).toString) } - } - "A WindowLookupJoinerJob" should { - //Set up the job: - "correctly lookup" in { - val MAX_KEY = 10 - val MAX_TIME = 10000 - val in0 = genList(MAX_TIME, MAX_KEY, 10000) - val in1 = genList(MAX_TIME, MAX_KEY, 10000) - JobTest(new WindowLookupJoinerJob(_)) - .arg("window", "100") - .source(TypedTsv[(Int, Int, Int)]("input0"), in0) - .source(TypedTsv[(Int, Int, Int)]("input1"), in1) - .sink[(String, String, String, String)]( - TypedTsv[(String, String, String, String)]("output")) { outBuf => - val results = outBuf.toList.sorted - val correct = windowLookupJoin(in0, in1, 100).toList.sorted - def some(it: List[(String, String, String, String)]) = - it.filter(_._4.startsWith("Some")) - - def none(it: List[(String, String, String, String)]) = - it.filter(_._4.startsWith("None")) - - some(results) must be_==(some(correct)) - none(results) must be_==(none(correct)) - in0.size must be_==(outBuf.size) - } - .run - //.runHadoop - .finish - } - } -} diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/LookupJoin.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/LookupJoin.scala deleted file mode 100644 index b81872e54..000000000 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/LookupJoin.scala +++ /dev/null @@ -1,186 +0,0 @@ -/* - Copyright 2013 Twitter, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package com.twitter.summingbird.scalding - -import com.twitter.scalding.TypedPipe - -import com.twitter.algebird.Semigroup - -/** - * lookupJoin simulates the behavior of a realtime system attempting - * to leftJoin (K, V) pairs against some other value type (JoinedV) - * by performing realtime lookups on a key-value Store. - * - * An example would join (K, V) pairs of (URL, Username) against a - * service of (URL, ImpressionCount). The result of this join would - * be a pipe of (ShortenedURL, (Username, - * Option[ImpressionCount])). - * - * To simulate this behavior, lookupJoin accepts pipes of key-value - * pairs with an explicit time value T attached. T must have some - * sensical ordering. The semantics are, if one were to hit the - * right pipe's simulated realtime service at any time between - * T(tuple) T(tuple + 1), one would receive Some((K, - * JoinedV)(tuple)). - * - * The entries in the left pipe's tuples have the following - * meaning: - * - * T: The the time at which the (K, W) lookup occurred. - * K: the join key. - * W: the current value for the join key. - * - * The right pipe's entries have the following meaning: - * - * T: The time at which the "service" was fed an update - * K: the join K. - * V: value of the key at time T - * - * Before the time T in the right pipe's very first entry, the - * simulated "service" will return None. After this time T, the - * right side will return None only if the key is absent, - * else, the service will return Some(joinedV). - */ - -// TODO (https://github.com/twitter/scalding/pull/507): Delete when -// this moves into Scalding and our Scalding version bumps. -object LookupJoin extends Serializable { - /** - * This is the "infinite history" join and always joins regardless of how - * much time is between the left and the right - */ - def apply[T: Ordering, K: Ordering, V, JoinedV](left: TypedPipe[(T, (K, V))], - right: TypedPipe[(T, (K, JoinedV))], - reducers: Option[Int] = None): TypedPipe[(T, (K, (V, Option[JoinedV])))] = - withWindow(left, right, reducers)((_, _) => true) - - /** - * In this case, the right pipe is fed through a scanLeft doing a Semigroup.plus - * before joined to the left - */ - def rightSumming[T: Ordering, K: Ordering, V, JoinedV: Semigroup](left: TypedPipe[(T, (K, V))], - right: TypedPipe[(T, (K, JoinedV))], - reducers: Option[Int] = None): TypedPipe[(T, (K, (V, Option[JoinedV])))] = - withWindowRightSumming(left, right, reducers)((_, _) => true) - - /** - * This ensures that gate(Tleft, Tright) == true, else the None is emitted - * as the joined value. - * Useful for bounding the time of the join to a recent window - */ - def withWindow[T: Ordering, K: Ordering, V, JoinedV](left: TypedPipe[(T, (K, V))], - right: TypedPipe[(T, (K, JoinedV))], - reducers: Option[Int] = None)(gate: (T, T) => Boolean): TypedPipe[(T, (K, (V, Option[JoinedV])))] = { - - implicit val keepNew: Semigroup[JoinedV] = Semigroup.from { (older, newer) => newer } - withWindowRightSumming(left, right, reducers)(gate) - } - - /** - * This ensures that gate(Tleft, Tright) == true, else the None is emitted - * as the joined value, and sums are only done as long as they they come - * within the gate interval as well - */ - def withWindowRightSumming[T: Ordering, K: Ordering, V, JoinedV: Semigroup](left: TypedPipe[(T, (K, V))], - right: TypedPipe[(T, (K, JoinedV))], - reducers: Option[Int] = None)(gate: (T, T) => Boolean): TypedPipe[(T, (K, (V, Option[JoinedV])))] = { - /** - * Implicit ordering on an either that doesn't care about the - * actual container values, puts the lookups before the service writes - * Since we assume it takes non-zero time to do a lookup. - */ - implicit def eitherOrd[T, U]: Ordering[Either[T, U]] = - new Ordering[Either[T, U]] { - def compare(l: Either[T, U], r: Either[T, U]) = - (l, r) match { - case (Left(_), Right(_)) => -1 - case (Right(_), Left(_)) => 1 - case (Left(_), Left(_)) => 0 - case (Right(_), Right(_)) => 0 - } - } - - val joined: TypedPipe[(K, (Option[(T, JoinedV)], Option[(T, V, Option[JoinedV])]))] = - left.map { case (t, (k, v)) => (k, (t, Left(v): Either[V, JoinedV])) } - .++(right.map { - case (t, (k, joinedV)) => - (k, (t, Right(joinedV): Either[V, JoinedV])) - }) - .group - .withReducers(reducers.getOrElse(-1)) // -1 means default in scalding - .sortBy(identity) - /** - * Grouping by K leaves values of (T, Either[V, JoinedV]). Sort - * by time and scanLeft. The iterator will now represent pairs of - * T and either new values to join against or updates to the - * simulated "realtime store" described above. - */ - .scanLeft( - /** - * In the simulated realtime store described above, this - * None is the value in the store at the current - * time. Because we sort by time and scan forward, this - * value will be updated with a new value every time a - * Right(delta) shows up in the iterator. - * - * The second entry in the pair will be None when the - * JoinedV is updated and Some(newValue) when a (K, V) - * shows up and a new join occurs. - */ - (Option.empty[(T, JoinedV)], Option.empty[(T, V, Option[JoinedV])]) - ) { - case ((None, result), (time, Left(v))) => { - // The was no value previously - (None, Some((time, v, None))) - } - - case ((prev @ Some((oldt, jv)), result), (time, Left(v))) => { - // Left(v) means that we have a new value from the left - // pipe that we need to join against the current - // "lastJoined" value sitting in scanLeft's state. This - // is equivalent to a lookup on the data in the right - // pipe at time "thisTime". - val filteredJoined = if (gate(time, oldt)) Some(jv) else None - (prev, Some((time, v, filteredJoined))) - } - - case ((None, result), (time, Right(joined))) => { - // There was no value before, so we just update to joined - (Some((time, joined)), None) - } - - case ((Some((oldt, oldJ)), result), (time, Right(joined))) => { - // Right(joinedV) means that we've received a new value - // to use in the simulated realtime service - // described in the comments above - // did it fall out of cache? - val nextJoined = if (gate(time, oldt)) Semigroup.plus(oldJ, joined) else joined - (Some((time, nextJoined)), None) - } - }.toTypedPipe - - // Now, get rid of residual state from the scanLeft above: - joined.flatMap { - case (k, (_, optV)) => - // filter out every event that produced a Right(delta) above, - // leaving only the leftJoin events that occurred above: - optV.map { - case (t, v, optJoined) => (t, (k, (v, optJoined))) - } - } - } -} diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/Service.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/Service.scala index 42dab93ce..da00b85ed 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/Service.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/Service.scala @@ -18,6 +18,7 @@ package com.twitter.summingbird.scalding import com.twitter.algebird.Semigroup import com.twitter.scalding.TypedPipe +import com.twitter.scalding.typed.LookupJoin import com.twitter.summingbird._ import com.twitter.summingbird.option._ import com.twitter.summingbird.scalding.batch.BatchedStore diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/batch/BatchedService.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/batch/BatchedService.scala index 847d53130..5f99e6a58 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/batch/BatchedService.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/batch/BatchedService.scala @@ -19,6 +19,7 @@ package com.twitter.summingbird.scalding.batch import com.twitter.algebird.monad.{ StateWithError, Reader } import com.twitter.algebird.{ Interval, Semigroup } import com.twitter.scalding.{ Mode, TypedPipe } +import com.twitter.scalding.typed.LookupJoin import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp } import com.twitter.summingbird.scalding._ import com.twitter.summingbird.scalding diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedDeltaService.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedDeltaService.scala index 6dcfd14a2..d2eb573f3 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedDeltaService.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedDeltaService.scala @@ -19,6 +19,7 @@ package com.twitter.summingbird.scalding.service import com.twitter.algebird.monad.{ StateWithError, Reader } import com.twitter.algebird.Semigroup import com.twitter.scalding.{ Mode, TypedPipe } +import com.twitter.scalding.typed.LookupJoin import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp } import cascading.flow.FlowDef import com.twitter.summingbird.scalding._ diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedWindowService.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedWindowService.scala index 2162a75a3..ce88543ce 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedWindowService.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/service/BatchedWindowService.scala @@ -19,6 +19,7 @@ package com.twitter.summingbird.scalding.service import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp, Milliseconds } import com.twitter.summingbird.scalding._ import com.twitter.scalding.{ Mode, TypedPipe, AbsoluteDuration } +import com.twitter.scalding.typed.LookupJoin import com.twitter.algebird.monad.Reader import cascading.flow.FlowDef