
Execution.flatMap(polished)

ianoc released this 02 Dec 18:24

We are very excited about the hottest scalding yet!

Better join syntax

In the typed API you no longer need to write “.group” before a join, or on the argument to a join. Calling a join method is an explicit enough marker that you are crossing a map/reduce boundary, so this restriction has been removed. You still need .group before a reduce (TypedPipe.sum means a total sum, whereas .group.sum means: for each key group, sum the values). So from now on:

    pipe1.join(pipe2).join(pipe3)

is perfectly correct.

While the above runs one map/reduce job, the type of the values is a nested tuple: ((v1, v2), v3). In some cases, for small joins, this is fine, but for large joins it can be a pain. We added the MultiJoin object to flatten the tuples as you might expect:

    MultiJoin(pipe1, pipe2, pipe3) : CoGrouped[K, (V1, V2, V3)]
    MultiJoin.left(pipe1, pipe2, pipe3) : CoGrouped[K, (V1, Option[V2], Option[V3])]
    MultiJoin.outer(pipe1, pipe2, pipe3) : CoGrouped[K, (Option[V1], Option[V2], Option[V3])]
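
For example, here is a hypothetical three-way join over pipes keyed by user id (the pipe names, value types, and sample data are illustrative, and the imports assume the 0.12 package layout):

    import com.twitter.scalding._
    import com.twitter.scalding.typed.{ CoGrouped, MultiJoin }

    // Three pipes, all keyed by user id (Long).
    val views: TypedPipe[(Long, Int)] = TypedPipe.from(Seq((1L, 10), (2L, 3)))
    val clicks: TypedPipe[(Long, Int)] = TypedPipe.from(Seq((1L, 2)))
    val spend: TypedPipe[(Long, Double)] = TypedPipe.from(Seq((1L, 5.0)))

    // Chained joins nest the values: ((Int, Int), Double).
    val nested: CoGrouped[Long, ((Int, Int), Double)] =
      views.join(clicks).join(spend)

    // MultiJoin flattens them to (Int, Int, Double).
    val flat: CoGrouped[Long, (Int, Int, Double)] =
      MultiJoin(views, clicks, spend)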

Hopefully this makes joining even cleaner and more powerful than it was in scalding 0.11.

TypedPipe is immutable and reusable

TypedPipe is now immutable. This means you can pass TypedPipe (or Grouped or any object in the Typed API) between jobs safely. When we improved the REPL, we had to handle the fact that some parts of the job may be run independently, and to make the REPL behave as you expect, we had to build a fake immutable version behind the scenes. In this release, TypedPipe itself is immutable, so the REPL becomes much simpler and other applications, such as passing TypedPipes between jobs or executing them in loops, become possible.
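
A small sketch of what this buys you (the file and pipe names are illustrative): a single immutable pipe can safely feed several downstream computations.

    import com.twitter.scalding._

    // One immutable pipe, reused by two independent aggregations.
    val words: TypedPipe[String] =
      TypedPipe.from(TextLine("input.txt")).flatMap(_.split("\\s+"))

    val wordCounts = words.map((_, 1L)).sumByKey
    val lengthCounts = words.map(w => (w.length, 1L)).sumByKey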

Execution[T]: a composable way to write Scalding Jobs

Have you ever wanted to convert a TypedPipe into an Iterator or List? Now you can:

    val myPipe: TypedPipe[T] = getPipe
    val iterable: Iterable[T] = myPipe.toIterableExecution.waitFor

Since the beginning, scalding.Job has had a primitive way to start a follow-up job after the current one: Job.next. This had a number of issues. First, since TypedPipes could not be passed between jobs, the user had to manually write intermediate files in one job and read them back in the next; managing these temporary files was very painful. Second, the shapes of jobs were quite limited, because there had to be a linear succession of Jobs (tricks could work around this, but they were rarely used and ugly). Scalding 0.12 introduces Execution[T]. This type represents a scalding job that runs and returns a T, and naturally, you can flatMap it. If you want to run two jobs in parallel, you can zip them together. An Execution[T] is basically a “run” function that takes the configuration and Mode and returns a Future[T]. This means you can also wire service calls into the beginning, middle, or end of your scalding job by lifting the call into an Execution with Execution.fromFuture.
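
A minimal sketch of this composition (the file names and schemas are illustrative): two writes zipped to run in parallel, then a flatMapped follow-up job.

    import com.twitter.scalding._

    val countWords: Execution[Unit] =
      TypedPipe.from(TextLine("in.txt"))
        .flatMap(_.split("\\s+"))
        .map((_, 1L))
        .sumByKey
        .toTypedPipe
        .writeExecution(TypedTsv[(String, Long)]("counts"))

    val copyLines: Execution[Unit] =
      TypedPipe.from(TextLine("in.txt"))
        .writeExecution(TypedTsv[String]("copy"))

    // zip runs the two in parallel; flatMap sequences a third job after both.
    val all: Execution[Unit] =
      countWords.zip(copyLines).flatMap { _ =>
        TypedPipe.from(TypedTsv[(String, Long)]("counts"))
          .filter(_._2 > 100L)
          .writeExecution(TypedTsv[(String, Long)]("frequent"))
      }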

Execution also enables looping. Consider PageRank: we want to run until we reach a level of convergence. We can do this by writing a function:

    def step(graph: TypedPipe[Edge], rank: TypedPipe[(Long, Double)]): Execution[(Double, TypedPipe[(Long, Double)])]

In the above, we express one step of our algorithm as propagating the rank vector over the graph. The Execution returns a pair: the error between the old and new rank vectors, and the new vector. Now we loop:

    val graph: TypedPipe[Edge]
    def go(vector: TypedPipe[(Long, Double)]): Execution[TypedPipe[(Long, Double)]] =
      step(graph, vector).flatMap {
        case (err, v) if err < threshold => Execution.from(v)
        case (_, v) => go(v) // loop again
      }

    // Now run the job.
    val result = go(initVector).waitFor

For now, you need to put some operation in your scalding Job unless you use ExecutionJob, or you can bypass Job altogether if you choose, as ExecutionApp does. ExecutionApp allows you to do this:

    object MyJob extends ExecutionApp {
      def job: Execution[Unit] = someExecutionLoopToDoAwesomeStuff
    }

Then you can run MyJob as a normal class with the hadoop command. It parses arguments and makes them available in the Config object (see Execution.getConfig to access the Config and from there the Args).
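
As a sketch of reading those arguments (the getArgs accessor on Config is an assumption here; check the Config API for the exact method):

    // Read a command-line argument inside an Execution.
    val job: Execution[Unit] =
      Execution.getConfig.flatMap { conf =>
        val input = conf.getArgs("input") // hypothetical accessor
        TypedPipe.from(TextLine(input))
          .writeExecution(TypedTsv[String]("out"))
      }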

Check out .forceToDiskExecution, .writeExecution, and .toIterableExecution on TypedPipe to create Executions you can use in loops, and .toOptionExecution and .getExecution on ValuePipe to materialize single values.
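
For instance, a sketch of pulling one aggregated value out of a job (the source name and schema are illustrative):

    // Sum a column and materialize the single resulting value.
    val total: Execution[Option[Long]] =
      TypedPipe.from(TypedTsv[(String, Long)]("counts"))
        .values
        .sum // ValuePipe[Long]
        .toOptionExecution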

As an example, see KMeans: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/examples/KMeans.scala

Reducer Estimation

The reducer estimation we developed has been made part of the open source release, meaning users can get automatic reducer settings based either on input file sizes or on the history of the job. This promises to help us make more efficient use of our cluster. Estimation strategies are easy to implement, and they will be an area of future improvement.
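
For example, you can select the input-size-based estimator from this release by naming its class in a Hadoop property, along these lines (treat the exact property key here as an assumption and check the reducer estimation documentation):

    hadoop jar my-scalding-job.jar com.example.MyJob \
      -Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator \
      --hdfs --input ... --output ...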

Full list of changes below:

  • Fix long compile time for MultiJoin helpers: #1109
  • Allows reducer estimation to operate on all hfs taps: #1080
  • Fix bufferedTake: #1107
  • Generate methods for flattening the results of many joins: #1097
  • Make TimePathedSource more configurable: #1105
  • Adding DailyPrefixSuffixLzoTsv: #1082
  • Option to select the fields for output in templatesource: #1061
  • Add a DailySuffixMostRecentLzoProtobuf source: #1104
  • Updates default scala version to 2.10.4: #1081
  • MultiSourceTap hashcode: #1101
  • scalding-core: merge flow step strategies to allow reducer estimation combined with other strategies: #1094
  • Improve command line handling of the execution app: #1083
  • More testing around the globifier with new properties: #1092
  • Refactor JDBCSource to add compile-time info about type of DB: #1087
  • Add a cumulative sum to KeyedList: #1085
  • Add in failing test case: #1090
  • Adds ability to also get the mode inside the Execution monad.: #1088
  • Enforce invariant: mapGroup iterators all nonempty: #1072
  • Allow PartitionSource to limit the number of open files: #1078
  • append to Cascading frameworks system property instead of setting it directly: #1076
  • Adds some output while assembly is building to keep travis happy: #1084
  • Only request necessary hadoop configs in hraven reducer estimator: #1067
  • Add parquet-scrooge sources: #1064
  • Outer join handles case when both are empty: #1065
  • Fix race in merging: #1063
  • Add support for column projection to parquet sources: #1056
  • Add typed version of RichPipe 'using': #1049
  • Add getExecution/getOrElseExecution: #1062
  • Change toIteratorExecution to toIterableExecution: #1058
  • Cache Execution evaluations: #1057
  • Add support for push down filters in parquet sources: #1050
  • Add support for Fold: #1053
  • move to use JobConf(true) for hadoop crazyness that causes host not foun...: #1051
  • Disable Cascading update check.: #1048
  • Respects -Dmapred.job.name when passed in on the command line: #1045
  • Add some instances from Algebird: #1039
  • Fix join.mapGroup issue: #1038
  • Add a defensive .forceToDisk in Sketched: #1035
  • Override toIterator for all Mappable with transformForRead: #1034
  • Make sinkFields in TypedDelimited final.: #1032
  • Fixed type of exception thrown by validateTaps: #1033
  • Add default local maven repo to the resolver list: #1024
  • Add an ExecutionApp trait for objects to skip the Job class: #1027
  • Make each head pipe have a unique name: #1025
  • Run REPL from SBT: #1021
  • Add Config to openForRead: #1023
  • Fix replConfig merging and evaluate values in Config.fromHadoop: #1015
  • REPL Autoload file: #1009
  • Fix hRaven Reducer Estimator: #1018
  • Update Cascading JDBC Version.: #1016
  • Some Execution fixes: #1007
  • Refactor InputSizeReducerEstimator to correctly unroll MultiSourceTaps: #1017
  • Fix issue #1011: Building develop branch fails: #1012
  • hRaven Reducer Estimator: #996
  • JsonLine should handle empty lines: #966
  • Add comments for memory-related reduce operations.: #1006
  • Add the remaining odds and ends to Execution[T]: #985
  • Fix up the tests to run forked, and split across lots of travis builds: #993
  • Typedpipe partition: #987
  • Fix toIterator bug (#988): #990
  • Basic reducer estimator support: #973
  • Improve TypedSimilarity algorithm and update test.: #983
  • Adds support for Counters inside the Execution Monad.: #982
  • Make map/flatMap lazy on IterablePipe to address OOM: #981
  • JsonLine: enable read transformation in test to get correct fields in sourceTap: #971
  • Read and writable partitioned sources: #969
  • Make an Execution[T] type, which is a monad, which makes composing Jobs easy.: #974
  • Generalize handling of merged TypedPipes: #975
  • Do not inherit from FileSource in LzoTraits: #976
  • Make TypedPipe immutable: #968
  • Adds an optional source: #963
  • Add pipe1.join(pipe2) syntax in TypedAPI: #958
  • Extending BddDsl for Typed API: #956
  • VerticaJdbcDriver: #957
  • fix the example usage in JDBCSource: #955
  • Push back off ec2 requiring sudo, build failures are a nightmare: #953
  • Add ExecutionContextJob to interop execution style with Job style: #952