Skip to content

Commit

Permalink
Implement Dagon.toLiteral (#1718)
Browse files Browse the repository at this point in the history
* Implement Dagon.toLiteral

* reduce stack depth

* rename LitPipe to LiteralPipe

* respond to review comments
  • Loading branch information
johnynek authored Sep 27, 2017
1 parent 00bd7d2 commit 4b6c927
Show file tree
Hide file tree
Showing 5 changed files with 540 additions and 18 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ val avroVersion = "1.7.4"
val bijectionVersion = "0.9.5"
val cascadingAvroVersion = "2.1.2"
val chillVersion = "0.8.4"
val dagonVersion = "0.2.0"
val elephantbirdVersion = "4.15"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
Expand Down Expand Up @@ -316,6 +317,7 @@ lazy val scaldingCore = module("core").settings(
"cascading" % "cascading-core" % cascadingVersion,
"cascading" % "cascading-hadoop" % cascadingVersion,
"cascading" % "cascading-local" % cascadingVersion,
"com.stripe" %% "dagon-core" % dagonVersion,
"com.twitter" % "chill-hadoop" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
"com.twitter" %% "chill-bijection" % chillVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
package com.twitter.scalding.typed

import com.stripe.dagon.{ FunctionK, Memoize, Rule, PartialRule, Dag, Literal }

object OptimizationRules {
type LiteralPipe[T] = Literal[TypedPipe, T]

import Literal.{ Unary, Binary }
import TypedPipe._

/**
* Since our TypedPipe is covariant, but the Literal is not
* this is actually safe in this context, but not in general
*/
def widen[T](l: LiteralPipe[_ <: T]): LiteralPipe[T] = {
// to prove this is safe, see that if you have
// LiteralPipe[_ <: T] we can call .evaluate to get
// TypedPipe[_ <: T] which due to covariance is
// TypedPipe[T], and then using toLiteral we can get
// LiteralPipe[T]
//
// that would be wasteful to apply since the final
// result is identity.
l.asInstanceOf[LiteralPipe[T]]
}

/**
* Convert a TypedPipe[T] to a Literal[TypedPipe, T] for
* use with Dagon
*/
def toLiteral: FunctionK[TypedPipe, LiteralPipe] =
Memoize.functionK[TypedPipe, LiteralPipe](
new Memoize.RecursiveK[TypedPipe, LiteralPipe] {

def toFunction[A] = {
case (c: CrossPipe[a, b], f) =>
Binary(f(c.left), f(c.right), CrossPipe(_: TypedPipe[a], _: TypedPipe[b]))
case (cv@CrossValue(_, _), f) =>
def go[A, B](cv: CrossValue[A, B]): LiteralPipe[(A, B)] =
cv match {
case CrossValue(a, ComputedValue(v)) =>
Binary(f(a), f(v), { (a: TypedPipe[A], b: TypedPipe[B]) =>
CrossValue(a, ComputedValue(b))
})
case CrossValue(a, v) =>
Unary(f(a), CrossValue(_: TypedPipe[A], v))
}
widen(go(cv))
case (p: DebugPipe[a], f) =>
Unary(f(p.input), DebugPipe(_: TypedPipe[a]))
case (p: FilterKeys[a, b], f) =>
widen(Unary(f(p.input), FilterKeys(_: TypedPipe[(a, b)], p.fn)))
case (p: Filter[a], f) =>
Unary(f(p.input), Filter(_: TypedPipe[a], p.fn))
case (p: Fork[a], f) =>
Unary(f(p.input), Fork(_: TypedPipe[a]))
case (p: FlatMapValues[a, b, c], f) =>
widen(Unary(f(p.input), FlatMapValues(_: TypedPipe[(a, b)], p.fn)))
case (p: FlatMapped[a, b], f) =>
Unary(f(p.input), FlatMapped(_: TypedPipe[a], p.fn))
case (p: ForceToDisk[a], f) =>
Unary(f(p.input), ForceToDisk(_: TypedPipe[a]))
case (it@IterablePipe(_), _) =>
Literal.Const(it)
case (p: MapValues[a, b, c], f) =>
widen(Unary(f(p.input), MapValues(_: TypedPipe[(a, b)], p.fn)))
case (p: Mapped[a, b], f) =>
Unary(f(p.input), Mapped(_: TypedPipe[a], p.fn))
case (p: MergedTypedPipe[a], f) =>
Binary(f(p.left), f(p.right), MergedTypedPipe(_: TypedPipe[a], _: TypedPipe[a]))
case (src@SourcePipe(_), _) =>
Literal.Const(src)
case (p: SumByLocalKeys[a, b], f) =>
widen(Unary(f(p.input), SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup)))
case (p: TrappedPipe[a], f) =>
Unary(f(p.input), TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv))
case (p: WithDescriptionTypedPipe[a], f) =>
Unary(f(p.input), WithDescriptionTypedPipe(_: TypedPipe[a], p.description, p.deduplicate))
case (p: WithOnComplete[a], f) =>
Unary(f(p.input), WithOnComplete(_: TypedPipe[a], p.fn))
case (EmptyTypedPipe, _) =>
Literal.Const(EmptyTypedPipe)
case (hg: HashCoGroup[a, b, c, d], f) =>
widen(handleHashCoGroup(hg, f))
case (CoGroupedPipe(cg), f) =>
widen(handleCoGrouped(cg, f))
case (ReduceStepPipe(rs), f) =>
widen(handleReduceStep(rs, f))
}
})

private def handleReduceStep[K, V1, V2](rs: ReduceStep[K, V1, V2], recurse: FunctionK[TypedPipe, LiteralPipe]): LiteralPipe[(K, V2)] =
rs match {
case step@IdentityReduce(_, _, _, _) =>
Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) })
case step@UnsortedIdentityReduce(_, _, _, _) =>
Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) })
case step@IdentityValueSortedReduce(_, _, _, _, _) =>
def go[A, B](ivsr: IdentityValueSortedReduce[A, B]): LiteralPipe[(A, B)] =
Unary(widen[(A, B)](recurse(ivsr.mapped)), { (tp: TypedPipe[(A, B)]) =>
ReduceStepPipe[A, B, B](IdentityValueSortedReduce[A, B](
ivsr.keyOrdering,
tp,
ivsr.valueSort,
ivsr.reducers,
ivsr.descriptions))
})
widen[(K, V2)](go(step))
case step@ValueSortedReduce(_, _, _, _, _, _) =>
def go[A, B, C](vsr: ValueSortedReduce[A, B, C]): LiteralPipe[(A, C)] =
Unary(recurse(vsr.mapped), { (tp: TypedPipe[(A, B)]) =>
ReduceStepPipe[A, B, C](ValueSortedReduce[A, B, C](
vsr.keyOrdering,
tp,
vsr.valueSort,
vsr.reduceFn,
vsr.reducers,
vsr.descriptions))
})
go(step)
case step@IteratorMappedReduce(_, _, _, _, _) =>
def go[A, B, C](imr: IteratorMappedReduce[A, B, C]): LiteralPipe[(A, C)] =
Unary(recurse(imr.mapped), { (tp: TypedPipe[(A, B)]) => ReduceStepPipe[A, B, C](imr.copy(mapped = tp)) })

go(step)
}

private def handleCoGrouped[K, V](cg: CoGroupable[K, V], recurse: FunctionK[TypedPipe, LiteralPipe]): LiteralPipe[(K, V)] = {
import CoGrouped._

def pipeToCG[V1](t: TypedPipe[(K, V1)]): CoGroupable[K, V1] =
t match {
case ReduceStepPipe(cg: CoGroupable[K @unchecked, V1 @unchecked]) =>
// we are relying on the fact that we use Ordering[K]
// as a contravariant type, despite it not being defined
// that way.
cg
case CoGroupedPipe(cg) =>
// we are relying on the fact that we use Ordering[K]
// as a contravariant type, despite it not being defined
// that way.
cg.asInstanceOf[CoGroupable[K, V1]]
case kvPipe => IdentityReduce(cg.keyOrdering, kvPipe, None, Nil)
}

cg match {
case p@Pair(_, _, _) =>
def go[A, B, C](pair: Pair[K, A, B, C]): LiteralPipe[(K, C)] = {
val llit = handleCoGrouped(pair.larger, recurse)
val rlit = handleCoGrouped(pair.smaller, recurse)
val fn = pair.fn
Binary(llit, rlit, { (l: TypedPipe[(K, A)], r: TypedPipe[(K, B)]) =>
Pair(pipeToCG(l), pipeToCG(r), fn)
})
}
widen(go(p))
case wr@WithReducers(_, _) =>
def go[V1 <: V](wr: WithReducers[K, V1]): LiteralPipe[(K, V)] = {
val reds = wr.reds
Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(wr.on, recurse), { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
withReducers(rs, reds)
case CoGroupedPipe(cg) =>
CoGroupedPipe(WithReducers(cg, reds))
case kvPipe =>
ReduceStepPipe(IdentityReduce(cg.keyOrdering, kvPipe, None, Nil)
.withReducers(reds))
}
})
}
go(wr)
case wd@WithDescription(_, _) =>
def go[V1 <: V](wd: WithDescription[K, V1]): LiteralPipe[(K, V)] = {
val desc = wd.description
Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(wd.on, recurse), { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
withDescription(rs, desc)
case CoGroupedPipe(cg) =>
CoGroupedPipe(WithDescription(cg, desc))
case kvPipe =>
kvPipe.withDescription(desc)
}
})
}
go(wd)
case fk@FilterKeys(_, _) =>
def go[V1 <: V](fk: FilterKeys[K, V1]): LiteralPipe[(K, V)] = {
val fn = fk.fn
Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(fk.on, recurse), { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
filterKeys(rs, fn)
case CoGroupedPipe(cg) =>
CoGroupedPipe(FilterKeys(cg, fn))
case kvPipe =>
kvPipe.filterKeys(fn)
}
})
}
go(fk)
case mg@MapGroup(_, _) =>
def go[V1, V2 <: V](mg: MapGroup[K, V1, V2]): LiteralPipe[(K, V)] = {
val fn = mg.fn
Unary[TypedPipe, (K, V1), (K, V)](handleCoGrouped(mg.on, recurse), { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
mapGroup(rs, fn)
case CoGroupedPipe(cg) =>
CoGroupedPipe(MapGroup(cg, fn))
case kvPipe =>
ReduceStepPipe(
IdentityReduce(cg.keyOrdering, kvPipe, None, Nil)
.mapGroup(fn))
}
})
}
go(mg)
case step@IdentityReduce(_, _, _, _) =>
widen(handleReduceStep(step, recurse))
case step@UnsortedIdentityReduce(_, _, _, _) =>
widen(handleReduceStep(step, recurse))
case step@IteratorMappedReduce(_, _, _, _, _) =>
widen(handleReduceStep(step, recurse))
}
}

/**
* This can't really usefully be on ReduceStep since users never want to use it
* as an ADT, as the planner does.
*/
private def withReducers[K, V1, V2](rs: ReduceStep[K, V1, V2], reds: Int): TypedPipe[(K, V2)] =
rs match {
case step@IdentityReduce(_, _, _, _) =>
ReduceStepPipe(step.withReducers(reds))
case step@UnsortedIdentityReduce(_, _, _, _) =>
ReduceStepPipe(step.withReducers(reds))
case step@IdentityValueSortedReduce(_, _, _, _, _) =>
ReduceStepPipe(step.withReducers(reds))
case step@ValueSortedReduce(_, _, _, _, _, _) =>
ReduceStepPipe(step.withReducers(reds))
case step@IteratorMappedReduce(_, _, _, _, _) =>
ReduceStepPipe(step.withReducers(reds))
}

private def withDescription[K, V1, V2](rs: ReduceStep[K, V1, V2], descr: String): TypedPipe[(K, V2)] =
rs match {
case step@IdentityReduce(_, _, _, _) =>
ReduceStepPipe(step.withDescription(descr))
case step@UnsortedIdentityReduce(_, _, _, _) =>
ReduceStepPipe(step.withDescription(descr))
case step@IdentityValueSortedReduce(_, _, _, _, _) =>
ReduceStepPipe(step.withDescription(descr))
case step@ValueSortedReduce(_, _, _, _, _, _) =>
ReduceStepPipe(step.withDescription(descr))
case step@IteratorMappedReduce(_, _, _, _, _) =>
ReduceStepPipe(step.withDescription(descr))
}

private def filterKeys[K, V1, V2](rs: ReduceStep[K, V1, V2], fn: K => Boolean): TypedPipe[(K, V2)] =
rs match {
case IdentityReduce(ord, p, r, d) =>
ReduceStepPipe(IdentityReduce(ord, FilterKeys(p, fn), r, d))
case UnsortedIdentityReduce(ord, p, r, d) =>
ReduceStepPipe(UnsortedIdentityReduce(ord, FilterKeys(p, fn), r, d))
case ivsr@IdentityValueSortedReduce(_, _, _, _, _) =>
def go[V](ivsr: IdentityValueSortedReduce[K, V]): TypedPipe[(K, V)] = {
val IdentityValueSortedReduce(ord, p, v, r, d) = ivsr
ReduceStepPipe(IdentityValueSortedReduce[K, V](ord, FilterKeys(p, fn), v, r, d))
}
go(ivsr)
case vsr@ValueSortedReduce(_, _, _, _, _, _) =>
def go(vsr: ValueSortedReduce[K, V1, V2]): TypedPipe[(K, V2)] = {
val ValueSortedReduce(ord, p, v, redfn, r, d) = vsr
ReduceStepPipe(ValueSortedReduce[K, V1, V2](ord, FilterKeys(p, fn), v, redfn, r, d))
}
go(vsr)
case imr@IteratorMappedReduce(_, _, _, _, _) =>
def go(imr: IteratorMappedReduce[K, V1, V2]): TypedPipe[(K, V2)] = {
val IteratorMappedReduce(ord, p, redfn, r, d) = imr
ReduceStepPipe(IteratorMappedReduce[K, V1, V2](ord, FilterKeys(p, fn), redfn, r, d))
}
go(imr)
}

private def mapGroup[K, V1, V2, V3](rs: ReduceStep[K, V1, V2], fn: (K, Iterator[V2]) => Iterator[V3]): TypedPipe[(K, V3)] =
rs match {
case step@IdentityReduce(_, _, _, _) =>
ReduceStepPipe(step.mapGroup(fn))
case step@UnsortedIdentityReduce(_, _, _, _) =>
ReduceStepPipe(step.mapGroup(fn))
case step@IdentityValueSortedReduce(_, _, _, _, _) =>
ReduceStepPipe(step.mapGroup(fn))
case step@ValueSortedReduce(_, _, _, _, _, _) =>
ReduceStepPipe(step.mapGroup(fn))
case step@IteratorMappedReduce(_, _, _, _, _) =>
ReduceStepPipe(step.mapGroup(fn))
}

private def handleHashCoGroup[K, V, V2, R](hj: HashCoGroup[K, V, V2, R], recurse: FunctionK[TypedPipe, LiteralPipe]): LiteralPipe[(K, R)] = {
val rightLit: LiteralPipe[(K, V2)] = hj.right match {
case step@IdentityReduce(_, _, _, _) =>
Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) })
case step@UnsortedIdentityReduce(_, _, _, _) =>
Unary(widen[(K, V2)](recurse(step.mapped)), { (tp: TypedPipe[(K, V2)]) => ReduceStepPipe(step.copy(mapped = tp)) })
case step@IteratorMappedReduce(_, _, _, _, _) =>
def go[A, B, C](imr: IteratorMappedReduce[A, B, C]): LiteralPipe[(A, C)] =
Unary(recurse(imr.mapped), { (tp: TypedPipe[(A, B)]) => ReduceStepPipe[A, B, C](imr.copy(mapped = tp)) })

widen(go(step))
}

val ordK: Ordering[K] = hj.right match {
case step@IdentityReduce(_, _, _, _) => step.keyOrdering
case step@UnsortedIdentityReduce(_, _, _, _) => step.keyOrdering
case step@IteratorMappedReduce(_, _, _, _, _) => step.keyOrdering
}

val joiner = hj.joiner

Binary(recurse(hj.left), rightLit,
{ (ltp: TypedPipe[(K, V)], rtp: TypedPipe[(K, V2)]) =>
rtp match {
case ReduceStepPipe(hg: HashJoinable[K @unchecked, V2 @unchecked]) =>
HashCoGroup(ltp, hg, joiner)
case otherwise =>
HashCoGroup(ltp, IdentityReduce(ordK, otherwise, None, Nil), joiner)
}
})
}
}
Loading

0 comments on commit 4b6c927

Please sign in to comment.