From 52eae5e4263cbc3e77464feb8c5ba709819280f3 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Fri, 22 Sep 2017 23:32:48 -1000 Subject: [PATCH 1/9] Implement Dagon.toLiteral --- build.sbt | 2 + .../scalding/typed/OptimizationRules.scala | 344 ++++++++++++++++++ .../twitter/scalding/typed/TypedPipe.scala | 21 +- .../cascading_backend/CascadingBackend.scala | 6 +- .../typed/OptimizationRulesTest.scala | 184 ++++++++++ 5 files changed, 545 insertions(+), 12 deletions(-) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala diff --git a/build.sbt b/build.sbt index d319d0dd49..388feec8a4 100644 --- a/build.sbt +++ b/build.sbt @@ -21,6 +21,7 @@ val avroVersion = "1.7.4" val bijectionVersion = "0.9.5" val cascadingAvroVersion = "2.1.2" val chillVersion = "0.8.4" +val dagonVersion = "0.2.0" val elephantbirdVersion = "4.15" val hadoopLzoVersion = "0.4.19" val hadoopVersion = "2.5.0" @@ -316,6 +317,7 @@ lazy val scaldingCore = module("core").settings( "cascading" % "cascading-core" % cascadingVersion, "cascading" % "cascading-hadoop" % cascadingVersion, "cascading" % "cascading-local" % cascadingVersion, + "com.stripe" %% "dagon-core" % dagonVersion, "com.twitter" % "chill-hadoop" % chillVersion, "com.twitter" % "chill-java" % chillVersion, "com.twitter" %% "chill-bijection" % chillVersion, diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala new file mode 100644 index 0000000000..1afdd5f350 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala @@ -0,0 +1,344 @@ +package com.twitter.scalding.typed + +import com.stripe.dagon.{ FunctionK, Memoize, Rule, PartialRule, Dag, Literal } + +object OptimizationRules { + type LitPipe[T] = Literal[TypedPipe, T] + + import 
Literal.{ Unary, Binary } + import TypedPipe._ + + /** + * Since our TypedPipe is covariant, but the Literal is not + * this is actually safe in this context, but not in general + */ + def widen[T](l: LitPipe[_ <: T]): LitPipe[T] = { + // to prove this is safe, see that if you have + // LitPipe[_ <: T] we can call .evaluate to get + // TypedPipe[_ <: T] which due to covariance is + // TypedPipe[T], and then using toLiteral we can get + // LitPipe[T] + // + // that would be wasteful to apply since the final + // result is identity. + l.asInstanceOf[LitPipe[T]] + } + + /** + * Convert a TypedPipe[T] to a Literal[TypedPipe, T] for + * use with Dagon + */ + def toLiteral: FunctionK[TypedPipe, LitPipe] = + Memoize.functionK[TypedPipe, LitPipe]( + new Memoize.RecursiveK[TypedPipe, LitPipe] { + + def toFunction[A] = { + case (c: CrossPipe[a, b], f) => + Binary(f(c.left), f(c.right), CrossPipe(_: TypedPipe[a], _: TypedPipe[b])) + case (cv@CrossValue(_, _), f) => + def go[A, B](cv: CrossValue[A, B]): LitPipe[(A, B)] = + cv match { + case CrossValue(a, ComputedValue(v)) => + Binary(f(a), f(v), { (a: TypedPipe[A], b: TypedPipe[B]) => + CrossValue(a, ComputedValue(b)) + }) + case CrossValue(a, v) => + Unary(f(a), CrossValue(_: TypedPipe[A], v)) + } + widen(go(cv)) + case (p: DebugPipe[a], f) => + Unary(f(p.input), DebugPipe(_: TypedPipe[a])) + case (p: FilterKeys[a, b], f) => + widen(Unary(f(p.input), FilterKeys(_: TypedPipe[(a, b)], p.fn))) + case (p: Filter[a], f) => + Unary(f(p.input), Filter(_: TypedPipe[a], p.fn)) + case (p: Fork[a], f) => + Unary(f(p.input), Fork(_: TypedPipe[a])) + case (p: FlatMapValues[a, b, c], f) => + widen(Unary(f(p.input), FlatMapValues(_: TypedPipe[(a, b)], p.fn))) + case (p: FlatMapped[a, b], f) => + Unary(f(p.input), FlatMapped(_: TypedPipe[a], p.fn)) + case (p: ForceToDisk[a], f) => + Unary(f(p.input), ForceToDisk(_: TypedPipe[a])) + case (it@IterablePipe(_), _) => + Literal.Const(it) + case (p: MapValues[a, b, c], f) => + 
widen(Unary(f(p.input), MapValues(_: TypedPipe[(a, b)], p.fn))) + case (p: Mapped[a, b], f) => + Unary(f(p.input), Mapped(_: TypedPipe[a], p.fn)) + case (p: MergedTypedPipe[a], f) => + Binary(f(p.left), f(p.right), MergedTypedPipe(_: TypedPipe[a], _: TypedPipe[a])) + case (src@SourcePipe(_), _) => + Literal.Const(src) + case (p: SumByLocalKeys[a, b], f) => + widen(Unary(f(p.input), SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))) + case (p: TrappedPipe[a], f) => + Unary(f(p.input), TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv)) + case (p: WithDescriptionTypedPipe[a], f) => + Unary(f(p.input), WithDescriptionTypedPipe(_: TypedPipe[a], p.description, p.deduplicate)) + case (p: WithOnComplete[a], f) => + Unary(f(p.input), WithOnComplete(_: TypedPipe[a], p.fn)) + case (EmptyTypedPipe, _) => + Literal.Const(EmptyTypedPipe) + case (hg: HashCoGroup[a, b, c, d], f) => + widen(handleHashCoGroup(hg, f)) + case (CoGroupedPipe(cg), f) => + widen(handleCoGrouped(cg, f)) + case (ReduceStepPipe(rs), f) => + widen(handleReduceStep(rs, f)) + } + }) + + private def handleReduceStep[K, V1, V2](rs: ReduceStep[K, V1, V2], recurse: FunctionK[TypedPipe, LitPipe]): LitPipe[(K, V2)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) }) + case step@UnsortedIdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) }) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + def go[A, B](ivsr: IdentityValueSortedReduce[A, B]): LitPipe[(A, B)] = + Unary(widen[(A, B)](recurse(ivsr.mapped)), { (tp: TypedPipe[(A, B)]) => + ReduceStepPipe[A, B, B](IdentityValueSortedReduce[A, B]( + ivsr.keyOrdering, + tp, + ivsr.valueSort, + ivsr.reducers, + ivsr.descriptions)) + }) + widen[(K, V2)](go(step)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + def go[A, B, C](vsr: ValueSortedReduce[A, B, 
C]): LitPipe[(A, C)] = + Unary(recurse(vsr.mapped), { (tp: TypedPipe[(A, B)]) => + ReduceStepPipe[A, B, C](ValueSortedReduce[A, B, C]( + vsr.keyOrdering, + tp, + vsr.valueSort, + vsr.reduceFn, + vsr.reducers, + vsr.descriptions)) + }) + go(step) + case step@IteratorMappedReduce(_, _, _, _, _) => + def go[A, B, C](imr: IteratorMappedReduce[A, B, C]): LitPipe[(A, C)] = + Unary(recurse(imr.mapped), { (tp: TypedPipe[(A, B)]) => ReduceStepPipe[A, B, C](imr.copy(mapped = tp)) }) + + go(step) + } + + private def handleCoGrouped[K, V](cg: CoGroupable[K, V], recurse: FunctionK[TypedPipe, LitPipe]): LitPipe[(K, V)] = { + import CoGrouped._ + + def pipeToCG[V1](t: TypedPipe[(K, V1)]): CoGroupable[K, V1] = + t match { + case ReduceStepPipe(cg: CoGroupable[K @unchecked, V1 @unchecked]) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. + cg + case CoGroupedPipe(cg) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. + cg.asInstanceOf[CoGroupable[K, V1]] + case kvPipe => IdentityReduce(cg.keyOrdering, kvPipe, None, Nil) + } + + cg match { + case p@Pair(_, _, _) => + def go[A, B, C](pair: Pair[K, A, B, C]): LitPipe[(K, C)] = { + val llit = handleCoGrouped(pair.larger, recurse) + val rlit = handleCoGrouped(pair.smaller, recurse) + val fn = pair.fn + Binary(llit, rlit, { (l: TypedPipe[(K, A)], r: TypedPipe[(K, B)]) => + Pair(pipeToCG(l), pipeToCG(r), fn) + }) + } + widen(go(p)) + case wr@WithReducers(_, _) => + def go[V1 <: V](wr: WithReducers[K, V1]): LitPipe[(K, V)] = { + val reds = wr.reds + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(wr.on, recurse), { (tp: TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + withReducers(rs, reds) + case CoGroupedPipe(cg) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. 
+ CoGroupedPipe(WithReducers(cg, reds)) + case kvPipe => + ReduceStepPipe(IdentityReduce(cg.keyOrdering, kvPipe, None, Nil) + .withReducers(reds)) + } + }) + } + widen(go(wr)) + case wd@WithDescription(_, _) => + def go[V1 <: V](wd: WithDescription[K, V1]): LitPipe[(K, V)] = { + val desc = wd.description + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(wd.on, recurse), { (tp: TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + withDescription(rs, desc) + case CoGroupedPipe(cg) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. + CoGroupedPipe(WithDescription(cg, desc)) + case kvPipe => + kvPipe.withDescription(desc) + } + }) + } + widen(go(wd)) + case fk@FilterKeys(_, _) => + def go[V1 <: V](fk: FilterKeys[K, V1]): LitPipe[(K, V)] = { + val fn = fk.fn + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(fk.on, recurse), { (tp: TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + filterKeys(rs, fn) + case CoGroupedPipe(cg) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. + CoGroupedPipe(FilterKeys(cg, fn)) + case kvPipe => + kvPipe.filterKeys(fn) + } + }) + } + widen(go(fk)) + case mg@MapGroup(_, _) => + def go[V1, V2 <: V](mg: MapGroup[K, V1, V2]): LitPipe[(K, V)] = { + val fn = mg.fn + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(mg.on, recurse), { (tp: TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + mapGroup(rs, fn) + case CoGroupedPipe(cg) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. 
+ CoGroupedPipe(MapGroup(cg, fn)) + case kvPipe => + ReduceStepPipe( + IdentityReduce(cg.keyOrdering, kvPipe, None, Nil) + .mapGroup(fn)) + } + }) + } + widen(go(mg)) + case step@IdentityReduce(_, _, _, _) => + widen(handleReduceStep(step, recurse)) + case step@UnsortedIdentityReduce(_, _, _, _) => + widen(handleReduceStep(step, recurse)) + case step@IteratorMappedReduce(_, _, _, _, _) => + widen(handleReduceStep(step, recurse)) + } + } + + /** + * This can't really usefully be on ReduceStep since users never want to use it + * as an ADT, as the planner does. + */ + private def withReducers[K, V1, V2](rs: ReduceStep[K, V1, V2], reds: Int): TypedPipe[(K, V2)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@UnsortedIdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@IteratorMappedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + } + + private def withDescription[K, V1, V2](rs: ReduceStep[K, V1, V2], descr: String): TypedPipe[(K, V2)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@UnsortedIdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@IteratorMappedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + } + + private def filterKeys[K, V1, V2](rs: ReduceStep[K, V1, V2], fn: K => Boolean): TypedPipe[(K, V2)] = + rs match { + case IdentityReduce(ord, p, r, d) => + ReduceStepPipe(IdentityReduce(ord, FilterKeys(p, fn), r, d)) + case 
UnsortedIdentityReduce(ord, p, r, d) => + ReduceStepPipe(UnsortedIdentityReduce(ord, FilterKeys(p, fn), r, d)) + case ivsr@IdentityValueSortedReduce(_, _, _, _, _) => + def go[V](ivsr: IdentityValueSortedReduce[K, V]): TypedPipe[(K, V)] = { + val IdentityValueSortedReduce(ord, p, v, r, d) = ivsr + ReduceStepPipe(IdentityValueSortedReduce[K, V](ord, FilterKeys(p, fn), v, r, d)) + } + go(ivsr) + case vsr@ValueSortedReduce(_, _, _, _, _, _) => + def go(vsr: ValueSortedReduce[K, V1, V2]): TypedPipe[(K, V2)] = { + val ValueSortedReduce(ord, p, v, redfn, r, d) = vsr + ReduceStepPipe(ValueSortedReduce[K, V1, V2](ord, FilterKeys(p, fn), v, redfn, r, d)) + } + go(vsr) + case imr@IteratorMappedReduce(_, _, _, _, _) => + def go(imr: IteratorMappedReduce[K, V1, V2]): TypedPipe[(K, V2)] = { + val IteratorMappedReduce(ord, p, redfn, r, d) = imr + ReduceStepPipe(IteratorMappedReduce[K, V1, V2](ord, FilterKeys(p, fn), redfn, r, d)) + } + go(imr) + } + + private def mapGroup[K, V1, V2, V3](rs: ReduceStep[K, V1, V2], fn: (K, Iterator[V2]) => Iterator[V3]): TypedPipe[(K, V3)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@UnsortedIdentityReduce(_, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@IteratorMappedReduce(_, _, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + } + + private def handleHashCoGroup[K, V, V2, R](hj: HashCoGroup[K, V, V2, R], recurse: FunctionK[TypedPipe, LitPipe]): LitPipe[(K, R)] = { + val rightLit: LitPipe[(K, V2)] = hj.right match { + case step@IdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) }) + case step@UnsortedIdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => 
ReduceStepPipe(step.copy(mapped = tp)) }) + case step@IteratorMappedReduce(_, _, _, _, _) => + def go[A, B, C](imr: IteratorMappedReduce[A, B, C]): LitPipe[(A, C)] = + Unary(recurse(imr.mapped), { (tp: TypedPipe[(A, B)]) => ReduceStepPipe[A, B, C](imr.copy(mapped = tp)) }) + + widen(go(step)) + } + + val ordK: Ordering[K] = hj.right match { + case step@IdentityReduce(_, _, _, _) => step.keyOrdering + case step@UnsortedIdentityReduce(_, _, _, _) => step.keyOrdering + case step@IteratorMappedReduce(_, _, _, _, _) => step.keyOrdering + } + + val joiner = hj.joiner + + Binary(recurse(hj.left), rightLit, + { (ltp: TypedPipe[(K, V)], rtp: TypedPipe[(K, V2)]) => + rtp match { + case ReduceStepPipe(hg: HashJoinable[K, V2]) => + HashCoGroup(ltp, hg, joiner) + case otherwise => + HashCoGroup(ltp, IdentityReduce(ordK, otherwise, None, Nil), joiner) + } + }) + } +} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 1b5a8641d8..93e7052610 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -149,20 +149,20 @@ object TypedPipe extends Serializable { case ComputedValue(pipe) => CrossPipe(left, pipe) } } - case class DebugPipe[T](pipe: TypedPipe[T]) extends TypedPipe[T] + case class DebugPipe[T](input: TypedPipe[T]) extends TypedPipe[T] case class FilterKeys[K, V](input: TypedPipe[(K, V)], fn: K => Boolean) extends TypedPipe[(K, V)] case class Filter[T](input: TypedPipe[T], fn: T => Boolean) extends TypedPipe[T] case class Fork[T](input: TypedPipe[T]) extends TypedPipe[T] case class FlatMapValues[K, V, U](input: TypedPipe[(K, V)], fn: V => TraversableOnce[U]) extends TypedPipe[(K, U)] case class FlatMapped[T, U](input: TypedPipe[T], fn: T => TraversableOnce[U]) extends TypedPipe[U] - case class ForceToDisk[T](pipe: TypedPipe[T]) extends TypedPipe[T] + case 
class ForceToDisk[T](input: TypedPipe[T]) extends TypedPipe[T] case class IterablePipe[T](iterable: Iterable[T]) extends TypedPipe[T] case class MapValues[K, V, U](input: TypedPipe[(K, V)], fn: V => U) extends TypedPipe[(K, U)] case class Mapped[T, U](input: TypedPipe[T], fn: T => U) extends TypedPipe[U] case class MergedTypedPipe[T](left: TypedPipe[T], right: TypedPipe[T]) extends TypedPipe[T] case class SourcePipe[T](source: TypedSource[T]) extends TypedPipe[T] case class SumByLocalKeys[K, V](input: TypedPipe[(K, V)], semigroup: Semigroup[V]) extends TypedPipe[(K, V)] - case class TrappedPipe[T, U >: T](input: TypedPipe[T], sink: Source with TypedSink[T], conv: TupleConverter[U]) extends TypedPipe[U] + case class TrappedPipe[T](input: TypedPipe[T], sink: Source with TypedSink[T], conv: TupleConverter[T]) extends TypedPipe[T] case class WithDescriptionTypedPipe[T](input: TypedPipe[T], description: String, deduplicate: Boolean) extends TypedPipe[T] case class WithOnComplete[T](input: TypedPipe[T], fn: () => Unit) extends TypedPipe[T] case object EmptyTypedPipe extends TypedPipe[Nothing] @@ -172,6 +172,15 @@ object TypedPipe extends Serializable { case class CoGroupedPipe[K, V](cogrouped: CoGrouped[K, V]) extends TypedPipe[(K, V)] case class ReduceStepPipe[K, V1, V2](reduce: ReduceStep[K, V1, V2]) extends TypedPipe[(K, V2)] + + + implicit class InvariantTypedPipe[T](val pipe: TypedPipe[T]) extends AnyVal { + /** + * If any errors happen below this line, but before a groupBy, write to a TypedSink + */ + def addTrap(trapSink: Source with TypedSink[T])(implicit conv: TupleConverter[T]): TypedPipe[T] = + TypedPipe.TrappedPipe[T](pipe, trapSink, conv).withLine + } } /** @@ -719,12 +728,6 @@ sealed trait TypedPipe[+T] extends Serializable { serialization: K => Array[Byte], ordering: Ordering[K]): Sketched[K, V] = Sketched(ev(this), reducers, delta, eps, seed) - - /** - * If any errors happen below this line, but before a groupBy, write to a TypedSink - */ - def addTrap[U 
>: T](trapSink: Source with TypedSink[T])(implicit conv: TupleConverter[U]): TypedPipe[U] = - TypedPipe.TrappedPipe[T, U](this, trapSink, conv).withLine } /** diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala index a03d2afc93..c7ef963b31 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala @@ -304,14 +304,14 @@ object CascadingBackend { finish(sum(slk), rest, descriptions) case tp@TrappedPipe(_, _, _) => - def go[T0, T1 >: T0](tp: TrappedPipe[T0, T1], r: FlatMappedFn[T1, U]): Pipe = { + def go[A](tp: TrappedPipe[A], r: FlatMappedFn[A, U]): Pipe = { val cp = cacheGet(tp, mode) { implicit fd => val sfields = tp.sink.sinkFields // TODO: with diamonds in the graph, this might not be correct - val pp = toPipe[T0](tp.input, sfields)(fd, mode, tp.sink.setter) + val pp = toPipe[A](tp.input, sfields)(fd, mode, tp.sink.setter) val pipe = RichPipe.assignName(pp) flowDef.addTrap(pipe, tp.sink.createTap(Write)(mode)) - CascadingPipe[T1](pipe, sfields, fd, tp.conv) + CascadingPipe[A](pipe, sfields, fd, tp.conv) } finish(cp, r, descriptions) } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala new file mode 100644 index 0000000000..06743e6c14 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala @@ -0,0 +1,184 @@ +package com.twitter.scalding.typed + +import com.twitter.scalding.source.TypedText +import org.scalatest.FunSuite +import org.scalatest.prop.PropertyChecks +import org.scalacheck.{ Arbitrary, Gen } +import PropertyChecks.forAll + +object TypedPipeGen { + val srcGen: 
Gen[TypedPipe[Int]] = { + val g1 = Gen.listOf(Arbitrary.arbitrary[Int]).map(TypedPipe.from(_)) + val src = Gen.identifier.map { f => TypedPipe.from(TypedText.tsv[Int](f)) } + Gen.oneOf(g1, src, Gen.const(TypedPipe.empty)) + } + + lazy val mapped: Gen[TypedPipe[Int]] = { + val next1: Gen[TypedPipe[Int] => TypedPipe[Int]] = + Gen.oneOf( + tpGen.map { p: TypedPipe[Int] => + { x: TypedPipe[Int] => x.cross(p).keys } + }, + tpGen.map { p: TypedPipe[Int] => + { x: TypedPipe[Int] => x.cross(ValuePipe(2)).values } + }, + Gen.const( { t: TypedPipe[Int] => t.debug }), + Arbitrary.arbitrary[Int => Boolean].map { fn => + { t: TypedPipe[Int] => t.filter(fn) } + }, + Gen.const( { t: TypedPipe[Int] => t.forceToDisk }), + Gen.const( { t: TypedPipe[Int] => t.fork }), + tpGen.map { p: TypedPipe[Int] => + { x: TypedPipe[Int] => x ++ p } + }, + Gen.identifier.map { id => + { t: TypedPipe[Int] => t.addTrap(TypedText.tsv[Int](id)) } + }, + Gen.identifier.map { id => + { t: TypedPipe[Int] => t.withDescription(id) } + } + ) + + val one = for { + n <- next1 + p <- tpGen + } yield n(p) + + val next2: Gen[TypedPipe[(Int, Int)] => TypedPipe[Int]] = + Gen.oneOf( + Gen.const( { p: TypedPipe[(Int, Int)] => p.values }), + Gen.const( { p: TypedPipe[(Int, Int)] => p.keys }) + ) + + val two = for { + n <- next2 + p <- keyed + } yield n(p) + + Gen.frequency((3, one), (1, two)) + } + + lazy val keyed: Gen[TypedPipe[(Int, Int)]] = + Gen.oneOf( + for { + single <- tpGen + fn <- Arbitrary.arbitrary[Int => (Int, Int)] + } yield single.map(fn), + for { + single <- tpGen + fn <- Arbitrary.arbitrary[Int => List[(Int, Int)]] + } yield single.flatMap(fn), + for { + fn <- Arbitrary.arbitrary[Int => Boolean] + pair <- keyed + } yield pair.filterKeys(fn), + for { + fn <- Arbitrary.arbitrary[Int => List[Int]] + pair <- keyed + } yield pair.flatMapValues(fn), + for { + fn <- Arbitrary.arbitrary[Int => Int] + pair <- keyed + } yield pair.mapValues(fn), + for { + pair <- Gen.lzy(keyed) + } yield 
pair.sumByKey.toTypedPipe, + for { + pair <- Gen.lzy(keyed) + } yield pair.sumByLocalKeys, + for { + pair <- Gen.lzy(keyed) + } yield pair.group.mapGroup { (k, its) => its }.toTypedPipe, + for { + pair <- Gen.lzy(keyed) + } yield pair.group.sorted.mapGroup { (k, its) => its }.toTypedPipe, + for { + pair <- Gen.lzy(keyed) + } yield pair.group.sorted.withReducers(2).mapGroup { (k, its) => its }.toTypedPipe, + for { + p1 <- Gen.lzy(keyed) + p2 <- Gen.lzy(keyed) + } yield p1.hashJoin(p2).values, + for { + p1 <- Gen.lzy(keyed) + p2 <- Gen.lzy(keyed) + } yield p1.join(p2).values, + for { + p1 <- Gen.lzy(keyed) + p2 <- Gen.lzy(keyed) + } yield p1.join(p2).mapValues { case (a, b) => a * b }.toTypedPipe + ) + + val tpGen: Gen[TypedPipe[Int]] = + Gen.lzy(Gen.frequency((1, srcGen), (1, mapped))) +} + +class OptimizationRulesTest extends FunSuite { + import OptimizationRules.toLiteral + + def invert[T](t: TypedPipe[T]) = + assert(toLiteral(t).evaluate == t) + + test("randomly generated TypedPipe trees are invertible") { + forAll(TypedPipeGen.tpGen) { (t: TypedPipe[Int]) => + invert(t) + } + } + + test("OptimizationRules.toLiteral is invertible on some specific instances") { + + invert(TypedPipe.from(TypedText.tsv[Int]("foo"))) + invert(TypedPipe.from(List(1, 2, 3))) + invert(TypedPipe.from(List(1, 2, 3)).map(_ * 2)) + invert { + TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey.toTypedPipe + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + + p.mapGroup { (k, its) => Iterator.single(its.sum * k) } + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + p.cross(TypedPipe.from(List("a", "b", "c")).sum) + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + p.cross(TypedPipe.from(List("a", "b", "c"))) + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + p.forceToDisk + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, 
i) }.sumByKey + p.fork + } + + invert { + val p1 = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) } + val p2 = TypedPipe.from(TypedText.tsv[(Int, String)]("foo")) + + p1.join(p2).toTypedPipe + } + + invert { + val p1 = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) } + val p2 = TypedPipe.from(TypedText.tsv[(Int, String)]("foo")) + + p1.hashJoin(p2) + } + + invert { + val p1 = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) } + val p2 = TypedPipe.from(TypedText.tsv[(Int, String)]("foo")) + + p1.join(p2).filterKeys(_ % 2 == 0) + } + } +} From b86a42a482cd03b3d1cd78920eea124391b4ad0e Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sat, 23 Sep 2017 08:20:18 -1000 Subject: [PATCH 2/9] reduce stack depth --- .../typed/OptimizationRulesTest.scala | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala index 06743e6c14..361394795b 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala @@ -22,12 +22,12 @@ object TypedPipeGen { tpGen.map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x.cross(ValuePipe(2)).values } }, - Gen.const( { t: TypedPipe[Int] => t.debug }), + Gen.const({ t: TypedPipe[Int] => t.debug }), Arbitrary.arbitrary[Int => Boolean].map { fn => { t: TypedPipe[Int] => t.filter(fn) } }, - Gen.const( { t: TypedPipe[Int] => t.forceToDisk }), - Gen.const( { t: TypedPipe[Int] => t.fork }), + Gen.const({ t: TypedPipe[Int] => t.forceToDisk }), + Gen.const({ t: TypedPipe[Int] => t.fork }), tpGen.map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x ++ p } }, @@ -36,8 +36,7 @@ object TypedPipeGen { }, Gen.identifier.map { id => { t: TypedPipe[Int] => t.withDescription(id) } - } - ) + }) val one = for { n <- next1 @@ -46,20 +45,19 @@ object 
TypedPipeGen { val next2: Gen[TypedPipe[(Int, Int)] => TypedPipe[Int]] = Gen.oneOf( - Gen.const( { p: TypedPipe[(Int, Int)] => p.values }), - Gen.const( { p: TypedPipe[(Int, Int)] => p.keys }) - ) + Gen.const({ p: TypedPipe[(Int, Int)] => p.values }), + Gen.const({ p: TypedPipe[(Int, Int)] => p.keys })) val two = for { n <- next2 p <- keyed } yield n(p) - Gen.frequency((3, one), (1, two)) + Gen.frequency((4, one), (1, two)) } - lazy val keyed: Gen[TypedPipe[(Int, Int)]] = - Gen.oneOf( + lazy val keyed: Gen[TypedPipe[(Int, Int)]] = { + val one = Gen.oneOf( for { single <- tpGen fn <- Arbitrary.arbitrary[Int => (Int, Int)] @@ -67,7 +65,9 @@ object TypedPipeGen { for { single <- tpGen fn <- Arbitrary.arbitrary[Int => List[(Int, Int)]] - } yield single.flatMap(fn), + } yield single.flatMap(fn)) + + val two = Gen.oneOf( for { fn <- Arbitrary.arbitrary[Int => Boolean] pair <- keyed @@ -106,8 +106,12 @@ object TypedPipeGen { for { p1 <- Gen.lzy(keyed) p2 <- Gen.lzy(keyed) - } yield p1.join(p2).mapValues { case (a, b) => a * b }.toTypedPipe - ) + } yield p1.join(p2).mapValues { case (a, b) => a * b }.toTypedPipe) + + // bias to consuming Int, since we can stack overflow with the (Int, Int) + // cases + Gen.frequency((2, one), (1, two)) + } val tpGen: Gen[TypedPipe[Int]] = Gen.lzy(Gen.frequency((1, srcGen), (1, mapped))) From 6e8029ffbfc613263af33bfab105ac44e89a0bd4 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sun, 24 Sep 2017 14:31:39 -1000 Subject: [PATCH 3/9] Add generic TypedPipe optimization rules --- .../twitter/scalding/typed/FlatMappedFn.scala | 15 +- .../scalding/typed/OptimizationRules.scala | 164 +++++++++++++++++- 2 files changed, 176 insertions(+), 3 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala index fd48e9942b..7b6f19f3f0 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala +++
b/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala @@ -58,7 +58,7 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io /** * We interpret this composition once to minimize pattern matching when we execute */ - private[this] val toFn: A => TraversableOnce[B] = { + protected val toFn: A => TraversableOnce[B] = { import FlatMapping._ def loop[A1, B1](fn: FlatMappedFn[A1, B1]): A1 => TraversableOnce[B1] = fn match { @@ -78,7 +78,12 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io f.andThen(next) case Series(FlatM(f), rest) => val next = loop(rest) // linter:disable:UndesirableTypeInference - f.andThen(_.flatMap(next)) + val first = f match { + case s@Single(_) => s.toFn + case s@Series(_, _) => s.toFn + case otherwise => otherwise + } + first.andThen(_.flatMap(next)) } loop(this) @@ -100,6 +105,12 @@ object FlatMappedFn { case _ => None } + def apply[A, B](fn: A => TraversableOnce[B]): FlatMappedFn[A, B] = + fn match { + case fmf: FlatMappedFn[A, B] => fmf + case rawfn => Single(FlatMapping.FlatM(rawfn)) + } + def identity[T]: FlatMappedFn[T, T] = Single(FlatMapping.Identity[T, T](implicitly[T =:= T])) final case class Single[A, B](fn: FlatMapping[A, B]) extends FlatMappedFn[A, B] final case class Series[A, B, C](first: FlatMapping[A, B], next: FlatMappedFn[B, C]) extends FlatMappedFn[A, C] diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala index 1afdd5f350..a48d0ceb1f 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala @@ -334,11 +334,173 @@ object OptimizationRules { Binary(recurse(hj.left), rightLit, { (ltp: TypedPipe[(K, V)], rtp: TypedPipe[(K, V2)]) => rtp match { - case ReduceStepPipe(hg: HashJoinable[K, V2]) => + case 
ReduceStepPipe(hg: HashJoinable[K @unchecked, V2 @unchecked]) => HashCoGroup(ltp, hg, joiner) case otherwise => HashCoGroup(ltp, IdentityReduce(ordK, otherwise, None, Nil), joiner) } }) } + + + ///////////////////////////// + // + // Here are some actual rules for simplifying TypedPipes + // + ///////////////////////////// + + object ComposeFlatMap extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case FlatMapped(FlatMapped(in, fn0), fn1) => + FlatMapped(in, FlatMappedFn(fn1).runAfter(FlatMapping.FlatM(fn0))) + case FlatMapValues(FlatMapValues(in, fn0), fn1) => + FlatMapValues(in, FlatMappedFn(fn1).runAfter(FlatMapping.FlatM(fn0))) + } + } + + object ComposeMap extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case Mapped(Mapped(in, fn0), fn1) => + Mapped(in, ComposedMapFn(fn0, fn1)) + case MapValues(MapValues(in, fn0), fn1) => + MapValues(in, ComposedMapFn(fn0, fn1)) + } + } + + object ComposeFilter extends Rule[TypedPipe] { + def apply[T](on: Dag[TypedPipe]) = { + // scala can't type check the nested pattern, so we hold its hand. Careful: f1.fn (inner) + // and f.fn (outer) have the same type, so a mix-up still compiles. Intended rewrite: + // case Filter(Filter(in, fn0), fn1) => Some(Filter(in, ComposedFilterFn(fn0, fn1))) + case f@Filter(_, _) => + def go[A](f: Filter[A]): Option[TypedPipe[A]] = + f.input match { + case f1: Filter[a] => + Some(Filter[a](f1.input, ComposedFilterFn(f1.fn, f.fn))) + case _ => None + } + go(f) + case FilterKeys(FilterKeys(in, fn0), fn1) => + Some(FilterKeys(in, ComposedFilterFn(fn0, fn1))) + case _ => None + } + } + + object ComposeWithOnComplete extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case WithOnComplete(WithOnComplete(pipe, fn0), fn1) => + WithOnComplete(pipe, ComposedOnComplete(fn0, fn1)) + } + } + + object RemoveDuplicateForceFork extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case ForceToDisk(ForceToDisk(t)) => ForceToDisk(t) + case ForceToDisk(Fork(t)) => ForceToDisk(t) + case Fork(Fork(t)) => Fork(t) + case
Fork(ForceToDisk(t)) => ForceToDisk(t) + } + } + + /** + * We ignore .group if there is no setting of reducers + * + * This is arguably not a great idea, but scalding has always + * done it to minimize accidental map-reduce steps + */ + object IgnoreNoOpGroup extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case ReduceStepPipe(IdentityReduce(_, input, None, _)) => + input + } + } + + /** + * In map-reduce settings, Merge is almost free in two contexts: + * 1. the final write + * 2. at the point we are doing a shuffle anyway. + * + * By deferring merge as long as possible, we hope to find more such + * cases + */ + object DeferMerge extends PartialRule[TypedPipe] { + def handleFilter[A]: PartialFunction[Filter[A], TypedPipe[A]] = { + case Filter(MergedTypedPipe(a, b), fn) => MergedTypedPipe(Filter(a, fn), Filter(b, fn)) + } + + def applyWhere[T](on: Dag[TypedPipe]) = { + case Mapped(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(Mapped(a, fn), Mapped(b, fn)) + case FlatMapped(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(FlatMapped(a, fn), FlatMapped(b, fn)) + case MapValues(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(MapValues(a, fn), MapValues(b, fn)) + case FlatMapValues(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(FlatMapValues(a, fn), FlatMapValues(b, fn)) + case f@Filter(_, _) if handleFilter.isDefinedAt(f) => handleFilter(f) + case FilterKeys(MergedTypedPipe(a, b), fn) => + MergedTypedPipe(FilterKeys(a, fn), FilterKeys(b, fn)) + } + } + + /** + * This is an optimization we didn't do in scalding 0.17 and earlier + * because .toTypedPipe on the group totally hid the structure from + * us + */ + object FilterKeysEarly extends Rule[TypedPipe] { + def apply[T](on: Dag[TypedPipe]) = { + case FilterKeys(ReduceStepPipe(rsp), fn) => + Some(rsp match { + case step@IdentityReduce(_, _, _, _) => + ReduceStepPipe(step.filterKeys(fn)) + case step@UnsortedIdentityReduce(_, _, _, _) => + ReduceStepPipe(step.filterKeys(fn)) + case
step@IdentityValueSortedReduce(_, _, _, _, _) => + ReduceStepPipe(step.filterKeys(fn)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + ReduceStepPipe(step.filterKeys(fn)) + case step@IteratorMappedReduce(_, _, _, _, _) => + ReduceStepPipe(step.filterKeys(fn)) + }) + case FilterKeys(CoGroupedPipe(cg), fn) => + Some(CoGroupedPipe(cg.filterKeys(fn))) + case FilterKeys(HashCoGroup(left, right, joiner), fn) => + val newRight = right match { + case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@UnsortedIdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn) + } + Some(HashCoGroup(FilterKeys(left, fn), newRight, joiner)) + case _ => None + } + } + + /** + * To keep equality for case matching and caching, we need to create internal case classes + */ + private[this] case class ComposedMapFn[A, B, C](fn0: A => B, fn1: B => C) extends Function1[A, C] { + def apply(a: A) = fn1(fn0(a)) + } + private[this] case class ComposedFilterFn[-A](fn0: A => Boolean, fn1: A => Boolean) extends Function1[A, Boolean] { + def apply(a: A) = fn0(a) && fn1(a) + } + private[this] case class ComposedOnComplete(fn0: () => Unit, fn1: () => Unit) extends Function0[Unit] { + def apply(): Unit = { + @annotation.tailrec + def loop(fn: () => Unit, stack: List[() => Unit]): Unit = + fn match { + case ComposedOnComplete(left, right) => loop(left, right :: stack) + case notComposed => + notComposed() + stack match { + case h :: tail => loop(h, tail) + case Nil => () + } + } + + loop(fn0, List(fn1)) + } + } + } From 13b0b7932f99d2568b3dc1388b06505c0fe064a2 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sun, 24 Sep 2017 18:17:30 -1000 Subject: [PATCH 4/9] fix compilation error, add a few more rules --- .../twitter/scalding/typed/FlatMappedFn.scala | 10 ++-- .../com/twitter/scalding/typed/Joiner.scala | 6 ++- .../scalding/typed/OptimizationRules.scala | 47 ++++++++++++++++++- 
.../com/twitter/scalding/ExecutionTest.scala | 4 +- 4 files changed, 59 insertions(+), 8 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala index 7b6f19f3f0..bf029c4cd7 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala @@ -27,7 +27,7 @@ import cascading.tuple.TupleEntry * map * flatMap */ -sealed trait FlatMapping[-A, +B] extends java.io.Serializable +sealed abstract class FlatMapping[-A, +B] extends java.io.Serializable object FlatMapping { def filter[A](fn: A => Boolean): FlatMapping[A, A] = Filter[A, A](fn, implicitly) @@ -44,7 +44,7 @@ object FlatMapping { /** * This is a composition of one or more FlatMappings */ -sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io.Serializable { +sealed abstract class FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io.Serializable { import FlatMappedFn._ final def runAfter[Z](fn: FlatMapping[Z, A]): FlatMappedFn[Z, B] = this match { @@ -58,7 +58,7 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io /** * We interpret this composition once to minimize pattern matching when we execute */ - protected val toFn: A => TraversableOnce[B] = { + protected def toFn: A => TraversableOnce[B] = { import FlatMapping._ def loop[A1, B1](fn: FlatMappedFn[A1, B1]): A1 => TraversableOnce[B1] = fn match { @@ -89,7 +89,9 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io loop(this) } - def apply(a: A): TraversableOnce[B] = toFn(a) + // If we make toFn a val directly we can hit the val init order bug + private[this] val fnCache: A => TraversableOnce[B] = toFn + def apply(a: A): TraversableOnce[B] = fnCache(a) } object FlatMappedFn { diff --git 
a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala index f0ee79e8db..1614511b5e 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala @@ -26,9 +26,13 @@ object Joiner extends java.io.Serializable { def hashInner2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => itu.iterator.map { (v, _) } } def hashLeft2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => asOuter(itu.iterator).map { (v, _) } } - def inner2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => + // We only allocate one of these so we can use equality to test + private[this] val inner2Any = { (key: Any, itv: Iterator[Any], itu: Iterable[Any]) => itv.flatMap { v => itu.map { u => (v, u) } } } + def inner2[K, V, U]: (K, Iterator[V], Iterable[U]) => Iterator[(V, U)] = + inner2Any.asInstanceOf[(K, Iterator[V], Iterable[U]) => Iterator[(V, U)]] + def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = { if (it.isEmpty) { Iterator(None) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala index a48d0ceb1f..c59c3e3dc0 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala @@ -464,7 +464,7 @@ object OptimizationRules { ReduceStepPipe(step.filterKeys(fn)) }) case FilterKeys(CoGroupedPipe(cg), fn) => - Some(CoGroupedPipe(cg.filterKeys(fn))) + Some(CoGroupedPipe(CoGrouped.FilterKeys(cg, fn))) case FilterKeys(HashCoGroup(left, right, joiner), fn) => val newRight = right match { case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) @@ -476,6 +476,51 @@ object OptimizationRules { } } + object EmptyIsOftenNoOp extends PartialRule[TypedPipe] { + + def emptyCogroup[K, V](cg: 
CoGrouped[K, V]): Boolean = { + import CoGrouped._ + + def empty(t: TypedPipe[Any]): Boolean = t match { + case EmptyTypedPipe => true + case _ => false + } + cg match { + case Pair(left, right, _) if left.inputs.forall(empty) && right.inputs.forall(empty) => true + case Pair(left, _, fn) if left.inputs.forall(empty) && (fn eq Joiner.inner2) => true + case Pair(_, right, fn) if right.inputs.forall(empty) && (fn eq Joiner.inner2) => true + case _ => false + } + } + + def applyWhere[T](on: Dag[TypedPipe]) = { + case CrossPipe(EmptyTypedPipe, _) => EmptyTypedPipe + case CrossPipe(_, EmptyTypedPipe) => EmptyTypedPipe + case CrossValue(EmptyTypedPipe, _) => EmptyTypedPipe + case DebugPipe(EmptyTypedPipe) => EmptyTypedPipe + case FilterKeys(EmptyTypedPipe, _) => EmptyTypedPipe + case Filter(EmptyTypedPipe, _) => EmptyTypedPipe + case FlatMapValues(EmptyTypedPipe, _) => EmptyTypedPipe + case FlatMapped(EmptyTypedPipe, _) => EmptyTypedPipe + case ForceToDisk(EmptyTypedPipe) => EmptyTypedPipe + case Fork(EmptyTypedPipe) => EmptyTypedPipe + case HashCoGroup(EmptyTypedPipe, _, _) => EmptyTypedPipe + case MapValues(EmptyTypedPipe, _) => EmptyTypedPipe + case Mapped(EmptyTypedPipe, _) => EmptyTypedPipe + case MergedTypedPipe(EmptyTypedPipe, a) => a + case MergedTypedPipe(a, EmptyTypedPipe) => a + case ReduceStepPipe(rs: ReduceStep[_, _, _]) if rs.mapped == EmptyTypedPipe => EmptyTypedPipe + case SumByLocalKeys(EmptyTypedPipe, _) => EmptyTypedPipe + case CoGroupedPipe(cgp) if emptyCogroup(cgp) => EmptyTypedPipe + } + } + + object EmptyIterableIsEmpty extends PartialRule[TypedPipe] { + def applyWhere[T](on: Dag[TypedPipe]) = { + case IterablePipe(it) if it.isEmpty => EmptyTypedPipe + } + } + /** * To keep equality for case matching and caching, we need to create internal case classes */ diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala index de17574ae2..cd8cbd5e21 100644 --- 
a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala @@ -357,7 +357,7 @@ class ExecutionTest extends WordSpec with Matchers { val files = cleanupHook.get.asInstanceOf[TempFileCleanup].filesToCleanup assert(files.size == 1) - assert(files(0).contains(tempFile)) + assert(files.head.contains(tempFile)) cleanupHook.get.run() // Remove the hook so it doesn't show up in the list of shutdown hooks for other tests Runtime.getRuntime.removeShutdownHook(cleanupHook.get) @@ -385,7 +385,7 @@ class ExecutionTest extends WordSpec with Matchers { val files = cleanupHook.get.asInstanceOf[TempFileCleanup].filesToCleanup assert(files.size == 2) - assert(files(0).contains(tempFileOne) || files(0).contains(tempFileTwo)) + assert(files.head.contains(tempFileOne) || files.head.contains(tempFileTwo)) assert(files(1).contains(tempFileOne) || files(1).contains(tempFileTwo)) cleanupHook.get.run() // Remove the hook so it doesn't show up in the list of shutdown hooks for other tests From 4e121456674447b1ff7da2dd0c973b2b7eb62b94 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Mon, 25 Sep 2017 11:55:53 -1000 Subject: [PATCH 5/9] fix serialization issue with 2.12 --- .../twitter/scalding/typed/FlatMappedFn.scala | 18 ++++++------------ .../com/twitter/scalding/typed/Joiner.scala | 6 +++--- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala index bf029c4cd7..0fa4dec745 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/FlatMappedFn.scala @@ -27,7 +27,7 @@ import cascading.tuple.TupleEntry * map * flatMap */ -sealed abstract class FlatMapping[-A, +B] extends java.io.Serializable +sealed trait FlatMapping[-A, +B] extends java.io.Serializable 
object FlatMapping { def filter[A](fn: A => Boolean): FlatMapping[A, A] = Filter[A, A](fn, implicitly) @@ -44,7 +44,7 @@ object FlatMapping { /** * This is a composition of one or more FlatMappings */ -sealed abstract class FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io.Serializable { +sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io.Serializable { import FlatMappedFn._ final def runAfter[Z](fn: FlatMapping[Z, A]): FlatMappedFn[Z, B] = this match { @@ -58,7 +58,7 @@ sealed abstract class FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) wit /** * We interpret this composition once to minimize pattern matching when we execute */ - protected def toFn: A => TraversableOnce[B] = { + private[this] val toFn: A => TraversableOnce[B] = { import FlatMapping._ def loop[A1, B1](fn: FlatMappedFn[A1, B1]): A1 => TraversableOnce[B1] = fn match { @@ -78,20 +78,14 @@ sealed abstract class FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) wit f.andThen(next) case Series(FlatM(f), rest) => val next = loop(rest) // linter:disable:UndesirableTypeInference - val first = f match { - case s@Single(_) => s.toFn - case s@Series(_, _) => s.toFn - case otherwise => otherwise - } - first.andThen(_.flatMap(next)) + + { a: A1 => f(a).flatMap(next) } } loop(this) } - // If we make toFn a val directly we can hit the val init order bug - private[this] val fnCache: A => TraversableOnce[B] = toFn - def apply(a: A): TraversableOnce[B] = fnCache(a) + def apply(a: A): TraversableOnce[B] = toFn(a) } object FlatMappedFn { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala index 1614511b5e..168fd17041 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala @@ -27,9 +27,9 @@ object Joiner extends java.io.Serializable { def hashLeft2[K, V, U] = { 
(key: K, v: V, itu: Iterable[U]) => asOuter(itu.iterator).map { (v, _) } } // We only allocate one of these so we can use equality to test - private[this] val inner2Any = { (key: Any, itv: Iterator[Any], itu: Iterable[Any]) => - itv.flatMap { v => itu.map { u => (v, u) } } - } + private[this] val inner2Any: (Any, Iterator[Any], Iterable[Any]) => Iterator[Any] = + { (key: Any, itv: Iterator[Any], itu: Iterable[Any]) => itv.flatMap { v => itu.map { u => (v, u) } } } + def inner2[K, V, U]: (K, Iterator[V], Iterable[U]) => Iterator[(V, U)] = inner2Any.asInstanceOf[(K, Iterator[V], Iterable[U]) => Iterator[(V, U)]] From f0488b0ae8ea62ce443de58443f22a1702b66f2b Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sat, 30 Sep 2017 15:48:10 -1000 Subject: [PATCH 6/9] Add tests of correctness to optimization rules --- .../typed/OptimizationRulesTest.scala | 99 ++++++++++++++----- 1 file changed, 73 insertions(+), 26 deletions(-) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala index 361394795b..0b794ec461 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala @@ -1,6 +1,8 @@ package com.twitter.scalding.typed +import com.stripe.dagon.{ Dag, Rule } import com.twitter.scalding.source.TypedText +import com.twitter.scalding.{ Config, Local } import org.scalatest.FunSuite import org.scalatest.prop.PropertyChecks import org.scalacheck.{ Arbitrary, Gen } @@ -13,13 +15,14 @@ object TypedPipeGen { Gen.oneOf(g1, src, Gen.const(TypedPipe.empty)) } - lazy val mapped: Gen[TypedPipe[Int]] = { + def mapped(srcGen: Gen[TypedPipe[Int]]): Gen[TypedPipe[Int]] = { + val mappedRec = Gen.lzy(mapped(srcGen)) val next1: Gen[TypedPipe[Int] => TypedPipe[Int]] = Gen.oneOf( - tpGen.map { p: TypedPipe[Int] => + tpGen(srcGen).map { p: 
TypedPipe[Int] => { x: TypedPipe[Int] => x.cross(p).keys } }, - tpGen.map { p: TypedPipe[Int] => + tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x.cross(ValuePipe(2)).values } }, Gen.const({ t: TypedPipe[Int] => t.debug }), @@ -28,7 +31,7 @@ object TypedPipeGen { }, Gen.const({ t: TypedPipe[Int] => t.forceToDisk }), Gen.const({ t: TypedPipe[Int] => t.fork }), - tpGen.map { p: TypedPipe[Int] => + tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x ++ p } }, Gen.identifier.map { id => @@ -40,7 +43,7 @@ object TypedPipeGen { val one = for { n <- next1 - p <- tpGen + p <- tpGen(srcGen) } yield n(p) val next2: Gen[TypedPipe[(Int, Int)] => TypedPipe[Int]] = @@ -50,62 +53,63 @@ object TypedPipeGen { val two = for { n <- next2 - p <- keyed + p <- keyed(srcGen) } yield n(p) Gen.frequency((4, one), (1, two)) } - lazy val keyed: Gen[TypedPipe[(Int, Int)]] = { + def keyed(srcGen: Gen[TypedPipe[Int]]): Gen[TypedPipe[(Int, Int)]] = { + val keyRec = Gen.lzy(keyed(srcGen)) val one = Gen.oneOf( for { - single <- tpGen + single <- tpGen(srcGen) fn <- Arbitrary.arbitrary[Int => (Int, Int)] } yield single.map(fn), for { - single <- tpGen + single <- tpGen(srcGen) fn <- Arbitrary.arbitrary[Int => List[(Int, Int)]] } yield single.flatMap(fn)) val two = Gen.oneOf( for { fn <- Arbitrary.arbitrary[Int => Boolean] - pair <- keyed + pair <- keyRec } yield pair.filterKeys(fn), for { fn <- Arbitrary.arbitrary[Int => List[Int]] - pair <- keyed + pair <- keyRec } yield pair.flatMapValues(fn), for { fn <- Arbitrary.arbitrary[Int => Int] - pair <- keyed + pair <- keyRec } yield pair.mapValues(fn), for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.sumByKey.toTypedPipe, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.sumByLocalKeys, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.group.mapGroup { (k, its) => its }.toTypedPipe, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.group.sorted.mapGroup { (k, its) => its 
}.toTypedPipe, for { - pair <- Gen.lzy(keyed) + pair <- keyRec } yield pair.group.sorted.withReducers(2).mapGroup { (k, its) => its }.toTypedPipe, for { - p1 <- Gen.lzy(keyed) - p2 <- Gen.lzy(keyed) + p1 <- keyRec + p2 <- keyRec } yield p1.hashJoin(p2).values, for { - p1 <- Gen.lzy(keyed) - p2 <- Gen.lzy(keyed) + p1 <- keyRec + p2 <- keyRec } yield p1.join(p2).values, for { - p1 <- Gen.lzy(keyed) - p2 <- Gen.lzy(keyed) + p1 <- keyRec + p2 <- keyRec } yield p1.join(p2).mapValues { case (a, b) => a * b }.toTypedPipe) // bias to consuming Int, since the we can stack overflow with the (Int, Int) @@ -113,8 +117,21 @@ object TypedPipeGen { Gen.frequency((2, one), (1, two)) } - val tpGen: Gen[TypedPipe[Int]] = - Gen.lzy(Gen.frequency((1, srcGen), (1, mapped))) + def tpGen(srcGen: Gen[TypedPipe[Int]]): Gen[TypedPipe[Int]] = + Gen.lzy(Gen.frequency((1, srcGen), (1, mapped(srcGen)))) + + /** + * This generates a TypedPipe that can't necessarily + * be run because it has fake sources + */ + val genWithFakeSources: Gen[TypedPipe[Int]] = tpGen(srcGen) + + /** + * This can always be run because all the sources are + * Iterable sources + */ + val genWithIterableSources: Gen[TypedPipe[Int]] = + tpGen(Gen.listOf(Arbitrary.arbitrary[Int]).map(TypedPipe.from(_))) } class OptimizationRulesTest extends FunSuite { @@ -124,11 +141,41 @@ class OptimizationRulesTest extends FunSuite { assert(toLiteral(t).evaluate == t) test("randomly generated TypedPipe trees are invertible") { - forAll(TypedPipeGen.tpGen) { (t: TypedPipe[Int]) => + forAll(TypedPipeGen.genWithFakeSources) { (t: TypedPipe[Int]) => invert(t) } } + def optimizationLaw[T: Ordering](t: TypedPipe[T], rule: Rule[TypedPipe]) = { + val optimized = Dag.applyRule(t, toLiteral, rule) + + assert(TypedPipeDiff.diff(t, optimized) + .toIterableExecution + .waitFor(Config.empty, Local(true)).get.isEmpty) + } + + test("all optimization rules don't change results") { + import OptimizationRules._ + + val allRules = List(ComposeFlatMap, +
ComposeMap, + ComposeFilter, + ComposeWithOnComplete, + RemoveDuplicateForceFork, + IgnoreNoOpGroup, + DeferMerge, + FilterKeysEarly, + EmptyIsOftenNoOp, + EmptyIterableIsEmpty) + + val genRule = for { + c <- Gen.choose(1, allRules.size) + rs <- Gen.pick(c, allRules) + } yield rs.reduce(_.orElse(_)) + + forAll(TypedPipeGen.genWithIterableSources, genRule)(optimizationLaw[Int] _) + } + test("OptimizationRules.toLiteral is invertible on some specific instances") { invert(TypedPipe.from(TypedText.tsv[Int]("foo"))) From 70f5bca88ce61c875e19887ad9eca0ce799adecb Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sun, 1 Oct 2017 09:15:04 -1000 Subject: [PATCH 7/9] add comments, improve some rules --- .../com/twitter/scalding/typed/Joiner.scala | 171 +++++++++++++++--- .../scalding/typed/OptimizationRules.scala | 110 +++++++++-- 2 files changed, 239 insertions(+), 42 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala index 168fd17041..6b8052d08b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala @@ -18,40 +18,165 @@ package com.twitter.scalding.typed import com.twitter.scalding._ object Joiner extends java.io.Serializable { - def toCogroupJoiner2[K, V, U, R](hashJoiner: (K, V, Iterable[U]) => Iterator[R]): (K, Iterator[V], Iterable[U]) => Iterator[R] = { - (k: K, itv: Iterator[V], itu: Iterable[U]) => - itv.flatMap { hashJoiner(k, _, itu) } - } - def hashInner2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => itu.iterator.map { (v, _) } } - def hashLeft2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => asOuter(itu.iterator).map { (v, _) } } + type JoinFn[K, V, U, R] = (K, Iterator[V], Iterable[U]) => Iterator[R] + type HashJoinFn[K, V, U, R] = (K, V, Iterable[U]) => Iterator[R] + + def toCogroupJoiner2[K, V, U, R](hashJoiner: (K, V, Iterable[U]) => 
Iterator[R]): JoinFn[K, V, U, R] = + JoinFromHashJoin(hashJoiner) - // We only allocate one of these so we can use equality to test - private[this] val inner2Any: (Any, Iterator[Any], Iterable[Any]) => Iterator[Any] = - { (key: Any, itv: Iterator[Any], itu: Iterable[Any]) => itv.flatMap { v => itu.map { u => (v, u) } } } + def hashInner2[K, V, U]: HashJoinFn[K, V, U, (V, U)] = + HashInner() - def inner2[K, V, U]: (K, Iterator[V], Iterable[U]) => Iterator[(V, U)] = - inner2Any.asInstanceOf[(K, Iterator[V], Iterable[U]) => Iterator[(V, U)]] + def hashLeft2[K, V, U]: HashJoinFn[K, V, U, (V, Option[U])] = + HashLeft() - def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = { + def inner2[K, V, U]: JoinFn[K, V, U, (V, U)] = + InnerJoin() + + def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = if (it.isEmpty) { - Iterator(None) + Iterator.single(None) } else { it.map { Some(_) } } + + def outer2[K, V, U]: JoinFn[K, V, U, (Option[V], Option[U])] = + OuterJoin() + + def left2[K, V, U]: JoinFn[K, V, U, (V, Option[U])] = + LeftJoin() + + def right2[K, V, U]: JoinFn[K, V, U, (Option[V], U)] = + RightJoin() + + /** + * Optimizers want to match on the kinds of joins we are doing. 
+ * This gives them that ability + */ + sealed abstract class HashJoinFunction[K, V, U, R] extends Function3[K, V, Iterable[U], Iterator[R]] + + final case class HashInner[K, V, U]() extends HashJoinFunction[K, V, U, (V, U)] { + def apply(k: K, v: V, u: Iterable[U]) = u.iterator.map((v, _)) } - def outer2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => - if (itv.isEmpty && itu.isEmpty) { - Iterator.empty - } else { - asOuter(itv).flatMap { v => asOuter(itu.iterator).map { u => (v, u) } } - } + final case class HashLeft[K, V, U]() extends HashJoinFunction[K, V, U, (V, Option[U])] { + def apply(k: K, v: V, u: Iterable[U]) = asOuter(u.iterator).map((v, _)) + } + final case class FilteredHashJoin[K, V1, V2, R](jf: HashJoinFunction[K, V1, V2, R], fn: ((K, R)) => Boolean) extends HashJoinFunction[K, V1, V2, R] { + def apply(k: K, left: V1, right: Iterable[V2]) = + jf.apply(k, left, right).filter { r => fn((k, r)) } + } + final case class MappedHashJoin[K, V1, V2, R, R1](jf: HashJoinFunction[K, V1, V2, R], fn: R => R1) extends HashJoinFunction[K, V1, V2, R1] { + def apply(k: K, left: V1, right: Iterable[V2]) = + jf.apply(k, left, right).map(fn) } - def left2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => - itv.flatMap { v => asOuter(itu.iterator).map { u => (v, u) } } + final case class FlatMappedHashJoin[K, V1, V2, R, R1](jf: HashJoinFunction[K, V1, V2, R], fn: R => TraversableOnce[R1]) extends HashJoinFunction[K, V1, V2, R1] { + def apply(k: K, left: V1, right: Iterable[V2]) = + jf.apply(k, left, right).flatMap(fn) + } + + sealed abstract class JoinFunction[K, V1, V2, R] extends Function3[K, Iterator[V1], Iterable[V2], Iterator[R]] + + final case class InnerJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (V1, V2)] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(V1, V2)] = + left.flatMap { v1 => right.iterator.map((v1, _)) } } - def right2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) => - 
asOuter(itv).flatMap { v => itu.map { u => (v, u) } } + final case class LeftJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (V1, Option[V2])] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(V1, Option[V2])] = + left.flatMap { v1 => asOuter(right.iterator).map((v1, _)) } } + final case class RightJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (Option[V1], V2)] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(Option[V1], V2)] = + asOuter(left).flatMap { v1 => right.iterator.map((v1, _)) } + } + final case class OuterJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (Option[V1], Option[V2])] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(Option[V1], Option[V2])] = + asOuter(left).flatMap { v1 => asOuter(right.iterator).map((v1, _)) } + } + final case class FilteredJoin[K, V1, V2, R](jf: JoinFunction[K, V1, V2, R], fn: ((K, R)) => Boolean) extends JoinFunction[K, V1, V2, R] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = + jf.apply(k, left, right).filter { r => fn((k, r)) } + } + final case class MappedJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: R => R1) extends JoinFunction[K, V1, V2, R1] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = + jf.apply(k, left, right).map(fn) + } + final case class FlatMappedJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: R => TraversableOnce[R1]) extends JoinFunction[K, V1, V2, R1] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = + jf.apply(k, left, right).flatMap(fn) + } + final case class MappedGroupJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: (K, Iterator[R]) => Iterator[R1]) extends JoinFunction[K, V1, V2, R1] { + def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = { + val iterr = jf.apply(k, left, right) + if (iterr.isEmpty) Iterator.empty // mapGroup operates on non-empty groups + else fn(k, iterr) + } + } + final case class JoinFromHashJoin[K, V1, V2, R](hj: (K, V1, 
Iterable[V2]) => Iterator[R]) extends JoinFunction[K, V1, V2, R] { + def apply(k: K, itv: Iterator[V1], itu: Iterable[V2]) = + itv.flatMap(hj(k, _, itu)) + } + + /** + * an inner-like join function is empty definitely if either side is empty + */ + final def isInnerJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case InnerJoin() => Some(true) + case LeftJoin() => Some(false) + case RightJoin() => Some(false) + case OuterJoin() => Some(false) + case JoinFromHashJoin(hj) => isInnerHashJoinLike(hj) + case FilteredJoin(jf, _) => isInnerJoinLike(jf) + case MappedJoin(jf, _) => isInnerJoinLike(jf) + case FlatMappedJoin(jf, _) => isInnerJoinLike(jf) + case MappedGroupJoin(jf, _) => isInnerJoinLike(jf) + case _ => None + } + /** + * a left-like join function is empty definitely if the left side is empty + */ + final def isLeftJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case InnerJoin() => Some(true) + case JoinFromHashJoin(hj) => isInnerHashJoinLike(hj) + case LeftJoin() => Some(true) + case RightJoin() => Some(false) + case OuterJoin() => Some(false) + case FilteredJoin(jf, _) => isLeftJoinLike(jf) + case MappedJoin(jf, _) => isLeftJoinLike(jf) + case FlatMappedJoin(jf, _) => isLeftJoinLike(jf) + case MappedGroupJoin(jf, _) => isLeftJoinLike(jf) + case _ => None + } + /** + * a right-like join function is empty definitely if the right side is empty + */ + final def isRightJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case InnerJoin() => Some(true) + case JoinFromHashJoin(hj) => isInnerHashJoinLike(hj) + case LeftJoin() => Some(false) + case RightJoin() => Some(true) + case OuterJoin() => Some(false) + case FilteredJoin(jf, _) => isRightJoinLike(jf) + case MappedJoin(jf, _) => isRightJoinLike(jf) + case FlatMappedJoin(jf, _) => isRightJoinLike(jf) + case MappedGroupJoin(jf, _) => 
isRightJoinLike(jf) + case _ => None + } + + /** + * an inner-like hash-join function is empty definitely if either side is empty + */ + final def isInnerHashJoinLike[K, V1, V2, R](jf: (K, V1, Iterable[V2]) => Iterator[R]): Option[Boolean] = + jf match { + case HashInner() => Some(true) + case HashLeft() => Some(false) + case FilteredHashJoin(jf, _) => isInnerHashJoinLike(jf) + case MappedHashJoin(jf, _) => isInnerHashJoinLike(jf) + case FlatMappedHashJoin(jf, _) => isInnerHashJoinLike(jf) + case _ => None + } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala index 4cd7b58562..6e4efbb988 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala @@ -336,6 +336,9 @@ object OptimizationRules { // ///////////////////////////// + /** + * a.flatMap(f).flatMap(g) == a.flatMap { x => f(x).flatMap(g) } + */ object ComposeFlatMap extends PartialRule[TypedPipe] { def applyWhere[T](on: Dag[TypedPipe]) = { case FlatMapped(FlatMapped(in, fn0), fn1) => @@ -345,6 +348,9 @@ object OptimizationRules { } } + /** + * a.map(f).map(g) == a.map { x => g(f(x)) } + */ object ComposeMap extends PartialRule[TypedPipe] { def applyWhere[T](on: Dag[TypedPipe]) = { case Mapped(Mapped(in, fn0), fn1) => @@ -354,6 +360,9 @@ object OptimizationRules { } } + /** + * a.filter(f).filter(g) == a.filter { x => f(x) && g(x) } + */ object ComposeFilter extends Rule[TypedPipe] { def apply[T](on: Dag[TypedPipe]) = { // scala can't type check this, so we hold its hand: @@ -373,6 +382,9 @@ object OptimizationRules { } } + /** + * a.onComplete(f).onComplete(g) == a.onComplete { () => f(); g() } + */ object ComposeWithOnComplete extends PartialRule[TypedPipe] { def applyWhere[T](on: Dag[TypedPipe]) = { case WithOnComplete(WithOnComplete(pipe, fn0), fn1) => @@ -380,6
+392,11 @@ object OptimizationRules { } } + /** + * After a forceToDisk there is no need to immediately fork. + * Calling forceToDisk twice in a row is the same as once. + * Calling fork twice in a row is the same as once. + */ object RemoveDuplicateForceFork extends PartialRule[TypedPipe] { def applyWhere[T](on: Dag[TypedPipe]) = { case ForceToDisk(ForceToDisk(t)) => ForceToDisk(t) @@ -411,7 +428,7 @@ object OptimizationRules { * cases */ object DeferMerge extends PartialRule[TypedPipe] { - def handleFilter[A]: PartialFunction[Filter[A], TypedPipe[A]] = { + private def handleFilter[A]: PartialFunction[Filter[A], TypedPipe[A]] = { case Filter(MergedTypedPipe(a, b), fn) => MergedTypedPipe(Filter(a, fn), Filter(b, fn)) } @@ -431,27 +448,52 @@ object OptimizationRules { } /** + * Push filterKeys up as early as possible. This can happen before + * a shuffle, which can be a major win. This allows you to write + * generic methods that return all the data, but if downstream someone + * only wants certain keys they don't pay to compute everything. 
+ * * This is an optimization we didn't do in scalding 0.17 and earlier * because .toTypedPipe on the group totally hid the structure from * us */ object FilterKeysEarly extends Rule[TypedPipe] { + private def filterReduceStep[K, V1, V2](rs: ReduceStep[K, V1, V2], fn: K => Boolean): ReduceStep[K, _, _ <: V2] = + rs match { + case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@UnsortedIdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@IdentityValueSortedReduce(_, _, _, _, _) => step.filterKeys(fn) + case step@ValueSortedReduce(_, _, _, _, _, _) => step.filterKeys(fn) + case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn) + } + + private def filterCoGroupable[K, V](rs: CoGroupable[K, V], fn: K => Boolean): CoGroupable[K, V] = + rs match { + case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@UnsortedIdentityReduce(_, _, _, _) => step.filterKeys(fn) + case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn) + case cg: CoGrouped[K, V] => filterCoGroup(cg, fn) + } + + private def filterCoGroup[K, V](cg: CoGrouped[K, V], fn: K => Boolean): CoGrouped[K, V] = + cg match { + case CoGrouped.Pair(a, b, jf) => + CoGrouped.Pair(filterCoGroupable(a, fn), filterCoGroupable(b, fn), jf) + case CoGrouped.FilterKeys(cg, g) => + filterCoGroup(cg, ComposedFilterFn(g, fn)) + case CoGrouped.MapGroup(cg, g) => + CoGrouped.MapGroup(filterCoGroup(cg, fn), g) + case CoGrouped.WithDescription(cg, d) => + CoGrouped.WithDescription(filterCoGroup(cg, fn), d) + case CoGrouped.WithReducers(cg, r) => + CoGrouped.WithReducers(filterCoGroup(cg, fn), r) + } + def apply[T](on: Dag[TypedPipe]) = { case FilterKeys(ReduceStepPipe(rsp), fn) => - Some(rsp match { - case step@IdentityReduce(_, _, _, _) => - ReduceStepPipe(step.filterKeys(fn)) - case step@UnsortedIdentityReduce(_, _, _, _) => - ReduceStepPipe(step.filterKeys(fn)) - case step@IdentityValueSortedReduce(_, _, _, _, _) => - ReduceStepPipe(step.filterKeys(fn)) - case 
step@ValueSortedReduce(_, _, _, _, _, _) => - ReduceStepPipe(step.filterKeys(fn)) - case step@IteratorMappedReduce(_, _, _, _, _) => - ReduceStepPipe(step.filterKeys(fn)) - }) + Some(ReduceStepPipe(filterReduceStep(rsp, fn))) case FilterKeys(CoGroupedPipe(cg), fn) => - Some(CoGroupedPipe(CoGrouped.FilterKeys(cg, fn))) + Some(CoGroupedPipe(filterCoGroup(cg, fn))) case FilterKeys(HashCoGroup(left, right, joiner), fn) => val newRight = right match { case step@IdentityReduce(_, _, _, _) => step.filterKeys(fn) @@ -459,13 +501,22 @@ object OptimizationRules { case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn) } Some(HashCoGroup(FilterKeys(left, fn), newRight, joiner)) + case FilterKeys(MapValues(pipe, mapFn), filterFn) => + Some(MapValues(FilterKeys(pipe, filterFn), mapFn)) + case FilterKeys(FlatMapValues(pipe, fmFn), filterFn) => + Some(FlatMapValues(FilterKeys(pipe, filterFn), fmFn)) case _ => None } } + /** + * EmptyTypedPipe is a kind of zero for most of these operations. + * We go ahead and simplify as much as possible if we see + * an EmptyTypedPipe. + */ object EmptyIsOftenNoOp extends PartialRule[TypedPipe] { - def emptyCogroup[K, V](cg: CoGrouped[K, V]): Boolean = { + private def emptyCogroup[K, V](cg: CoGrouped[K, V]): Boolean = { import CoGrouped._ def empty(t: TypedPipe[Any]): Boolean = t match { @@ -473,17 +524,28 @@ object OptimizationRules { case _ => false } cg match { - case Pair(left, right, _) if left.inputs.forall(empty) && right.inputs.forall(empty) => true - case Pair(left, _, fn) if left.inputs.forall(empty) && (fn eq Joiner.inner2) => true - case Pair(_, right, fn) if right.inputs.forall(empty) && (fn eq Joiner.inner2) => true - case _ => false + case Pair(left, _, jf) if left.inputs.forall(empty) && (Joiner.isLeftJoinLike(jf) == Some(true)) => true + case Pair(_, right, jf) if right.inputs.forall(empty) && (Joiner.isRightJoinLike(jf) == Some(true)) => true + case WithDescription(cg, _) => emptyCogroup(cg) + case WithReducers(cg, _) 
=> emptyCogroup(cg) + case MapGroup(cg, _) => emptyCogroup(cg) + case FilterKeys(cg, _) => emptyCogroup(cg) } } + private def emptyHashJoinable[K, V](hj: HashJoinable[K, V]): Boolean = + hj match { + case step@IdentityReduce(_, _, _, _) => step.mapped == EmptyTypedPipe + case step@UnsortedIdentityReduce(_, _, _, _) => step.mapped == EmptyTypedPipe + case step@IteratorMappedReduce(_, _, _, _, _) => step.mapped == EmptyTypedPipe + } + def applyWhere[T](on: Dag[TypedPipe]) = { case CrossPipe(EmptyTypedPipe, _) => EmptyTypedPipe case CrossPipe(_, EmptyTypedPipe) => EmptyTypedPipe case CrossValue(EmptyTypedPipe, _) => EmptyTypedPipe + case CrossValue(_, ComputedValue(EmptyTypedPipe)) => EmptyTypedPipe + case CrossValue(_, EmptyValue) => EmptyTypedPipe case DebugPipe(EmptyTypedPipe) => EmptyTypedPipe case FilterKeys(EmptyTypedPipe, _) => EmptyTypedPipe case Filter(EmptyTypedPipe, _) => EmptyTypedPipe @@ -492,16 +554,21 @@ object OptimizationRules { case ForceToDisk(EmptyTypedPipe) => EmptyTypedPipe case Fork(EmptyTypedPipe) => EmptyTypedPipe case HashCoGroup(EmptyTypedPipe, _, _) => EmptyTypedPipe + case HashCoGroup(_, right, hjf) if emptyHashJoinable(right) && Joiner.isInnerHashJoinLike(hjf) == Some(true) => EmptyTypedPipe case MapValues(EmptyTypedPipe, _) => EmptyTypedPipe case Mapped(EmptyTypedPipe, _) => EmptyTypedPipe case MergedTypedPipe(EmptyTypedPipe, a) => a case MergedTypedPipe(a, EmptyTypedPipe) => a case ReduceStepPipe(rs: ReduceStep[_, _, _]) if rs.mapped == EmptyTypedPipe => EmptyTypedPipe case SumByLocalKeys(EmptyTypedPipe, _) => EmptyTypedPipe + case TrappedPipe(EmptyTypedPipe, _, _) => EmptyTypedPipe case CoGroupedPipe(cgp) if emptyCogroup(cgp) => EmptyTypedPipe } } + /** + * If an Iterable is empty, it is the same as EmptyTypedPipe + */ object EmptyIterableIsEmpty extends PartialRule[TypedPipe] { def applyWhere[T](on: Dag[TypedPipe]) = { case IterablePipe(it) if it.isEmpty => EmptyTypedPipe @@ -517,6 +584,11 @@ object OptimizationRules { private[this] 
case class ComposedFilterFn[-A](fn0: A => Boolean, fn1: A => Boolean) extends Function1[A, Boolean] { def apply(a: A) = fn0(a) && fn1(a) } + + /** + * This is only called at the end of a task, so might as well make it stack safe since a little + * extra runtime cost won't matter + */ private[this] case class ComposedOnComplete(fn0: () => Unit, fn1: () => Unit) extends Function0[Unit] { def apply(): Unit = { @annotation.tailrec From 4ae54cf2a8ae17fadbbe2158e16844df8fdc7360 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Mon, 2 Oct 2017 08:34:45 -1000 Subject: [PATCH 8/9] fix bug with outerjoin --- .../main/scala/com/twitter/scalding/typed/Joiner.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala index 6b8052d08b..588599cacb 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala @@ -35,11 +35,8 @@ object Joiner extends java.io.Serializable { InnerJoin() def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = - if (it.isEmpty) { - Iterator.single(None) - } else { - it.map { Some(_) } - } + if (it.isEmpty) Iterator.single(None) + else it.map(Some(_)) def outer2[K, V, U]: JoinFn[K, V, U, (Option[V], Option[U])] = OuterJoin() @@ -91,7 +88,8 @@ object Joiner extends java.io.Serializable { } final case class OuterJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (Option[V1], Option[V2])] { def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(Option[V1], Option[V2])] = - asOuter(left).flatMap { v1 => asOuter(right.iterator).map((v1, _)) } + if (left.isEmpty && right.isEmpty) Iterator.empty + else asOuter(left).flatMap { v1 => asOuter(right.iterator).map((v1, _)) } } final case class FilteredJoin[K, V1, V2, R](jf: JoinFunction[K, V1, V2, R], fn: ((K, R)) => Boolean) extends JoinFunction[K, 
V1, V2, R] { def apply(k: K, left: Iterator[V1], right: Iterable[V2]) = From 9f708450901af19707ed0343b6ee0d69d297641e Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Mon, 2 Oct 2017 10:47:48 -1000 Subject: [PATCH 9/9] fix match error --- .../scala/com/twitter/scalding/typed/OptimizationRules.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala index 6e4efbb988..e0afe81d0e 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala @@ -526,6 +526,8 @@ object OptimizationRules { cg match { case Pair(left, _, jf) if left.inputs.forall(empty) && (Joiner.isLeftJoinLike(jf) == Some(true)) => true case Pair(_, right, jf) if right.inputs.forall(empty) && (Joiner.isRightJoinLike(jf) == Some(true)) => true + case Pair(left, right, _) if left.inputs.forall(empty) && right.inputs.forall(empty) => true + case Pair(_, _, _) => false case WithDescription(cg, _) => emptyCogroup(cg) case WithReducers(cg, _) => emptyCogroup(cg) case MapGroup(cg, _) => emptyCogroup(cg)