Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic TypedPipe optimization rules #1724

Merged
merged 11 commits into from
Oct 3, 2017
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ val avroVersion = "1.7.4"
val bijectionVersion = "0.9.5"
val cascadingAvroVersion = "2.1.2"
val chillVersion = "0.8.4"
val dagonVersion = "0.2.0"
val elephantbirdVersion = "4.15"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
Expand Down Expand Up @@ -316,6 +317,7 @@ lazy val scaldingCore = module("core").settings(
"cascading" % "cascading-core" % cascadingVersion,
"cascading" % "cascading-hadoop" % cascadingVersion,
"cascading" % "cascading-local" % cascadingVersion,
"com.stripe" %% "dagon-core" % dagonVersion,
"com.twitter" % "chill-hadoop" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
"com.twitter" %% "chill-bijection" % chillVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io
f.andThen(next)
case Series(FlatM(f), rest) =>
val next = loop(rest) // linter:disable:UndesirableTypeInference
f.andThen(_.flatMap(next))

{ a: A1 => f(a).flatMap(next) }
}

loop(this)
Expand All @@ -100,6 +101,12 @@ object FlatMappedFn {
case _ => None
}

def apply[A, B](fn: A => TraversableOnce[B]): FlatMappedFn[A, B] =
fn match {
case fmf: FlatMappedFn[A, B] => fmf
case rawfn => Single(FlatMapping.FlatM(rawfn))
}

def identity[T]: FlatMappedFn[T, T] = Single(FlatMapping.Identity[T, T](implicitly[T =:= T]))
final case class Single[A, B](fn: FlatMapping[A, B]) extends FlatMappedFn[A, B]
final case class Series[A, B, C](first: FlatMapping[A, B], next: FlatMappedFn[B, C]) extends FlatMappedFn[A, C]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ object Joiner extends java.io.Serializable {
def hashInner2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => itu.iterator.map { (v, _) } }
def hashLeft2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => asOuter(itu.iterator).map { (v, _) } }

def inner2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) =>
itv.flatMap { v => itu.map { u => (v, u) } }
}
// We only allocate one of these so we can use equality to test
private[this] val inner2Any: (Any, Iterator[Any], Iterable[Any]) => Iterator[Any] =
{ (key: Any, itv: Iterator[Any], itu: Iterable[Any]) => itv.flatMap { v => itu.map { u => (v, u) } } }

def inner2[K, V, U]: (K, Iterator[V], Iterable[U]) => Iterator[(V, U)] =
inner2Any.asInstanceOf[(K, Iterator[V], Iterable[U]) => Iterator[(V, U)]]

def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = {
if (it.isEmpty) {
Iterator(None)
Expand Down
Loading