Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic TypedPipe optimization rules #1724

Merged
merged 11 commits into from
Oct 3, 2017
173 changes: 150 additions & 23 deletions scalding-core/src/main/scala/com/twitter/scalding/typed/Joiner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,163 @@ package com.twitter.scalding.typed
import com.twitter.scalding._

object Joiner extends java.io.Serializable {
def toCogroupJoiner2[K, V, U, R](hashJoiner: (K, V, Iterable[U]) => Iterator[R]): (K, Iterator[V], Iterable[U]) => Iterator[R] = {
(k: K, itv: Iterator[V], itu: Iterable[U]) =>
itv.flatMap { hashJoiner(k, _, itu) }

type JoinFn[K, V, U, R] = (K, Iterator[V], Iterable[U]) => Iterator[R]
type HashJoinFn[K, V, U, R] = (K, V, Iterable[U]) => Iterator[R]

def toCogroupJoiner2[K, V, U, R](hashJoiner: (K, V, Iterable[U]) => Iterator[R]): JoinFn[K, V, U, R] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment on this method? (Motivation as much as anything -- I'm not sure why we would normally want to turn a hash join into a JoinFn.)

Also, I think we can use the HashJoinFn[K, V, U, R] type alias as the type of the argument to this method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do these in the follow up, if that's okay.

In fact, we never use this method ourselves, but I didn't want to remove it, in the interest of being nice to users who may be using it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep totes. and makes sense to leave or kill for the obvious reasons to do either

JoinFromHashJoin(hashJoiner)

/**
 * Inner hash-join function, returned as the matchable [[HashInner]] case class.
 */
def hashInner2[K, V, U]: HashJoinFn[K, V, U, (V, U)] =
  HashInner[K, V, U]()

/**
 * Left hash-join function, returned as the matchable [[HashLeft]] case class.
 */
def hashLeft2[K, V, U]: HashJoinFn[K, V, U, (V, Option[U])] =
  HashLeft[K, V, U]()

/**
 * Inner join function, returned as the matchable [[InnerJoin]] case class.
 */
def inner2[K, V, U]: JoinFn[K, V, U, (V, U)] =
  InnerJoin[K, V, U]()

/**
 * Adapts an iterator for use on the optional side of an outer join.
 *
 * @param it the values on one side of the join
 * @return every value of `it` wrapped in `Some`, or a single `None`
 *         when `it` has no values at all
 */
def asOuter[U](it: Iterator[U]): Iterator[Option[U]] =
  if (it.hasNext) it.map(u => Some(u))
  else Iterator.single(None)

/**
 * Full outer join function, returned as the matchable [[OuterJoin]] case class.
 */
def outer2[K, V, U]: JoinFn[K, V, U, (Option[V], Option[U])] =
  OuterJoin[K, V, U]()

/**
 * Left outer join function, returned as the matchable [[LeftJoin]] case class.
 */
def left2[K, V, U]: JoinFn[K, V, U, (V, Option[U])] =
  LeftJoin[K, V, U]()

/**
 * Right outer join function, returned as the matchable [[RightJoin]] case class.
 */
def right2[K, V, U]: JoinFn[K, V, U, (Option[V], U)] =
  RightJoin[K, V, U]()

/**
 * Base type for hash-join functions that are represented as data.
 * Because the class is sealed, code such as an optimizer can pattern match
 * on the kind of hash join being performed.
 */
sealed abstract class HashJoinFunction[K, V, U, R] extends ((K, V, Iterable[U]) => Iterator[R])

/**
 * Inner hash join: pairs the left value with each right value, so an empty
 * right side produces no output. The key is not consulted.
 */
final case class HashInner[K, V, U]() extends HashJoinFunction[K, V, U, (V, U)] {
  def apply(k: K, v: V, u: Iterable[U]): Iterator[(V, U)] =
    for (right <- u.iterator) yield (v, right)
}
/**
 * Left hash join: pairs the left value with each right value wrapped in `Some`,
 * or with a single `None` when the right side is empty (see `asOuter`).
 */
final case class HashLeft[K, V, U]() extends HashJoinFunction[K, V, U, (V, Option[U])] {
  def apply(k: K, v: V, u: Iterable[U]): Iterator[(V, Option[U])] = {
    val rightOrNone = asOuter(u.iterator)
    rightOrNone.map(maybeRight => (v, maybeRight))
  }
}
/**
 * A hash join followed by a filter on `(key, result)` pairs.
 *
 * @param jf the hash join producing the candidate results
 * @param fn predicate deciding which `(key, result)` pairs are kept
 */
final case class FilteredHashJoin[K, V1, V2, R](jf: HashJoinFunction[K, V1, V2, R], fn: ((K, R)) => Boolean) extends HashJoinFunction[K, V1, V2, R] {
  def apply(k: K, left: V1, right: Iterable[V2]): Iterator[R] = {
    val joined = jf(k, left, right)
    joined.filter(r => fn((k, r)))
  }
}
/**
 * A hash join whose results are each transformed by `fn`.
 *
 * @param jf the hash join producing the original results
 * @param fn transformation applied to every result
 */
final case class MappedHashJoin[K, V1, V2, R, R1](jf: HashJoinFunction[K, V1, V2, R], fn: R => R1) extends HashJoinFunction[K, V1, V2, R1] {
  def apply(k: K, left: V1, right: Iterable[V2]): Iterator[R1] = {
    val joined = jf(k, left, right)
    joined.map(fn)
  }
}
/**
 * A hash join whose results are each expanded by `fn` into zero or more outputs.
 *
 * @param jf the hash join producing the original results
 * @param fn expansion applied to every result
 */
final case class FlatMappedHashJoin[K, V1, V2, R, R1](jf: HashJoinFunction[K, V1, V2, R], fn: R => TraversableOnce[R1]) extends HashJoinFunction[K, V1, V2, R1] {
  def apply(k: K, left: V1, right: Iterable[V2]): Iterator[R1] = {
    val joined = jf(k, left, right)
    joined.flatMap(fn)
  }
}

def hashInner2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => itu.iterator.map { (v, _) } }
def hashLeft2[K, V, U] = { (key: K, v: V, itu: Iterable[U]) => asOuter(itu.iterator).map { (v, _) } }
/**
 * Base type for cogroup-style join functions that are represented as data:
 * a function from (key, left values, right values) to joined results.
 * Sealed so that the kind of join can be recovered by pattern matching.
 */
sealed abstract class JoinFunction[K, V1, V2, R] extends ((K, Iterator[V1], Iterable[V2]) => Iterator[R])

def inner2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) =>
itv.flatMap { v => itu.map { u => (v, u) } }
/**
 * Inner join: emits every (left, right) combination for the key, and
 * therefore nothing when either side is empty.
 */
final case class InnerJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (V1, V2)] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(V1, V2)] =
    for {
      v1 <- left
      v2 <- right.iterator
    } yield (v1, v2)
}
def asOuter[U](it: Iterator[U]): Iterator[Option[U]] = {
if (it.isEmpty) {
Iterator(None)
} else {
it.map { Some(_) }
}
/**
 * Left outer join: every left value is paired with each right value wrapped in
 * `Some`, or with a single `None` when the right side is empty.
 */
final case class LeftJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (V1, Option[V2])] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(V1, Option[V2])] =
    for {
      v1 <- left
      maybeV2 <- asOuter(right.iterator)
    } yield (v1, maybeV2)
}
def outer2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) =>
if (itv.isEmpty && itu.isEmpty) {
Iterator.empty
} else {
asOuter(itv).flatMap { v => asOuter(itu.iterator).map { u => (v, u) } }
}
/**
 * Right outer join: every right value is paired with each left value wrapped in
 * `Some`, or with a single `None` when the left side is empty.
 */
final case class RightJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (Option[V1], V2)] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(Option[V1], V2)] =
    for {
      maybeV1 <- asOuter(left)
      v2 <- right.iterator
    } yield (maybeV1, v2)
}
/**
 * Full outer join: both sides are made optional via `asOuter` and then crossed.
 * When both sides are empty nothing is emitted (rather than a `(None, None)` pair).
 */
final case class OuterJoin[K, V1, V2]() extends JoinFunction[K, V1, V2, (Option[V1], Option[V2])] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[(Option[V1], Option[V2])] =
    if (left.hasNext || right.nonEmpty) {
      for {
        maybeV1 <- asOuter(left)
        maybeV2 <- asOuter(right.iterator)
      } yield (maybeV1, maybeV2)
    } else Iterator.empty
}
/**
 * A join followed by a filter on `(key, result)` pairs.
 *
 * @param jf the join producing the candidate results
 * @param fn predicate deciding which `(key, result)` pairs are kept
 */
final case class FilteredJoin[K, V1, V2, R](jf: JoinFunction[K, V1, V2, R], fn: ((K, R)) => Boolean) extends JoinFunction[K, V1, V2, R] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[R] = {
    val joined = jf(k, left, right)
    joined.filter(r => fn((k, r)))
  }
}
/**
 * A join whose results are each transformed by `fn`.
 *
 * @param jf the join producing the original results
 * @param fn transformation applied to every result
 */
final case class MappedJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: R => R1) extends JoinFunction[K, V1, V2, R1] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[R1] = {
    val joined = jf(k, left, right)
    joined.map(fn)
  }
}
/**
 * A join whose results are each expanded by `fn` into zero or more outputs.
 *
 * @param jf the join producing the original results
 * @param fn expansion applied to every result
 */
final case class FlatMappedJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: R => TraversableOnce[R1]) extends JoinFunction[K, V1, V2, R1] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[R1] = {
    val joined = jf(k, left, right)
    joined.flatMap(fn)
  }
}
def left2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) =>
itv.flatMap { v => asOuter(itu.iterator).map { u => (v, u) } }
/**
 * A join whose whole per-key result iterator is handed to `fn`.
 * `fn` is only invoked when the underlying join produced at least one value.
 *
 * @param jf the join producing the per-key results
 * @param fn function from (key, all results for that key) to the new results
 */
final case class MappedGroupJoin[K, V1, V2, R, R1](jf: JoinFunction[K, V1, V2, R], fn: (K, Iterator[R]) => Iterator[R1]) extends JoinFunction[K, V1, V2, R1] {
  def apply(k: K, left: Iterator[V1], right: Iterable[V2]): Iterator[R1] = {
    val joined = jf(k, left, right)
    // mapGroup operates on non-empty groups
    if (joined.hasNext) fn(k, joined)
    else Iterator.empty
  }
}
def right2[K, V, U] = { (key: K, itv: Iterator[V], itu: Iterable[U]) =>
asOuter(itv).flatMap { v => itu.map { u => (v, u) } }
/**
 * Lifts a hash-join function into a [[JoinFunction]] by applying it to each
 * left value in turn, always with the same right-side values.
 *
 * @param hj the hash-join function invoked once per left value
 */
final case class JoinFromHashJoin[K, V1, V2, R](hj: (K, V1, Iterable[V2]) => Iterator[R]) extends JoinFunction[K, V1, V2, R] {
  def apply(k: K, itv: Iterator[V1], itu: Iterable[V2]): Iterator[R] =
    itv.flatMap { v => hj(k, v, itu) }
}

/**
 * An inner-like join function is definitely empty if either side is empty.
 *
 * @param jf the join function to classify
 * @return Some(true) or Some(false) when `jf` is one of the join classes
 *         matched below, None for any other function
 */
final def isInnerJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] =
  jf match {
    case InnerJoin() => Some(true)
    case LeftJoin() | RightJoin() | OuterJoin() => Some(false)
    // a lifted hash join is classified by the hash join it wraps
    case JoinFromHashJoin(hashJoin) => isInnerHashJoinLike(hashJoin)
    // wrappers are classified by the join they wrap
    case FilteredJoin(inner, _) => isInnerJoinLike(inner)
    case MappedJoin(inner, _) => isInnerJoinLike(inner)
    case FlatMappedJoin(inner, _) => isInnerJoinLike(inner)
    case MappedGroupJoin(inner, _) => isInnerJoinLike(inner)
    case _ => None
  }
/**
 * A left-like join function is definitely empty if the left side is empty.
 *
 * @param jf the join function to classify
 * @return Some(true) or Some(false) when `jf` is one of the join classes
 *         matched below, None for any other function
 */
final def isLeftJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] =
  jf match {
    case InnerJoin() => Some(true)
    // JoinFromHashJoin flatMaps over the left values, so an empty left side
    // yields an empty result no matter which hash join is wrapped
    // (including HashLeft and hash joins we cannot inspect).
    case JoinFromHashJoin(_) => Some(true)
    case LeftJoin() => Some(true)
    case RightJoin() => Some(false)
    case OuterJoin() => Some(false)
    // wrappers are classified by the join they wrap
    case FilteredJoin(inner, _) => isLeftJoinLike(inner)
    case MappedJoin(inner, _) => isLeftJoinLike(inner)
    case FlatMappedJoin(inner, _) => isLeftJoinLike(inner)
    case MappedGroupJoin(inner, _) => isLeftJoinLike(inner)
    case _ => None
  }
/**
 * A right-like join function is definitely empty if the right side is empty.
 *
 * @param jf the join function to classify
 * @return Some(true) or Some(false) when `jf` is one of the join classes
 *         matched below, None for any other function
 */
final def isRightJoinLike[K, V1, V2, R](jf: (K, Iterator[V1], Iterable[V2]) => Iterator[R]): Option[Boolean] =
  jf match {
    case InnerJoin() | RightJoin() => Some(true)
    case LeftJoin() | OuterJoin() => Some(false)
    // a lifted hash join is classified by the hash join it wraps
    case JoinFromHashJoin(hashJoin) => isInnerHashJoinLike(hashJoin)
    // wrappers are classified by the join they wrap
    case FilteredJoin(inner, _) => isRightJoinLike(inner)
    case MappedJoin(inner, _) => isRightJoinLike(inner)
    case FlatMappedJoin(inner, _) => isRightJoinLike(inner)
    case MappedGroupJoin(inner, _) => isRightJoinLike(inner)
    case _ => None
  }

/**
 * An inner-like hash-join function is definitely empty if either side is empty.
 *
 * @param jf the hash-join function to classify
 * @return Some(true) or Some(false) when `jf` is one of the hash-join classes
 *         matched below, None for any other function
 */
final def isInnerHashJoinLike[K, V1, V2, R](jf: (K, V1, Iterable[V2]) => Iterator[R]): Option[Boolean] =
  jf match {
    case HashInner() => Some(true)
    case HashLeft() => Some(false)
    // wrappers are classified by the hash join they wrap
    case FilteredHashJoin(inner, _) => isInnerHashJoinLike(inner)
    case MappedHashJoin(inner, _) => isInnerHashJoinLike(inner)
    case FlatMappedHashJoin(inner, _) => isInnerHashJoinLike(inner)
    case _ => None
  }
}

Loading