Skip to content

Commit

Permalink
merge with develop (#1727)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnynek authored Sep 26, 2017
1 parent 13e230d commit 00bd7d2
Showing 1 changed file with 108 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,37 @@ import java.io.Serializable
import com.twitter.scalding.TupleConverter
import cascading.tuple.TupleEntry

/**
* This is a more powerful version of =:= that can allow
* us to remove casts and also not have any runtime cost
* for our function calls in some cases of trivial functions
*/
sealed abstract class EqTypes[A, B] extends java.io.Serializable {
def apply(a: A): B
def subst[F[_]](f: F[A]): F[B]

final def reverse: EqTypes[B, A] = {
val aa = EqTypes.reflexive[A]
type F[T] = EqTypes[T, A]
subst[F](aa)
}

def toEv: A =:= B = {
val aa = implicitly[A =:= A]
type F[T] = A =:= T
subst[F](aa)
}
}

object EqTypes extends java.io.Serializable {
private[this] final case class ReflexiveEquality[A]() extends EqTypes[A, A] {
def apply(a: A): A = a
def subst[F[_]](f: F[A]): F[A] = f
}

implicit def reflexive[A]: EqTypes[A, A] = ReflexiveEquality()
}

/**
* This is one of 4 core, non composed operations:
* identity
Expand All @@ -35,8 +66,8 @@ object FlatMapping {
def filterKeys[K, V](fn: K => Boolean): FlatMapping[(K, V), (K, V)] =
filter { kv => fn(kv._1) }

final case class Identity[A, B](ev: A =:= B) extends FlatMapping[A, B]
final case class Filter[A, B](fn: A => Boolean, ev: A =:= B) extends FlatMapping[A, B]
final case class Identity[A, B](ev: EqTypes[A, B]) extends FlatMapping[A, B]
final case class Filter[A, B](fn: A => Boolean, ev: EqTypes[A, B]) extends FlatMapping[A, B]
final case class Map[A, B](fn: A => B) extends FlatMapping[A, B]
final case class FlatM[A, B](fn: A => TraversableOnce[B]) extends FlatMapping[A, B]
}
Expand All @@ -48,13 +79,36 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io
import FlatMappedFn._

final def runAfter[Z](fn: FlatMapping[Z, A]): FlatMappedFn[Z, B] = this match {
case Single(FlatMapping.Identity(_)) => Single(fn.asInstanceOf[FlatMapping[Z, B]]) // since we have A =:= B, we know this cast is safe
case Single(FlatMapping.Identity(ev)) =>
type F[T] = FlatMapping[Z, T]
Single(ev.subst[F](fn))
case notId => fn match {
case FlatMapping.Identity(ev) => this.asInstanceOf[FlatMappedFn[Z, B]] // we have Z =:= A we know this cast is safe
case FlatMapping.Identity(ev) =>
type F[T] = FlatMappedFn[T, B]
ev.reverse.subst[F](this)
case notIdFn => Series(notIdFn, notId) // only make a Series without either side being identity
}
}

final def combine[C](next: FlatMappedFn[B, C]): FlatMappedFn[A, C] = {
/*
* We have to reassociate so the front of the series has the
* first flatmap, so we can bail out early when there are no more
* items in any flatMap result.
*/
def loop[X, Y, Z](fn0: FlatMappedFn[X, Y], fn1: FlatMappedFn[Y, Z]): FlatMappedFn[X, Z] =
fn0 match {
case Single(FlatMapping.Identity(ev)) =>
type F[T] = FlatMappedFn[T, Z]
ev.reverse.subst[F](fn1)
case Single(f0) =>
Series(f0, fn1)
case Series(f0f, f1f) =>
Series(f0f, loop(f1f, fn1))
}
loop(this, next)
}

/**
* We interpret this composition once to minimize pattern matching when we execute
*/
Expand All @@ -63,22 +117,29 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io

def loop[A1, B1](fn: FlatMappedFn[A1, B1]): A1 => TraversableOnce[B1] = fn match {
case Single(Identity(ev)) =>
{ (t: A1) => Iterator.single(t.asInstanceOf[B1]) } // A1 =:= B1
val const: A1 => TraversableOnce[A1] = FlatMappedFn.FromIdentity[A1]()
type F[T] = A1 => TraversableOnce[T]
ev.subst[F](const)
case Single(Filter(f, ev)) =>
{ (t: A1) => if (f(t)) Iterator.single(t.asInstanceOf[B1]) else Iterator.empty } // A1 =:= B1
case Single(Map(f)) => f.andThen(Iterator.single)
val filter: A1 => TraversableOnce[A1] = FlatMappedFn.FromFilter(f)
type F[T] = A1 => TraversableOnce[T]
ev.subst[F](filter)
case Single(Map(f)) => FlatMappedFn.FromMap(f)
case Single(FlatM(f)) => f
case Series(Identity(ev), rest) => loop(rest).asInstanceOf[A1 => TraversableOnce[B1]] // we know that A1 =:= C
case Series(Identity(ev), rest) =>
type F[T] = T => TraversableOnce[B1]
ev.subst[F](loop(rest))
case Series(Filter(f, ev), rest) =>
val next = loop(rest).asInstanceOf[A1 => TraversableOnce[B1]] // A1 =:= C
type F[T] = T => TraversableOnce[B1]
val next = ev.subst[F](loop(rest)) // linter:disable:UndesirableTypeInference

{ (t: A1) => if (f(t)) next(t) else Iterator.empty }
FlatMappedFn.FromFilterCompose(f, next)
case Series(Map(f), rest) =>
val next = loop(rest) // linter:disable:UndesirableTypeInference
f.andThen(next)
FlatMappedFn.FromMapCompose(f, next)
case Series(FlatM(f), rest) =>
val next = loop(rest) // linter:disable:UndesirableTypeInference
f.andThen(_.flatMap(next))
FlatMappedFn.FromFlatMapCompose(f, next)
}

loop(this)
Expand All @@ -88,19 +149,50 @@ sealed trait FlatMappedFn[-A, +B] extends (A => TraversableOnce[B]) with java.io
}

object FlatMappedFn {

/**
* we prefer case class functions since they have equality
*/
private case class FromIdentity[A]() extends Function1[A, Iterator[A]] {
def apply(a: A) = Iterator.single(a)
}
private case class FromFilter[A](fn: A => Boolean) extends Function1[A, Iterator[A]] {
def apply(a: A) = if (fn(a)) Iterator.single(a) else Iterator.empty
}
private case class FromMap[A, B](fn: A => B) extends Function1[A, Iterator[B]] {
def apply(a: A) = Iterator.single(fn(a))
}
private case class FromFilterCompose[A, B](fn: A => Boolean, next: A => TraversableOnce[B]) extends Function1[A, TraversableOnce[B]] {
def apply(a: A) = if (fn(a)) next(a) else Iterator.empty
}
private case class FromMapCompose[A, B, C](fn: A => B, next: B => TraversableOnce[C]) extends Function1[A, TraversableOnce[C]] {
def apply(a: A) = next(fn(a))
}
private case class FromFlatMapCompose[A, B, C](fn: A => TraversableOnce[B], next: B => TraversableOnce[C]) extends Function1[A, TraversableOnce[C]] {
def apply(a: A) = fn(a).flatMap(next)
}


import FlatMapping._

def asId[A, B](f: FlatMappedFn[A, B]): Option[(_ >: A) =:= (_ <: B)] = f match {
case Single(i@Identity(_)) => Some(i.ev)
def asId[A, B](f: FlatMappedFn[A, B]): Option[EqTypes[_ >: A, _ <: B]] = f match {
case Single(Identity(ev)) => Some(ev)
case _ => None
}

def asFilter[A, B](f: FlatMappedFn[A, B]): Option[(A => Boolean, (_ >: A) =:= (_ <: B))] = f match {
def asFilter[A, B](f: FlatMappedFn[A, B]): Option[(A => Boolean, EqTypes[(_ >: A), (_ <: B)])] = f match {
case Single(filter@Filter(_, _)) => Some((filter.fn, filter.ev))
case _ => None
}

def identity[T]: FlatMappedFn[T, T] = Single(FlatMapping.Identity[T, T](implicitly[T =:= T]))
def apply[A, B](fn: A => TraversableOnce[B]): FlatMappedFn[A, B] =
fn match {
case fmf: FlatMappedFn[A, B] => fmf
case rawfn => Single(FlatMapping.FlatM(rawfn))
}

def identity[T]: FlatMappedFn[T, T] = Single(FlatMapping.Identity[T, T](EqTypes.reflexive[T]))

final case class Single[A, B](fn: FlatMapping[A, B]) extends FlatMappedFn[A, B]
final case class Series[A, B, C](first: FlatMapping[A, B], next: FlatMappedFn[B, C]) extends FlatMappedFn[A, C]
}

0 comments on commit 00bd7d2

Please sign in to comment.