Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add quotation to external APIs #1763

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

fwbrasil
Copy link
Contributor

@fwbrasil fwbrasil commented Jan 8, 2018

This PR is the second part of #1754. It adds the Quoted implicit param to the user-facing APIs. After this change, the last PR will use the projection information to do automatic pushdown.

@fwbrasil fwbrasil force-pushed the quoted-apis branch 3 times, most recently from 36dec0d to d766dbe Compare January 8, 2018 21:41
@fwbrasil
Copy link
Contributor Author

fwbrasil commented Jan 8, 2018

@ttim @benpence @dieu @johnynek this is ready for review

Copy link
Collaborator

@ttim ttim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general this looks good but I have a suggestion about how Quoted works in general.

I think right now the logic with passing quoted explicitly / implicitly (and Quoted.internal) is a bit fuzzy. Can we do instead something similar to cause in exceptions?

Let's make Quoted nested and basically putting quoted on method declaration means you want to pass information about this call site. And then if you have quoted parameter in the scope & apply quoted macro - put quoted from the context as .cause call site information.

I don't think we really need Quoted.internal in this case and overall logic seems more clear to me (quoted parameter - want to save information about call site).

Also Quoted become more or less call site information in this case (which maybe is a better name?).

As cons for that I can see is - you need to somehow encode how different properties (such as used field projection) propagate through cause. But I think it actually represents what we need.

Copy link
Collaborator

@johnynek johnynek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is both exciting, but a bit scary. This touches virtually every public API and make everything binary incompatible.

In exchange for that, it is not yet clear what we are getting. I'd like to see maybe a quoted branch where we can use the final product with say Parquet to do some filter pushdown or projection pushdown.

If we don't have that as a killer app, I'm nervous about merging and getting to it later.

What do you think about making a quoted branch to make this PR into and when we have filter/projection pushdown working, we merge into develop?

@@ -501,7 +505,7 @@ final case class ValueSortedReduce[K, V1, V2](
* After sorting, then reducing, there is no chance
* to operate in the mappers. Just call take.
*/
override def bufferedTake(n: Int) = take(n)
override def bufferedTake(n: Int)(implicit q: Quoted) = take(n)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a Quoted here. Not clear to me. Just to get position information of the caller?


class NoStackLineNumberTest extends WordSpec {

"No Stack Shouldn't block getting line number info" should {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we replace this with an equivalent test that we can see the caller correctly?

@fwbrasil
Copy link
Contributor Author

This is both exciting, but a bit scary. This touches virtually every public API and make everything binary incompatible.

Yeah, it's an intrusive change. I thought it was clear that it'd be necessary when we discussed it, though.

In exchange for that, it is not yet clear what we are getting. I'd like to see maybe a quoted branch where we can use the final product with say Parquet to do some filter pushdown or projection pushdown.

If we don't have that as a killer app, I'm nervous about merging and getting to it later.

What do you think about making a quoted branch to make this PR into and when we have filter/projection pushdown working, we merge into develop?

That's #1754. These pull requests are just a break down of that initial one. Note that only projection pushdown is in the scope of this change.

@fwbrasil
Copy link
Contributor Author

I think right now the logic with passing quoted explicitly / implicitly (and Quoted.internal) is a bit fuzzy. Can we do instead something similar to cause in exceptions?

Let's make Quoted nested and basically putting quoted on method declaration means you want to pass information about this call site. And then if you have quoted parameter in the scope & apply quoted macro - put quoted from the context as .cause call site information.

@ttim I can't see the benefit of this proposal. The quotation propagation is just a way to make the initial information available in the final TypedPipe composition. Could you give more details?

I don't think we really need Quoted.internal in this case and overall logic seems more clear to me (quoted parameter - want to save information about call site).

Quoted.internal is just for safety since people might forget to propagate the quotation when making chanegs to Scalding. See the "Quoted propagation" section on #1754

Copy link
Collaborator

@johnynek johnynek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing the point, but I'd love to minimize the callsites that have to change.

To me, that could be done by only accepting Quoted where we take a lambda and only where we expect we could in principle do some filter pushdown or projection pushdown. Does that make sense?

Why else do we want the quoted?

@@ -17,7 +19,7 @@ object FlattenGroup {
}

class FlattenLeftJoin3[KEY, KLL[KLL_K, +KLL_V] <: KeyedListLike[KLL_K, KLL_V, KLL], A, B, C](nested: KLL[KEY, ((A, B), C)]) {
def flattenValueTuple: KLL[KEY, (A, B, C)] = nested.mapValues { tup => FlattenGroup.flattenNestedTuple(tup) }
def flattenValueTuple(implicit q: Quoted): KLL[KEY, (A, B, C)] = nested.mapValues { tup => FlattenGroup.flattenNestedTuple(tup) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused why we need the implicits here. Can you comment? Can't we statically see anything we need for the purposes of projection? Is this just to collect line numbers?

@@ -195,14 +199,14 @@ sealed trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped]
* but it is not clear how to generalize that for general cogrouping functions.
* For now, just do a normal take.
*/
override def bufferedTake(n: Int): CoGrouped[K, R] =
override def bufferedTake(n: Int)(implicit q: Quoted): CoGrouped[K, R] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not take an implicit here if it can't contribute to row/column filtering, which I can't see how it can in the case of take.

@@ -272,16 +276,16 @@ object Grouped {
* of each key in memory on the reducer.
*/
sealed trait Sortable[+T, +Sorted[+_]] {
def withSortOrdering[U >: T](so: Ordering[U]): Sorted[T]
def withSortOrdering[U >: T](so: Ordering[U])(implicit q: Quoted): Sorted[T]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't project/filter here right since we know sorting is 1 in 1 out.

UnsortedIdentityReduce(keyOrdering, mapped.mapValues(fn), reducers, descriptions)

override def sum[U >: V1](implicit sg: Semigroup[U]) = {
override def sum[U >: V1](implicit sg: Semigroup[U], q: Quoted) = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since semigroups emit the whole object, do we really need the quoted here? Seems like we can assume there can be no projection/filtering here.

mapValueStream(Identity())

/**
* Use this to get the first value encountered.
* prefer this to take(1).
*/
def head: This[K, T] = sum(HeadSemigroup[T]())
def head(implicit q: Quoted): This[K, T] = sum(HeadSemigroup[T](), q)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't really filter here, can we? why to we need quoted?

flatMapValues(Widen(SubTypes.fromEv(ev)))

/**
* This is just short hand for mapValueStream(identity), it makes sure the
* planner sees that you want to force a shuffle. For expert tuning
*/
def forceToReducers: This[K, T] =
def forceToReducers(implicit q: Quoted): This[K, T] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we avoid quoted here?

@@ -123,9 +124,9 @@ trait KeyedListLike[K, +T, +This[K, +T] <: KeyedListLike[K, T, This]]
/**
* Use Algebird Aggregator to do the reduction
*/
def aggregate[B, C](agg: Aggregator[T, B, C]): This[K, C] =
def aggregate[B, C](agg: Aggregator[T, B, C])(implicit q: Quoted): This[K, C] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can look inside the aggregrator and filter, can we? Can we really use the quoted meaningfully?

mapValues(fn).product

/**
* For each key, selects all elements except first n ones.
*/
def drop(n: Int): This[K, T] =
def drop(n: Int)(implicit q: Quoted): This[K, T] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't filter here, can we?

mapValueStream(TakeWhile(p))

/**
* Folds are composable aggregations that make one pass over the data.
* If you need to do several custom folds over the same data, use Fold.join
* and this method
*/
def fold[V](f: Fold[T, V]): This[K, V] =
def fold[V](f: Fold[T, V])(implicit q: Quoted): This[K, V] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we look inside the Fold?

toSet[T].mapValues(SizeOfSet())

/**
* For each key, remove duplicate values. WARNING: May OOM.
* This assumes the values for each key can fit in memory.
*/
def distinctValues: This[K, T] = toSet[T].flattenValues
def distinctValues(implicit q: Quoted): This[K, T] = toSet[T].flattenValues
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't filter here, can we?

@fwbrasil
Copy link
Contributor Author

To me, that could be done by only accepting Quoted where we take a lambda and only where we expect we could in principle do some filter pushdown or projection pushdown. Does that make sense?

Why else do we want the quoted?

@johnynek take a look at the description on #1754. We'd also like to use Quoted to provide a better description of the job to users, which requires adding Quoted to all user-facing APIs.

Copy link
Collaborator

@johnynek johnynek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.... It is scary, touch every API, but I guess we do it...

Have you tried to build twitter's Source against this change? i want to know what we are signing up for. Is there 100 places we need to change, or 5000?

@@ -45,7 +46,7 @@ case class Sketched[K, V](pipe: TypedPipe[(K, V)],
lazy val sketch: TypedPipe[CMS[Bytes]] = {
// every 10k items, compact into a CMS to prevent very slow mappers
lazy implicit val batchedSG: com.twitter.algebird.Semigroup[Batched[CMS[Bytes]]] = Batched.compactingSemigroup[CMS[Bytes]](10000)

implicit val q: Quoted = Quoted.internal
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we take a Quoted in the constructor actually? That should be created where we call .sketch which I think could improve reporting on internal methods.

@@ -84,6 +85,8 @@ case class SketchJoined[K: Ordering, V, V2, R](left: Sketched[K, V],

def reducers = Some(numReducers)

private implicit val q: Quoted = Quoted.internal
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment: should we take implicit q: Quoted on the constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

map(WithConstant(())).group

/**
* If T <:< U, then this is safe to treat as TypedPipe[U] due to covariance
*/
protected def raiseTo[U](implicit ev: T <:< U): TypedPipe[U] =
protected def raiseTo[U](implicit ev: T <:< U, m: Quoted): TypedPipe[U] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we are using m here are we? This is really just a typesafe cast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

def debug: TypedPipe[T] =
TypedPipe.DebugPipe(this).withLine
def debug(implicit m: Quoted): TypedPipe[T] =
TypedPipe.DebugPipe(this).withQuoted

/** adds a description to the pipe */
def withDescription(description: String): TypedPipe[T] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we want Quoted here. Can you add the comment why? Is it the assumption that users get full control of the note, and there is no need to add the line/file info that Quoted can give?

Copy link
Contributor Author

@fwbrasil fwbrasil Jan 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just updated the scaladoc

@@ -721,7 +719,8 @@ sealed trait TypedPipe[+T] extends Serializable {
delta: Double = 0.01, //5 rows (= 5 hashes)
seed: Int = 12345)(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)],
serialization: K => Array[Byte],
ordering: Ordering[K]): Sketched[K, V] =
ordering: Ordering[K],
m: Quoted): Sketched[K, V] =
Sketched(ev(this), reducers, delta, eps, seed)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass m explicitly to Sketched so we can see that it is not discarded?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


object CascadingBackend {
import TypedPipe._

private implicit val q: Quoted = Quoted.internal
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm nervous about this hanging out implicitly. Can we not use it implicitly and instead explicitly call it: interalQuoted where needed? I fear this will actually destroy our Quoted information when we are doing composing operations, and defeat the whole purpose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


@annotation.tailrec
def loop[A](t: TypedPipe[A], acc: List[(String, Boolean)]): (TypedPipe[A], List[(String, Boolean)]) =
t match {
case WithDescriptionTypedPipe(i, desc, ded) =>
case WithDescriptionTypedPipe(i, desc, ded, quoted) =>
loop(i, (desc, ded) :: acc)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to accumulate the quoted? Seems like we need a Semigroup on quoted (maybe a Monoid).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnynek desc already has the description information from quoted (see TypedPipe.withQuoted). I think we don't need to accumulate quoted at this point since the purpose is to set the description only (no projections).

@@ -301,6 +302,7 @@ class MemoryWriter(mem: MemoryMode) extends Writer {

def plan[T](m: Memo, tp: TypedPipe[T]): (Memo, Op[T]) =
m.plan(tp) {
implicit val q: Quoted = Quoted.internal
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to above, can we not keep dummy implicit Quoted around. I fear we won't know where we are actually sending them. I prefer to be explicit with the dummies (and add a comment to that effect).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@fwbrasil
Copy link
Contributor Author

@johnynek I'm still working on updating source to develop without this change. I'll test these changes after it

@CLAassistant
Copy link

CLAassistant commented Nov 16, 2019

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Flavio Brasil seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants