From 4b6c927b19c7d992a16aa136ab09bee029e1d042 Mon Sep 17 00:00:00 2001 From: "P. Oscar Boykin" Date: Tue, 26 Sep 2017 15:17:27 -1000 Subject: [PATCH] Implement Dagon.toLiteral (#1718) * Implement Dagon.toLiteral * reduce stack depth * rename LitPipe to LiteralPipe * respond to review comments --- build.sbt | 2 + .../scalding/typed/OptimizationRules.scala | 332 ++++++++++++++++++ .../twitter/scalding/typed/TypedPipe.scala | 30 +- .../cascading_backend/CascadingBackend.scala | 6 +- .../typed/OptimizationRulesTest.scala | 188 ++++++++++ 5 files changed, 540 insertions(+), 18 deletions(-) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala diff --git a/build.sbt b/build.sbt index a915961e22..993aa0ff7c 100644 --- a/build.sbt +++ b/build.sbt @@ -21,6 +21,7 @@ val avroVersion = "1.7.4" val bijectionVersion = "0.9.5" val cascadingAvroVersion = "2.1.2" val chillVersion = "0.8.4" +val dagonVersion = "0.2.0" val elephantbirdVersion = "4.15" val hadoopLzoVersion = "0.4.19" val hadoopVersion = "2.5.0" @@ -316,6 +317,7 @@ lazy val scaldingCore = module("core").settings( "cascading" % "cascading-core" % cascadingVersion, "cascading" % "cascading-hadoop" % cascadingVersion, "cascading" % "cascading-local" % cascadingVersion, + "com.stripe" %% "dagon-core" % dagonVersion, "com.twitter" % "chill-hadoop" % chillVersion, "com.twitter" % "chill-java" % chillVersion, "com.twitter" %% "chill-bijection" % chillVersion, diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala new file mode 100644 index 0000000000..2856816b62 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala @@ -0,0 +1,332 @@ +package com.twitter.scalding.typed + +import com.stripe.dagon.{ FunctionK, Memoize, 
Rule, PartialRule, Dag, Literal } + +object OptimizationRules { + type LiteralPipe[T] = Literal[TypedPipe, T] + + import Literal.{ Unary, Binary } + import TypedPipe._ + + /** + * Our TypedPipe is covariant but Literal is not, so this cast + * is actually safe in this context, but not in general + */ + def widen[T](l: LiteralPipe[_ <: T]): LiteralPipe[T] = { + // to prove this is safe, see that if you have + // LiteralPipe[_ <: T] we can call .evaluate to get + // TypedPipe[_ <: T] which due to covariance is + // TypedPipe[T], and then using toLiteral we can get + // LiteralPipe[T] + // + // that would be wasteful to apply since the final + // result is identity. + l.asInstanceOf[LiteralPipe[T]] + } + + /** + * Convert a TypedPipe[T] to a Literal[TypedPipe, T] for + * use with Dagon + */ + def toLiteral: FunctionK[TypedPipe, LiteralPipe] = + Memoize.functionK[TypedPipe, LiteralPipe]( + new Memoize.RecursiveK[TypedPipe, LiteralPipe] { + + def toFunction[A] = { + case (c: CrossPipe[a, b], f) => + Binary(f(c.left), f(c.right), CrossPipe(_: TypedPipe[a], _: TypedPipe[b])) + case (cv@CrossValue(_, _), f) => + def go[A, B](cv: CrossValue[A, B]): LiteralPipe[(A, B)] = + cv match { + case CrossValue(a, ComputedValue(v)) => + Binary(f(a), f(v), { (a: TypedPipe[A], b: TypedPipe[B]) => + CrossValue(a, ComputedValue(b)) + }) + case CrossValue(a, v) => + Unary(f(a), CrossValue(_: TypedPipe[A], v)) + } + widen(go(cv)) + case (p: DebugPipe[a], f) => + Unary(f(p.input), DebugPipe(_: TypedPipe[a])) + case (p: FilterKeys[a, b], f) => + widen(Unary(f(p.input), FilterKeys(_: TypedPipe[(a, b)], p.fn))) + case (p: Filter[a], f) => + Unary(f(p.input), Filter(_: TypedPipe[a], p.fn)) + case (p: Fork[a], f) => + Unary(f(p.input), Fork(_: TypedPipe[a])) + case (p: FlatMapValues[a, b, c], f) => + widen(Unary(f(p.input), FlatMapValues(_: TypedPipe[(a, b)], p.fn))) + case (p: FlatMapped[a, b], f) => + Unary(f(p.input), FlatMapped(_: TypedPipe[a], p.fn)) + case (p: ForceToDisk[a], f) => + 
Unary(f(p.input), ForceToDisk(_: TypedPipe[a])) + case (it@IterablePipe(_), _) => + Literal.Const(it) + case (p: MapValues[a, b, c], f) => + widen(Unary(f(p.input), MapValues(_: TypedPipe[(a, b)], p.fn))) + case (p: Mapped[a, b], f) => + Unary(f(p.input), Mapped(_: TypedPipe[a], p.fn)) + case (p: MergedTypedPipe[a], f) => + Binary(f(p.left), f(p.right), MergedTypedPipe(_: TypedPipe[a], _: TypedPipe[a])) + case (src@SourcePipe(_), _) => + Literal.Const(src) + case (p: SumByLocalKeys[a, b], f) => + widen(Unary(f(p.input), SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))) + case (p: TrappedPipe[a], f) => + Unary(f(p.input), TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv)) + case (p: WithDescriptionTypedPipe[a], f) => + Unary(f(p.input), WithDescriptionTypedPipe(_: TypedPipe[a], p.description, p.deduplicate)) + case (p: WithOnComplete[a], f) => + Unary(f(p.input), WithOnComplete(_: TypedPipe[a], p.fn)) + case (EmptyTypedPipe, _) => + Literal.Const(EmptyTypedPipe) + case (hg: HashCoGroup[a, b, c, d], f) => + widen(handleHashCoGroup(hg, f)) + case (CoGroupedPipe(cg), f) => + widen(handleCoGrouped(cg, f)) + case (ReduceStepPipe(rs), f) => + widen(handleReduceStep(rs, f)) + } + }) + + private def handleReduceStep[K, V1, V2](rs: ReduceStep[K, V1, V2], recurse: FunctionK[TypedPipe, LiteralPipe]): LiteralPipe[(K, V2)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) }) + case step@UnsortedIdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) }) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + def go[A, B](ivsr: IdentityValueSortedReduce[A, B]): LiteralPipe[(A, B)] = + Unary(widen[(A, B)](recurse(ivsr.mapped)), { (tp: TypedPipe[(A, B)]) => + ReduceStepPipe[A, B, B](IdentityValueSortedReduce[A, B]( + ivsr.keyOrdering, + tp, + ivsr.valueSort, + 
ivsr.reducers, + ivsr.descriptions)) + }) + widen[(K, V2)](go(step)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + def go[A, B, C](vsr: ValueSortedReduce[A, B, C]): LiteralPipe[(A, C)] = + Unary(recurse(vsr.mapped), { (tp: TypedPipe[(A, B)]) => + ReduceStepPipe[A, B, C](ValueSortedReduce[A, B, C]( + vsr.keyOrdering, + tp, + vsr.valueSort, + vsr.reduceFn, + vsr.reducers, + vsr.descriptions)) + }) + go(step) + case step@IteratorMappedReduce(_, _, _, _, _) => + def go[A, B, C](imr: IteratorMappedReduce[A, B, C]): LiteralPipe[(A, C)] = + Unary(recurse(imr.mapped), { (tp: TypedPipe[(A, B)]) => ReduceStepPipe[A, B, C](imr.copy(mapped = tp)) }) + + go(step) + } + + private def handleCoGrouped[K, V](cg: CoGroupable[K, V], recurse: FunctionK[TypedPipe, LiteralPipe]): LiteralPipe[(K, V)] = { + import CoGrouped._ + + def pipeToCG[V1](t: TypedPipe[(K, V1)]): CoGroupable[K, V1] = + t match { + case ReduceStepPipe(cg: CoGroupable[K @unchecked, V1 @unchecked]) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. + cg + case CoGroupedPipe(cg) => + // we are relying on the fact that we use Ordering[K] + // as a contravariant type, despite it not being defined + // that way. 
+ cg.asInstanceOf[CoGroupable[K, V1]] + case kvPipe => IdentityReduce(cg.keyOrdering, kvPipe, None, Nil) + } + + cg match { + case p@Pair(_, _, _) => + def go[A, B, C](pair: Pair[K, A, B, C]): LiteralPipe[(K, C)] = { + val llit = handleCoGrouped(pair.larger, recurse) + val rlit = handleCoGrouped(pair.smaller, recurse) + val fn = pair.fn + Binary(llit, rlit, { (l: TypedPipe[(K, A)], r: TypedPipe[(K, B)]) => + Pair(pipeToCG(l), pipeToCG(r), fn) + }) + } + widen(go(p)) + case wr@WithReducers(_, _) => + def go[V1 <: V](wr: WithReducers[K, V1]): LiteralPipe[(K, V)] = { + val reds = wr.reds + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(wr.on, recurse), { (tp: TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + withReducers(rs, reds) + case CoGroupedPipe(cg) => + CoGroupedPipe(WithReducers(cg, reds)) + case kvPipe => + ReduceStepPipe(IdentityReduce(cg.keyOrdering, kvPipe, None, Nil) + .withReducers(reds)) + } + }) + } + go(wr) + case wd@WithDescription(_, _) => + def go[V1 <: V](wd: WithDescription[K, V1]): LiteralPipe[(K, V)] = { + val desc = wd.description + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(wd.on, recurse), { (tp: TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + withDescription(rs, desc) + case CoGroupedPipe(cg) => + CoGroupedPipe(WithDescription(cg, desc)) + case kvPipe => + kvPipe.withDescription(desc) + } + }) + } + go(wd) + case fk@FilterKeys(_, _) => + def go[V1 <: V](fk: FilterKeys[K, V1]): LiteralPipe[(K, V)] = { + val fn = fk.fn + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(fk.on, recurse), { (tp: TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + filterKeys(rs, fn) + case CoGroupedPipe(cg) => + CoGroupedPipe(FilterKeys(cg, fn)) + case kvPipe => + kvPipe.filterKeys(fn) + } + }) + } + go(fk) + case mg@MapGroup(_, _) => + def go[V1, V2 <: V](mg: MapGroup[K, V1, V2]): LiteralPipe[(K, V)] = { + val fn = mg.fn + Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(mg.on, recurse), { (tp: 
TypedPipe[(K, V1)]) => + tp match { + case ReduceStepPipe(rs) => + mapGroup(rs, fn) + case CoGroupedPipe(cg) => + CoGroupedPipe(MapGroup(cg, fn)) + case kvPipe => + ReduceStepPipe( + IdentityReduce(cg.keyOrdering, kvPipe, None, Nil) + .mapGroup(fn)) + } + }) + } + go(mg) + case step@IdentityReduce(_, _, _, _) => + widen(handleReduceStep(step, recurse)) + case step@UnsortedIdentityReduce(_, _, _, _) => + widen(handleReduceStep(step, recurse)) + case step@IteratorMappedReduce(_, _, _, _, _) => + widen(handleReduceStep(step, recurse)) + } + } + + /** + * This can't really usefully be on ReduceStep since users never want to use it + * as an ADT, as the planner does. + */ + private def withReducers[K, V1, V2](rs: ReduceStep[K, V1, V2], reds: Int): TypedPipe[(K, V2)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@UnsortedIdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + case step@IteratorMappedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withReducers(reds)) + } + + private def withDescription[K, V1, V2](rs: ReduceStep[K, V1, V2], descr: String): TypedPipe[(K, V2)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@UnsortedIdentityReduce(_, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + case step@IteratorMappedReduce(_, _, _, _, _) => + ReduceStepPipe(step.withDescription(descr)) + } + + private def filterKeys[K, V1, V2](rs: ReduceStep[K, V1, V2], fn: K => Boolean): TypedPipe[(K, V2)] = + rs match { + case IdentityReduce(ord, 
p, r, d) => + ReduceStepPipe(IdentityReduce(ord, FilterKeys(p, fn), r, d)) + case UnsortedIdentityReduce(ord, p, r, d) => + ReduceStepPipe(UnsortedIdentityReduce(ord, FilterKeys(p, fn), r, d)) + case ivsr@IdentityValueSortedReduce(_, _, _, _, _) => + def go[V](ivsr: IdentityValueSortedReduce[K, V]): TypedPipe[(K, V)] = { + val IdentityValueSortedReduce(ord, p, v, r, d) = ivsr + ReduceStepPipe(IdentityValueSortedReduce[K, V](ord, FilterKeys(p, fn), v, r, d)) + } + go(ivsr) + case vsr@ValueSortedReduce(_, _, _, _, _, _) => + def go(vsr: ValueSortedReduce[K, V1, V2]): TypedPipe[(K, V2)] = { + val ValueSortedReduce(ord, p, v, redfn, r, d) = vsr + ReduceStepPipe(ValueSortedReduce[K, V1, V2](ord, FilterKeys(p, fn), v, redfn, r, d)) + } + go(vsr) + case imr@IteratorMappedReduce(_, _, _, _, _) => + def go(imr: IteratorMappedReduce[K, V1, V2]): TypedPipe[(K, V2)] = { + val IteratorMappedReduce(ord, p, redfn, r, d) = imr + ReduceStepPipe(IteratorMappedReduce[K, V1, V2](ord, FilterKeys(p, fn), redfn, r, d)) + } + go(imr) + } + + private def mapGroup[K, V1, V2, V3](rs: ReduceStep[K, V1, V2], fn: (K, Iterator[V2]) => Iterator[V3]): TypedPipe[(K, V3)] = + rs match { + case step@IdentityReduce(_, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@UnsortedIdentityReduce(_, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@IdentityValueSortedReduce(_, _, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@ValueSortedReduce(_, _, _, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + case step@IteratorMappedReduce(_, _, _, _, _) => + ReduceStepPipe(step.mapGroup(fn)) + } + + private def handleHashCoGroup[K, V, V2, R](hj: HashCoGroup[K, V, V2, R], recurse: FunctionK[TypedPipe, LiteralPipe]): LiteralPipe[(K, R)] = { + val rightLit: LiteralPipe[(K, V2)] = hj.right match { + case step@IdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) }) + case 
step@UnsortedIdentityReduce(_, _, _, _) => + Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) }) + case step@IteratorMappedReduce(_, _, _, _, _) => + def go[A, B, C](imr: IteratorMappedReduce[A, B, C]): LiteralPipe[(A, C)] = + Unary(recurse(imr.mapped), { (tp: TypedPipe[(A, B)]) => ReduceStepPipe[A, B, C](imr.copy(mapped = tp)) }) + + widen(go(step)) + } + + val ordK: Ordering[K] = hj.right match { + case step@IdentityReduce(_, _, _, _) => step.keyOrdering + case step@UnsortedIdentityReduce(_, _, _, _) => step.keyOrdering + case step@IteratorMappedReduce(_, _, _, _, _) => step.keyOrdering + } + + val joiner = hj.joiner + + Binary(recurse(hj.left), rightLit, + { (ltp: TypedPipe[(K, V)], rtp: TypedPipe[(K, V2)]) => + rtp match { + case ReduceStepPipe(hg: HashJoinable[K @unchecked, V2 @unchecked]) => + HashCoGroup(ltp, hg, joiner) + case otherwise => + HashCoGroup(ltp, IdentityReduce(ordK, otherwise, None, Nil), joiner) + } + }) + } +} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index ec20fbbd4d..f9a8ac17aa 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -137,6 +137,7 @@ object TypedPipe extends Serializable { } } + final case class CoGroupedPipe[K, V](cogrouped: CoGrouped[K, V]) extends TypedPipe[(K, V)] final case class CrossPipe[T, U](left: TypedPipe[T], right: TypedPipe[U]) extends TypedPipe[(T, U)] { def viaHashJoin: TypedPipe[(T, U)] = left.groupAll.hashJoin(right.groupAll).values @@ -149,29 +150,34 @@ object TypedPipe extends Serializable { case ComputedValue(pipe) => CrossPipe(left, pipe) } } - final case class DebugPipe[T](pipe: TypedPipe[T]) extends TypedPipe[T] + final case class DebugPipe[T](input: TypedPipe[T]) extends TypedPipe[T] final case class FilterKeys[K, 
V](input: TypedPipe[(K, V)], fn: K => Boolean) extends TypedPipe[(K, V)] final case class Filter[T](input: TypedPipe[T], fn: T => Boolean) extends TypedPipe[T] - final case class Fork[T](input: TypedPipe[T]) extends TypedPipe[T] final case class FlatMapValues[K, V, U](input: TypedPipe[(K, V)], fn: V => TraversableOnce[U]) extends TypedPipe[(K, U)] final case class FlatMapped[T, U](input: TypedPipe[T], fn: T => TraversableOnce[U]) extends TypedPipe[U] - final case class ForceToDisk[T](pipe: TypedPipe[T]) extends TypedPipe[T] + final case class ForceToDisk[T](input: TypedPipe[T]) extends TypedPipe[T] + final case class Fork[T](input: TypedPipe[T]) extends TypedPipe[T] + final case class HashCoGroup[K, V, W, R](left: TypedPipe[(K, V)], right: HashJoinable[K, W], joiner: (K, V, Iterable[W]) => Iterator[R]) extends TypedPipe[(K, R)] final case class IterablePipe[T](iterable: Iterable[T]) extends TypedPipe[T] final case class MapValues[K, V, U](input: TypedPipe[(K, V)], fn: V => U) extends TypedPipe[(K, U)] final case class Mapped[T, U](input: TypedPipe[T], fn: T => U) extends TypedPipe[U] final case class MergedTypedPipe[T](left: TypedPipe[T], right: TypedPipe[T]) extends TypedPipe[T] + final case class ReduceStepPipe[K, V1, V2](reduce: ReduceStep[K, V1, V2]) extends TypedPipe[(K, V2)] final case class SourcePipe[T](source: TypedSource[T]) extends TypedPipe[T] final case class SumByLocalKeys[K, V](input: TypedPipe[(K, V)], semigroup: Semigroup[V]) extends TypedPipe[(K, V)] - final case class TrappedPipe[T, U >: T](input: TypedPipe[T], sink: Source with TypedSink[T], conv: TupleConverter[U]) extends TypedPipe[U] + final case class TrappedPipe[T](input: TypedPipe[T], sink: Source with TypedSink[T], conv: TupleConverter[T]) extends TypedPipe[T] final case class WithDescriptionTypedPipe[T](input: TypedPipe[T], description: String, deduplicate: Boolean) extends TypedPipe[T] final case class WithOnComplete[T](input: TypedPipe[T], fn: () => Unit) extends TypedPipe[T] + case 
object EmptyTypedPipe extends TypedPipe[Nothing] - final case class HashCoGroup[K, V, W, R](left: TypedPipe[(K, V)], - right: HashJoinable[K, W], - joiner: (K, V, Iterable[W]) => Iterator[R]) extends TypedPipe[(K, R)] - final case class CoGroupedPipe[K, V](cogrouped: CoGrouped[K, V]) extends TypedPipe[(K, V)] - final case class ReduceStepPipe[K, V1, V2](reduce: ReduceStep[K, V1, V2]) extends TypedPipe[(K, V2)] + implicit class InvariantTypedPipe[T](val pipe: TypedPipe[T]) extends AnyVal { + /** + * If any errors happen below this line, but before a groupBy, write to a TypedSink + */ + def addTrap(trapSink: Source with TypedSink[T])(implicit conv: TupleConverter[T]): TypedPipe[T] = + TypedPipe.TrappedPipe[T](pipe, trapSink, conv).withLine + } } /** @@ -719,12 +725,6 @@ sealed trait TypedPipe[+T] extends Serializable { serialization: K => Array[Byte], ordering: Ordering[K]): Sketched[K, V] = Sketched(ev(this), reducers, delta, eps, seed) - - /** - * If any errors happen below this line, but before a groupBy, write to a TypedSink - */ - def addTrap[U >: T](trapSink: Source with TypedSink[T])(implicit conv: TupleConverter[U]): TypedPipe[U] = - TypedPipe.TrappedPipe[T, U](this, trapSink, conv).withLine } /** diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala index 6bfa72dd93..f5dc6ee339 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala @@ -304,14 +304,14 @@ object CascadingBackend { finish(sum(slk), rest, descriptions) case tp@TrappedPipe(_, _, _) => - def go[T0, T1 >: T0](tp: TrappedPipe[T0, T1], r: FlatMappedFn[T1, U]): Pipe = { + def go[A](tp: TrappedPipe[A], r: FlatMappedFn[A, U]): Pipe = { val cp = cacheGet(tp, mode) { implicit fd => val sfields = 
tp.sink.sinkFields // TODO: with diamonds in the graph, this might not be correct - val pp = toPipe[T0](tp.input, sfields)(fd, mode, tp.sink.setter) + val pp = toPipe[A](tp.input, sfields)(fd, mode, tp.sink.setter) val pipe = RichPipe.assignName(pp) flowDef.addTrap(pipe, tp.sink.createTap(Write)(mode)) - CascadingPipe[T1](pipe, sfields, fd, tp.conv) + CascadingPipe[A](pipe, sfields, fd, tp.conv) } finish(cp, r, descriptions) } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala new file mode 100644 index 0000000000..361394795b --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala @@ -0,0 +1,188 @@ +package com.twitter.scalding.typed + +import com.twitter.scalding.source.TypedText +import org.scalatest.FunSuite +import org.scalatest.prop.PropertyChecks +import org.scalacheck.{ Arbitrary, Gen } +import PropertyChecks.forAll + +object TypedPipeGen { + val srcGen: Gen[TypedPipe[Int]] = { + val g1 = Gen.listOf(Arbitrary.arbitrary[Int]).map(TypedPipe.from(_)) + val src = Gen.identifier.map { f => TypedPipe.from(TypedText.tsv[Int](f)) } + Gen.oneOf(g1, src, Gen.const(TypedPipe.empty)) + } + + lazy val mapped: Gen[TypedPipe[Int]] = { + val next1: Gen[TypedPipe[Int] => TypedPipe[Int]] = + Gen.oneOf( + tpGen.map { p: TypedPipe[Int] => + { x: TypedPipe[Int] => x.cross(p).keys } + }, + tpGen.map { p: TypedPipe[Int] => + { x: TypedPipe[Int] => x.cross(ValuePipe(2)).values } + }, + Gen.const({ t: TypedPipe[Int] => t.debug }), + Arbitrary.arbitrary[Int => Boolean].map { fn => + { t: TypedPipe[Int] => t.filter(fn) } + }, + Gen.const({ t: TypedPipe[Int] => t.forceToDisk }), + Gen.const({ t: TypedPipe[Int] => t.fork }), + tpGen.map { p: TypedPipe[Int] => + { x: TypedPipe[Int] => x ++ p } + }, + Gen.identifier.map { id => + { t: TypedPipe[Int] => t.addTrap(TypedText.tsv[Int](id)) } + }, + 
Gen.identifier.map { id => + { t: TypedPipe[Int] => t.withDescription(id) } + }) + + val one = for { + n <- next1 + p <- tpGen + } yield n(p) + + val next2: Gen[TypedPipe[(Int, Int)] => TypedPipe[Int]] = + Gen.oneOf( + Gen.const({ p: TypedPipe[(Int, Int)] => p.values }), + Gen.const({ p: TypedPipe[(Int, Int)] => p.keys })) + + val two = for { + n <- next2 + p <- keyed + } yield n(p) + + Gen.frequency((4, one), (1, two)) + } + + lazy val keyed: Gen[TypedPipe[(Int, Int)]] = { + val one = Gen.oneOf( + for { + single <- tpGen + fn <- Arbitrary.arbitrary[Int => (Int, Int)] + } yield single.map(fn), + for { + single <- tpGen + fn <- Arbitrary.arbitrary[Int => List[(Int, Int)]] + } yield single.flatMap(fn)) + + val two = Gen.oneOf( + for { + fn <- Arbitrary.arbitrary[Int => Boolean] + pair <- keyed + } yield pair.filterKeys(fn), + for { + fn <- Arbitrary.arbitrary[Int => List[Int]] + pair <- keyed + } yield pair.flatMapValues(fn), + for { + fn <- Arbitrary.arbitrary[Int => Int] + pair <- keyed + } yield pair.mapValues(fn), + for { + pair <- Gen.lzy(keyed) + } yield pair.sumByKey.toTypedPipe, + for { + pair <- Gen.lzy(keyed) + } yield pair.sumByLocalKeys, + for { + pair <- Gen.lzy(keyed) + } yield pair.group.mapGroup { (k, its) => its }.toTypedPipe, + for { + pair <- Gen.lzy(keyed) + } yield pair.group.sorted.mapGroup { (k, its) => its }.toTypedPipe, + for { + pair <- Gen.lzy(keyed) + } yield pair.group.sorted.withReducers(2).mapGroup { (k, its) => its }.toTypedPipe, + for { + p1 <- Gen.lzy(keyed) + p2 <- Gen.lzy(keyed) + } yield p1.hashJoin(p2).values, + for { + p1 <- Gen.lzy(keyed) + p2 <- Gen.lzy(keyed) + } yield p1.join(p2).values, + for { + p1 <- Gen.lzy(keyed) + p2 <- Gen.lzy(keyed) + } yield p1.join(p2).mapValues { case (a, b) => a * b }.toTypedPipe) + + // bias to consuming Int, since we can stack overflow with the (Int, Int) + // cases + Gen.frequency((2, one), (1, two)) + } + + val tpGen: Gen[TypedPipe[Int]] = + Gen.lzy(Gen.frequency((1, srcGen), (1, 
mapped))) +} + +class OptimizationRulesTest extends FunSuite { + import OptimizationRules.toLiteral + + def invert[T](t: TypedPipe[T]) = + assert(toLiteral(t).evaluate == t) + + test("randomly generated TypedPipe trees are invertible") { + forAll(TypedPipeGen.tpGen) { (t: TypedPipe[Int]) => + invert(t) + } + } + + test("OptimizationRules.toLiteral is invertible on some specific instances") { + + invert(TypedPipe.from(TypedText.tsv[Int]("foo"))) + invert(TypedPipe.from(List(1, 2, 3))) + invert(TypedPipe.from(List(1, 2, 3)).map(_ * 2)) + invert { + TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey.toTypedPipe + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + + p.mapGroup { (k, its) => Iterator.single(its.sum * k) } + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + p.cross(TypedPipe.from(List("a", "b", "c")).sum) + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + p.cross(TypedPipe.from(List("a", "b", "c"))) + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + p.forceToDisk + } + + invert { + val p = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) }.sumByKey + p.fork + } + + invert { + val p1 = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) } + val p2 = TypedPipe.from(TypedText.tsv[(Int, String)]("foo")) + + p1.join(p2).toTypedPipe + } + + invert { + val p1 = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) } + val p2 = TypedPipe.from(TypedText.tsv[(Int, String)]("foo")) + + p1.hashJoin(p2) + } + + invert { + val p1 = TypedPipe.from(List(1, 2, 3)).map { i => (i, i) } + val p2 = TypedPipe.from(TypedText.tsv[(Int, String)]("foo")) + + p1.join(p2).filterKeys(_ % 2 == 0) + } + } +}