diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala index f0ee79e8db..588599cacb 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala @@ -18,36 +18,163 @@ package com.twitter.scalding.typed import com.twitter.scalding._ object Joiner extends java.io.Serializable { - def toCogroupJoiner2[K, V, U, R](hashJoiner: (K, V, Iterable[U]) => Iterator[R]): (K, Iterator[V], Iterable[U]) => Iterator[R] = { - (k: K, itv: Iterator[V], itu: Iterable[U]) => - itv.flatMap { hashJoiner(k, _, itu) } + + type JoinFn[K, V, U, R] = (K, Iterator[V], Iterable[U]) => Iterator[R] + type HashJoinFn[K, V, U, R] = (K, V, Iterable[U]) => Iterator[R] + + def toCogroupJoiner2[K, V, U, R](hashJoiner: (K, V, Iterable[U]) => Iterator[R]): JoinFn[K, V, U, R] = + JoinFromHashJoin(hashJoiner) + + def hashInner2[K, V, U]: HashJoinFn[K, V, U, (V, U)] = + HashInner() + + def hashLeft2[K, V, U]: HashJoinFn[K, V, U, (V, Option[U])] = + HashLeft() + + def inner2[K, V, U]: JoinFn[K, V, U, (V, U)] = + InnerJoin() + + def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = + if (it.isEmpty) Iterator.single(None) + else it.map(Some(_)) + + def outer2[K, V, U]: JoinFn[K, V, U, (Option[V], Option[U])] = + OuterJoin() + + def left2[K, V, U]: JoinFn[K, V, U, (V, Option[U])] = + LeftJoin() + + def right2[K, V, U]: JoinFn[K, V, U, (Option[V], U)] = + RightJoin() + + /** + * Optimizers want to match on the kinds of joins we are doing. 
+ * This gives them that ability + */ + sealed abstract class HashJoinFunction[K, V, U, R] extends Function3[K, V, Iterable[U], Iterator[R]] + + final case class HashInner[K, V, U]() extends HashJoinFunction[K, V, U, (V, U)] { + def apply(k: K, v: V, u: Iterable[U]) = u.iterator.map((v, _)) + } + final case class HashLeft[K, V, U]() extends HashJoinFunction[K, V, U, (V, Option[U])] { + def apply(k: K, v: V, u: Iterable[U]) = asOuter(u.iterator).map((v, _)) + } + final case class FilteredHashJoin[K, V1, V2, R](jf: HashJoinFunction[K, V1, V2, R], fn: ((K, R)) => Boolean) extends HashJoinFunction[K, V1, V2, R] { + def apply(k: K, left: V1, right: Iterable[V2]) = + jf.apply(k, left, right).filter { r => fn((k, r)) } + } + final case class MappedHashJoin[K, V1, V2, R, R1](jf: HashJoinFunction[K, V1, V2, R], fn: R => R1) extends HashJoinFunction[K, V1, V2, R1] { + def apply(k: K, left: V1, right: Iterable[V2]) = + jf.apply(k, left, right).map(fn) + } + final case class FlatMappedHashJoin[K, V1, V2, R, R1](jf: HashJoinFunction[K, V1, V2, R], fn: R => TraversableOnce[R1]) extends HashJoinFunction[K, V1, V2, R1] { + def apply(k: K, left: V1, right: Iterable[V2]) = + jf.apply(k, left, right).flatMap(fn) } - def hashInner2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => itu.iterator.map { (v, _) } } - def hashLeft2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => asOuter(itu.iterator).map { (v, _) } } + sealed abstract class JoinFunction[K, V1, V2, R] extends Function3[K, Iterator[V1], Iterable[V2], Iterator[R]] - def inner2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => - itv.flatMap { v => itu.map { u => (v, u) } } + final case class InnerJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (V1, V2)] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(V1, V2)] = + left.flatMap { v1 => right.iterator.map((v1, _)) } } - def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = { - if (it.isEmpty) { - Iterator(None) - } else { - it.map { Some(_) } 
- } + final case class LeftJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (V1, Option[V2])] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(V1, Option[V2])] = + left.flatMap { v1 => asOuter(right.iterator).map((v1, _)) } } - def outer2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => - if (itv.isEmpty && itu.isEmpty) { - Iterator.empty - } else { - asOuter(itv).flatMap { v => asOuter(itu.iterator).map { u => (v, u) } } - } + final case class RightJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (Option[V1], V2)] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(Option[V1], V2)] = + asOuter(left).flatMap { v1 => right.iterator.map((v1, _)) } + } + final case class OuterJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (Option[V1], Option[V2])] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(Option[V1], Option[V2])] = + if (left.isEmpty && right.isEmpty) Iterator.empty + else asOuter(left).flatMap { v1 => asOuter(right.iterator).map((v1, _)) } + } + final case class FilteredJoin[K, V1, V2, R](jf: JoinFunction[K, V1, V2, R], fn: ((K, R)) => Boolean) extends JoinFunction[K, V1, V2, R] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = + jf.apply(k, left, right).filter { r => fn((k, r)) } + } + final case class MappedJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: R => R1) extends JoinFunction[K, V1, V2, R1] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = + jf.apply(k, left, right).map(fn) + } + final case class FlatMappedJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: R => TraversableOnce[R1]) extends JoinFunction[K, V1, V2, R1] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = + jf.apply(k, left, right).flatMap(fn) } - def left2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => - itv.flatMap { v => asOuter(itu.iterator).map { u => (v, u) } } + final case class MappedGroupJoin[K, V1, V2, R, R1](jf: 
JoinFunction[K, V1, V2, R], fn: (K, Iterator[R]) => Iterator[R1]) extends JoinFunction[K, V1, V2, R1] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = { + val iterr = jf.apply(k, left, right) + if (iterr.isEmpty) Iterator.empty // mapGroup operates on non-empty groups + else fn(k, iterr) + } } - def right2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => - asOuter(itv).flatMap { v => itu.map { u => (v, u) } } + final case class JoinFromHashJoin[K, V1, V2, R](hj: (K, V1, Iterable[V2]) => Iterator[R]) extends JoinFunction[K, V1, V2, R] { + def apply(k: K, itv: Iterator[V1], itu: Iterable[V2]) = + itv.flatMap(hj(k, _, itu)) } + + /** + * an inner-like join function is empty definitely if either side is empty + */ + final def isInnerJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case InnerJoin() => Some(true) + case LeftJoin() => Some(false) + case RightJoin() => Some(false) + case OuterJoin() => Some(false) + case JoinFromHashJoin(hj) => isInnerHashJoinLike(hj) + case FilteredJoin(jf, _) => isInnerJoinLike(jf) + case MappedJoin(jf, _) => isInnerJoinLike(jf) + case FlatMappedJoin(jf, _) => isInnerJoinLike(jf) + case MappedGroupJoin(jf, _) => isInnerJoinLike(jf) + case _ => None + } + /** + * a left-like join function is empty definitely if the left side is empty (JoinFromHashJoin flatMaps over the left iterator, so it is left-like whatever hash joiner it wraps) + */ + final def isLeftJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case InnerJoin() => Some(true) + case JoinFromHashJoin(_) => Some(true) + case LeftJoin() => Some(true) + case RightJoin() => Some(false) + case OuterJoin() => Some(false) + case FilteredJoin(jf, _) => isLeftJoinLike(jf) + case MappedJoin(jf, _) => isLeftJoinLike(jf) + case FlatMappedJoin(jf, _) => isLeftJoinLike(jf) + case MappedGroupJoin(jf, _) => isLeftJoinLike(jf) + case _ => None + } + /** + * a right-like join function is empty definitely if the right side is empty + */ +
final def isRightJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case InnerJoin() => Some(true) + case JoinFromHashJoin(hj) => isInnerHashJoinLike(hj) + case LeftJoin() => Some(false) + case RightJoin() => Some(true) + case OuterJoin() => Some(false) + case FilteredJoin(jf, _) => isRightJoinLike(jf) + case MappedJoin(jf, _) => isRightJoinLike(jf) + case FlatMappedJoin(jf, _) => isRightJoinLike(jf) + case MappedGroupJoin(jf, _) => isRightJoinLike(jf) + case _ => None + } + + /** + * an inner-like hash-join function is empty definitely if either side is empty + */ + final def isInnerHashJoinLike[K, V1, V2, R](jf: (K, V1, Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case HashInner() => Some(true) + case HashLeft() => Some(false) + case FilteredHashJoin(jf, _) => isInnerHashJoinLike(jf) + case MappedHashJoin(jf, _) => isInnerHashJoinLike(jf) + case FlatMappedHashJoin(jf, _) => isInnerHashJoinLike(jf) + case _ => None + } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala index 2856816b62..e0afe81d0e 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala @@ -329,4 +329,283 @@ object OptimizationRules { } }) } + + ///////////////////////////// + // + // Here are some actual rules for simplifying TypedPipes + // + ///////////////////////////// + + /** + * a.flatMap(f).flatMap(g) == a.flatMap { x => f(x).flatMap(g) } + */ + object ComposeFlatMap extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case FlatMapped(FlatMapped(in, fn0), fn1) => + FlatMapped(in, FlatMappedFn(fn1).runAfter(FlatMapping.FlatM(fn0))) + case FlatMapValues(FlatMapValues(in, fn0), fn1) => + FlatMapValues(in,
FlatMappedFn(fn1).runAfter(FlatMapping.FlatM(fn0))) + } + } + + /** + * a.map(f).map(g) == a.map { x => g(f(x)) } + */ + object ComposeMap extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case Mapped(Mapped(in, fn0), fn1) => + Mapped(in, ComposedMapFn(fn0, fn1)) + case MapValues(MapValues(in, fn0), fn1) => + MapValues(in, ComposedMapFn(fn0, fn1)) + } + } + + /** + * a.filter(f).filter(g) == a.filter { x => f(x) && g(x) } (the inner predicate f is evaluated first, as in the unfused pipe) + */ + object ComposeFilter extends Rule[TypedPipe] { + def apply[T](on: Dag[TypedPipe]) = { + // scala can't type check this, so we hold its hand: + // case Filter(Filter(in, fn0), fn1) => + // Some(Filter(in, ComposedFilterFn(fn0, fn1))) + case f@Filter(_, _) => + def go[A](f: Filter[A]): Option[TypedPipe[A]] = + f.input match { + case f1: Filter[a] => + Some(Filter[a](f1.input, ComposedFilterFn(f1.fn, f.fn))) + case _ => None + } + go(f) + case FilterKeys(FilterKeys(in, fn0), fn1) => + Some(FilterKeys(in, ComposedFilterFn(fn0, fn1))) + case _ => None + } + } + + /** + * a.onComplete(f).onComplete(g) == a.onComplete { () => f(); g() } + */ + object ComposeWithOnComplete extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case WithOnComplete(WithOnComplete(pipe, fn0), fn1) => + WithOnComplete(pipe, ComposedOnComplete(fn0, fn1)) + } + } + + /** + * After a forceToDisk there is no need to immediately fork. + * Calling forceToDisk twice in a row is the same as once. + * Calling fork twice in a row is the same as once. 
+ */ + object RemoveDuplicateForceFork extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case ForceToDisk(ForceToDisk(t)) => ForceToDisk(t) + case ForceToDisk(Fork(t)) => ForceToDisk(t) + case Fork(Fork(t)) => Fork(t) + case Fork(ForceToDisk(t)) => ForceToDisk(t) + } + } + + /** + * We ignore .group if there is no setting of reducers + * + * This is arguably not a great idea, but scalding has always + * done it to minimize accidental map-reduce steps + */ + object IgnoreNoOpGroup extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case ReduceStepPipe(IdentityReduce(_, input, None, _)) => + input + } + } + + /** + * In map-reduce settings, Merge is almost free in two contexts: + * 1. the final write + * 2. at the point we are doing a shuffle anyway. + * + * By deferring merge as long as possible, we hope to find more such + * cases + */ + object DeferMerge extends PartialRule[TypedPipe] { + private def handleFilter[A]: PartialFunction[Filter[A], TypedPipe[A]] = { + case Filter(MergedTypedPipe(a, b), fn) => MergedTypedPipe(Filter(a, fn), Filter(b, fn)) + } + + def applyWhere[T](on: Dag[TypedPipe]) = { + case Mapped(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(Mapped(a, fn), Mapped(b, fn)) + case FlatMapped(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(FlatMapped(a, fn), FlatMapped(b, fn)) + case MapValues(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(MapValues(a, fn), MapValues(b, fn)) + case FlatMapValues(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(FlatMapValues(a, fn), FlatMapValues(b, fn)) + case f@Filter(_, _) if handleFilter.isDefinedAt(f) => handleFilter(f) + case FilterKeys(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(FilterKeys(a, fn), FilterKeys(b, fn)) + } + } + + /** + * Push filterKeys up as early as possible. This can happen before + * a shuffle, which can be a major win. 
This allows you to write + * generic methods that return all the data, but if downstream someone + * only wants certain keys they don't pay to compute everything. + * + * This is an optimization we didn't do in scalding 0.17 and earlier + * because .toTypedPipe on the group totally hid the structure from + * us + */ + object FilterKeysEarly extends Rule[TypedPipe] { + private def filterReduceStep[K, V1, V2](rs: ReduceStep[K, V1, V2], fn: K => Boolean): ReduceStep[K, _, _ <: V2] = + rs match { + case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@UnsortedIdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@IdentityValueSortedReduce(_, _, _, _, _) => step.filterKeys(fn) + case step@ValueSortedReduce(_, _, _, _, _, _) => step.filterKeys(fn) + case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn) + } + + private def filterCoGroupable[K, V](rs: CoGroupable[K, V], fn: K => Boolean): CoGroupable[K, V] = + rs match { + case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@UnsortedIdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn) + case cg: CoGrouped[K, V] => filterCoGroup(cg, fn) + } + + private def filterCoGroup[K, V](cg: CoGrouped[K, V], fn: K => Boolean): CoGrouped[K, V] = + cg match { + case CoGrouped.Pair(a, b, jf) => + CoGrouped.Pair(filterCoGroupable(a, fn), filterCoGroupable(b, fn), jf) + case CoGrouped.FilterKeys(cg, g) => + filterCoGroup(cg, ComposedFilterFn(g, fn)) + case CoGrouped.MapGroup(cg, g) => + CoGrouped.MapGroup(filterCoGroup(cg, fn), g) + case CoGrouped.WithDescription(cg, d) => + CoGrouped.WithDescription(filterCoGroup(cg, fn), d) + case CoGrouped.WithReducers(cg, r) => + CoGrouped.WithReducers(filterCoGroup(cg, fn), r) + } + + def apply[T](on: Dag[TypedPipe]) = { + case FilterKeys(ReduceStepPipe(rsp), fn) => + Some(ReduceStepPipe(filterReduceStep(rsp, fn))) + case FilterKeys(CoGroupedPipe(cg), fn) => + 
Some(CoGroupedPipe(filterCoGroup(cg, fn))) + case FilterKeys(HashCoGroup(left, right, joiner), fn) => + val newRight = right match { + case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@UnsortedIdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn) + } + Some(HashCoGroup(FilterKeys(left, fn), newRight, joiner)) + case FilterKeys(MapValues(pipe, mapFn), filterFn) => + Some(MapValues(FilterKeys(pipe, filterFn), mapFn)) + case FilterKeys(FlatMapValues(pipe, fmFn), filterFn) => + Some(FlatMapValues(FilterKeys(pipe, filterFn), fmFn)) + case _ => None + } + } + + /** + * EmptyTypedPipe is kind of zero of most of these operations + * We go ahead and simplify as much as possible if we see + * an EmptyTypedPipe + */ + object EmptyIsOftenNoOp extends PartialRule[TypedPipe] { + + private def emptyCogroup[K, V](cg: CoGrouped[K, V]): Boolean = { + import CoGrouped._ + + def empty(t: TypedPipe[Any]): Boolean = t match { + case EmptyTypedPipe => true + case _ => false + } + cg match { + case Pair(left, _, jf) if left.inputs.forall(empty) && (Joiner.isLeftJoinLike(jf) == Some(true)) => true + case Pair(_, right, jf) if right.inputs.forall(empty) && (Joiner.isRightJoinLike(jf) == Some(true)) => true + case Pair(left, right, _) if left.inputs.forall(empty) && right.inputs.forall(empty) => true + case Pair(_, _, _) => false + case WithDescription(cg, _) => emptyCogroup(cg) + case WithReducers(cg, _) => emptyCogroup(cg) + case MapGroup(cg, _) => emptyCogroup(cg) + case FilterKeys(cg, _) => emptyCogroup(cg) + } + } + + private def emptyHashJoinable[K, V](hj: HashJoinable[K, V]): Boolean = + hj match { + case step@IdentityReduce(_, _, _, _) => step.mapped == EmptyTypedPipe + case step@UnsortedIdentityReduce(_, _, _, _) => step.mapped == EmptyTypedPipe + case step@IteratorMappedReduce(_, _, _, _, _) => step.mapped == EmptyTypedPipe + } + + def applyWhere[T](on: Dag[TypedPipe]) = { + case 
CrossPipe(EmptyTypedPipe, _) => EmptyTypedPipe + case CrossPipe(_, EmptyTypedPipe) => EmptyTypedPipe + case CrossValue(EmptyTypedPipe, _) => EmptyTypedPipe + case CrossValue(_, ComputedValue(EmptyTypedPipe)) => EmptyTypedPipe + case CrossValue(_, EmptyValue) => EmptyTypedPipe + case DebugPipe(EmptyTypedPipe) => EmptyTypedPipe + case FilterKeys(EmptyTypedPipe, _) => EmptyTypedPipe + case Filter(EmptyTypedPipe, _) => EmptyTypedPipe + case FlatMapValues(EmptyTypedPipe, _) => EmptyTypedPipe + case FlatMapped(EmptyTypedPipe, _) => EmptyTypedPipe + case ForceToDisk(EmptyTypedPipe) => EmptyTypedPipe + case Fork(EmptyTypedPipe) => EmptyTypedPipe + case HashCoGroup(EmptyTypedPipe, _, _) => EmptyTypedPipe + case HashCoGroup(_, right, hjf) if emptyHashJoinable(right) && Joiner.isInnerHashJoinLike(hjf) == Some(true) => EmptyTypedPipe + case MapValues(EmptyTypedPipe, _) => EmptyTypedPipe + case Mapped(EmptyTypedPipe, _) => EmptyTypedPipe + case MergedTypedPipe(EmptyTypedPipe, a) => a + case MergedTypedPipe(a, EmptyTypedPipe) => a + case ReduceStepPipe(rs: ReduceStep[_, _, _]) if rs.mapped == EmptyTypedPipe => EmptyTypedPipe + case SumByLocalKeys(EmptyTypedPipe, _) => EmptyTypedPipe + case TrappedPipe(EmptyTypedPipe, _, _) => EmptyTypedPipe + case CoGroupedPipe(cgp) if emptyCogroup(cgp) => EmptyTypedPipe + } + } + + /** + * If an Iterable is empty, it is the same as EmptyTypedPipe + */ + object EmptyIterableIsEmpty extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case IterablePipe(it) if it.isEmpty => EmptyTypedPipe + } + } + + /** + * To keep equality for case matching and caching, we need to create internal case classes + */ + private[this] case class ComposedMapFn[A, B, C](fn0: A => B, fn1: B => C) extends Function1[A, C] { + def apply(a: A) = fn1(fn0(a)) + } + private[this] case class ComposedFilterFn[-A](fn0: A => Boolean, fn1: A => Boolean) extends Function1[A, Boolean] { + def apply(a: A) = fn0(a) && fn1(a) + } + + /** + * This is only called 
at the end of a task, so might as well make it stack safe since a little + * extra runtime cost won't matter + */ + private[this] case class ComposedOnComplete(fn0: () => Unit, fn1: () => Unit) extends Function0[Unit] { + def apply(): Unit = { + @annotation.tailrec + def loop(fn: () => Unit, stack: List[() => Unit]): Unit = + fn match { + case ComposedOnComplete(left, right) => loop(left, right :: stack) + case notComposed => + notComposed() + stack match { + case h :: tail => loop(h, tail) + case Nil => () + } + } + + loop(fn0, List(fn1)) + } + } } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala index de17574ae2..cd8cbd5e21 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala @@ -357,7 +357,7 @@ class ExecutionTest extends WordSpec with Matchers { val files = cleanupHook.get.asInstanceOf[TempFileCleanup].filesToCleanup assert(files.size == 1) - assert(files(0).contains(tempFile)) + assert(files.head.contains(tempFile)) cleanupHook.get.run() // Remove the hook so it doesn't show up in the list of shutdown hooks for other tests Runtime.getRuntime.removeShutdownHook(cleanupHook.get) @@ -385,7 +385,7 @@ class ExecutionTest extends WordSpec with Matchers { val files = cleanupHook.get.asInstanceOf[TempFileCleanup].filesToCleanup assert(files.size == 2) - assert(files(0).contains(tempFileOne) || files(0).contains(tempFileTwo)) + assert(files.head.contains(tempFileOne) || files.head.contains(tempFileTwo)) assert(files(1).contains(tempFileOne) || files(1).contains(tempFileTwo)) cleanupHook.get.run() // Remove the hook so it doesn't show up in the list of shutdown hooks for other tests diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala 
index 361394795b..0b794ec461 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala @@ -1,6 +1,8 @@ package com.twitter.scalding.typed +import com.stripe.dagon.{ Dag, Rule } import com.twitter.scalding.source.TypedText +import com.twitter.scalding.{ Config, Local } import org.scalatest.FunSuite import org.scalatest.prop.PropertyChecks import org.scalacheck.{ Arbitrary, Gen } @@ -13,13 +15,14 @@ object TypedPipeGen { Gen.oneOf(g1, src, Gen.const(TypedPipe.empty)) } - lazy val mapped: Gen[TypedPipe[Int]] = { + def mapped(srcGen: Gen[TypedPipe[Int]]): Gen[TypedPipe[Int]] = { + val mappedRec = Gen.lzy(mapped(srcGen)) val next1: Gen[TypedPipe[Int] => TypedPipe[Int]] = Gen.oneOf( - tpGen.map { p: TypedPipe[Int] => + tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x.cross(p).keys } }, - tpGen.map { p: TypedPipe[Int] => + tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x.cross(ValuePipe(2)).values } }, Gen.const({ t: TypedPipe[Int] => t.debug }), @@ -28,7 +31,7 @@ object TypedPipeGen { }, Gen.const({ t: TypedPipe[Int] => t.forceToDisk }), Gen.const({ t: TypedPipe[Int] => t.fork }), - tpGen.map { p: TypedPipe[Int] => + tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x ++ p } }, Gen.identifier.map { id => @@ -40,7 +43,7 @@ object TypedPipeGen { val one = for { n <- next1 - p <- tpGen + p <- tpGen(srcGen) } yield n(p) val next2: Gen[TypedPipe[(Int, Int)] => TypedPipe[Int]] = @@ -50,62 +53,63 @@ object TypedPipeGen { val two = for { n <- next2 - p <- keyed + p <- keyed(srcGen) } yield n(p) Gen.frequency((4, one), (1, two)) } - lazy val keyed: Gen[TypedPipe[(Int, Int)]] = { + def keyed(srcGen: Gen[TypedPipe[Int]]): Gen[TypedPipe[(Int, Int)]] = { + val keyRec = Gen.lzy(keyed(srcGen)) val one = Gen.oneOf( for { - single <- tpGen + single <- tpGen(srcGen) fn <- Arbitrary.arbitrary[Int => (Int, Int)] } 
yield single.map(fn), for { - single <- tpGen + single <- tpGen(srcGen) fn <- Arbitrary.arbitrary[Int => List[(Int, Int)]] } yield single.flatMap(fn)) val two = Gen.oneOf( for { fn <- Arbitrary.arbitrary[Int => Boolean] - pair <- keyed + pair <- keyRec } yield pair.filterKeys(fn), for { fn <- Arbitrary.arbitrary[Int => List[Int]] - pair <- keyed + pair <- keyRec } yield pair.flatMapValues(fn), for { fn <- Arbitrary.arbitrary[Int => Int] - pair <- keyed + pair <- keyRec } yield pair.mapValues(fn), for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.sumByKey.toTypedPipe, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.sumByLocalKeys, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.group.mapGroup { (k, its) => its }.toTypedPipe, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.group.sorted.mapGroup { (k, its) => its }.toTypedPipe, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.group.sorted.withReducers(2).mapGroup { (k, its) => its }.toTypedPipe, for { - p1 <- Gen.lzy(keyed) - p2 <- Gen.lzy(keyed) + p1 <- keyRec + p2 <- keyRec } yield p1.hashJoin(p2).values, for { - p1 <- Gen.lzy(keyed) - p2 <- Gen.lzy(keyed) + p1 <- keyRec + p2 <- keyRec } yield p1.join(p2).values, for { - p1 <- Gen.lzy(keyed) - p2 <- Gen.lzy(keyed) + p1 <- keyRec + p2 <- keyRec } yield p1.join(p2).mapValues { case (a, b) => a * b }.toTypedPipe) // bias to consuming Int, since the we can stack overflow with the (Int, Int) @@ -113,8 +117,21 @@ object TypedPipeGen { Gen.frequency((2, one), (1, two)) } - val tpGen: Gen[TypedPipe[Int]] = - Gen.lzy(Gen.frequency((1, srcGen), (1, mapped))) + def tpGen(srcGen: Gen[TypedPipe[Int]]): Gen[TypedPipe[Int]] = + Gen.lzy(Gen.frequency((1, srcGen), (1, mapped(srcGen)))) + + /** + * This generates a TypedPipe that can't necessarily + * be run because it has fake sources + */ + val genWithFakeSources: Gen[TypedPipe[Int]] = tpGen(srcGen) + + /** + * This can always be run because all the sources are + * 
Iterable sources + */ + val genWithIterableSources: Gen[TypedPipe[Int]] = + tpGen(Gen.listOf(Arbitrary.arbitrary[Int]).map(TypedPipe.from(_))) } class OptimizationRulesTest extends FunSuite { @@ -124,11 +141,41 @@ class OptimizationRulesTest extends FunSuite { assert(toLiteral(t).evaluate == t) test("randomly generated TypedPipe trees are invertible") { - forAll(TypedPipeGen.tpGen) { (t: TypedPipe[Int]) => + forAll(TypedPipeGen.genWithFakeSources) { (t: TypedPipe[Int]) => invert(t) } } + def optimizationLaw[T: Ordering](t: TypedPipe[T], rule: Rule[TypedPipe]) = { + val optimized = Dag.applyRule(t, toLiteral, rule) + + assert(TypedPipeDiff.diff(t, optimized) + .toIterableExecution + .waitFor(Config.empty, Local(true)).get.isEmpty) + } + + test("all optimization rules don't change results") { + import OptimizationRules._ + + val allRules = List(ComposeFlatMap, + ComposeMap, + ComposeFilter, + ComposeWithOnComplete, + RemoveDuplicateForceFork, + IgnoreNoOpGroup, + DeferMerge, + FilterKeysEarly, + EmptyIsOftenNoOp, + EmptyIterableIsEmpty) + + val genRule = for { + c <- Gen.choose(1, allRules.size) + rs <- Gen.pick(c, allRules) + } yield rs.reduce(_.orElse(_)) + + forAll(TypedPipeGen.genWithIterableSources, genRule)(optimizationLaw[Int] _) + } + test("OptimizationRules.toLiteral is invertible on some specific instances") { invert(TypedPipe.from(TypedText.tsv[Int]("foo")))