Skip to content

Commit

Permalink
add quotation to external APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Flavio Brasil committed Mar 26, 2018
1 parent c944a6e commit f49a434
Show file tree
Hide file tree
Showing 25 changed files with 387 additions and 406 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.twitter.scalding

import com.twitter.scalding.quotation.Quoted
import cascading.flow.FlowDef
import org.apache.avro.Schema
import collection.JavaConverters._
Expand All @@ -26,7 +27,8 @@ package object avro {
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
mode: Mode,
q: Quoted): Unit = {
val sink = PackedAvroSource[T](path)
pipe.write(sink)
}
Expand All @@ -35,7 +37,8 @@ package object avro {
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
mode: Mode,
q: Quoted): Unit = {
import Dsl._
val sink = UnpackedAvroSource[T](path, Some(schema))
val outFields = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package com.twitter.scalding.commons.extensions

import com.twitter.scalding._
import com.twitter.scalding.quotation.Quoted
import com.twitter.scalding.Dsl._

import cascading.flow.FlowDef
Expand Down Expand Up @@ -111,7 +112,7 @@ object Checkpoint {

// Wrapper for Checkpoint when using a TypedPipe
def apply[A](checkpointName: String)(flow: => TypedPipe[A])(implicit args: Args, mode: Mode, flowDef: FlowDef,
conv: TupleConverter[A], setter: TupleSetter[A]): TypedPipe[A] = {
conv: TupleConverter[A], setter: TupleSetter[A], q: Quoted): TypedPipe[A] = {
val rPipe = apply(checkpointName, Dsl.intFields(0 until conv.arity)) {
flow.toPipe(Dsl.intFields(0 until conv.arity))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.twitter.scalding.commons.tap.VersionedTap.TapMode
import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck }
import com.twitter.scalding.typed.KeyedListLike
import com.twitter.scalding.typed.TypedSink
import com.twitter.scalding.quotation.Quoted
import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -225,7 +226,7 @@ class TypedRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]) extends j
// the pipe in using an implicit `Monoid[V]` and sinks all results
// into the `sinkVersion` of data (or a new version) specified by
// `src`.
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode): TypedPipe[(K, V)] = {
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode, q: Quoted): TypedPipe[(K, V)] = {
val outPipe =
if (!src.resourceExists(mode))
pipe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.typed.ComputedValue
import com.twitter.scalding.quotation.Quoted

object KMeans {

Expand Down Expand Up @@ -88,12 +89,12 @@ object KMeans {
}
}

def initializeClusters(k: Int, points: TypedPipe[Vector[Double]]): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
def initializeClusters(k: Int, points: TypedPipe[Vector[Double]])(implicit q: Quoted): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
val rng = new java.util.Random(123)
// take a random k vectors:
val clusters = points.map { v => (rng.nextDouble, v) }
.groupAll
.sortedTake(k)(Ordering.by(_._1))
.sortedTake(k)(Ordering.by(_._1), q)
.mapValues { randk =>
randk.iterator
.zipWithIndex
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.scalding.typed

import com.twitter.algebird._
import com.twitter.scalding.quotation.Quoted

/**
* Extension for TypedPipe to add a cumulativeSum method.
Expand Down Expand Up @@ -39,7 +40,8 @@ object CumulativeSum {
def cumulativeSum(
implicit sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
ordK: Ordering[K],
quoted: Quoted): SortedGrouped[K, (U, V)] = {
pipe.group
.sortBy { case (u, _) => u }
.scanLeft(Nil: List[(U, V)]) {
Expand All @@ -62,7 +64,8 @@ object CumulativeSum {
implicit ordS: Ordering[S],
sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): TypedPipe[(K, (U, V))] = {
ordK: Ordering[K],
q: Quoted): TypedPipe[(K, (U, V))] = {

val sumPerS = pipe
.map { case (k, (u, v)) => (k, partition(u)) -> v }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import com.twitter.scalding._
import com.twitter.scalding.source.TypedText
import scala.collection.mutable.Buffer
import TDsl._
import com.twitter.scalding.quotation.Quoted

trait TBddDsl extends FieldConversions with TypedPipeOperationsConversions {

private implicit val q: Quoted = Quoted.internal

def Given[TypeIn](source: TypedTestSource[TypeIn]): TestCaseGiven1[TypeIn] = new TestCaseGiven1[TypeIn](source)

def GivenSources(sources: List[TypedTestSource[_]]): TestCaseGivenList = new TestCaseGivenList(sources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import cascading.tap._
import com.twitter.scalding.Dsl._
import scala.math.max
import scala.annotation.tailrec
import com.twitter.scalding.quotation.Quoted

/**
* Matrix class - represents an infinite (hopefully sparse) matrix.
Expand Down Expand Up @@ -138,10 +139,11 @@ class MatrixMappableExtensions[T](mappable: Mappable[T])(implicit fd: FlowDef, m
}

def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: Ordering[(Group, Row)],
setter: TupleSetter[(Group, Row, Col, Val)]): BlockMatrix[Group, Row, Col, Val] =
setter: TupleSetter[(Group, Row, Col, Val)],
q: Quoted): BlockMatrix[Group, Row, Col, Val] =
mapToBlockMatrix { _.asInstanceOf[(Group, Row, Col, Val)] }

def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = {
def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)], q: Quoted): BlockMatrix[Group, Row, Col, Val] = {
val matPipe = TypedPipe
.from(mappable)
.map(fn)
Expand Down
Loading

0 comments on commit f49a434

Please sign in to comment.