Quantisan edited this page Oct 16, 2012 · 20 revisions

Cascalog for the Impatient, Part 6

In our fifth installment of this series we showed how to implement TF-IDF in a Cascalog application. If you haven’t read that yet, it’s probably best to start there.

Today's post extends the TF-IDF app to show best practices for test-driven development (TDD) at scale. We’ll incorporate unit tests into the build (we should have done so sooner) and show how to leverage TDD features in Cascalog such as checkpoints, traps, and assertions. Some of these features are based on using Cascalog-Checkpoint and Midje-Cascalog.

Theory

At first glance, the notion of test-driven development (TDD) might seem a bit antithetical in the context of Big Data. After all, TDD is all about short development cycles, writing automated test cases which are intended to fail, and lots of refactoring. Those descriptions would not appear to fit with batch jobs involving terabytes of data and huge clusters running apps that take days to complete.

Stated another way, according to Kent Beck, TDD “encourages simple designs and inspires confidence.” That statement does actually fit well with Cascading. The API is intended to provide simple design patterns for working with data – GroupBy, Join, Count, Regex, Filter – so that the need for writing custom functions becomes relatively rare. That speaks to “encouraging simple designs” directly. The practice in Cascading of modeling business processes and orchestrating MapReduce workflows – that speaks to “inspiring confidence” in a big way.

So now we’ll let the cat out of the bag for a little secret... Working with unstructured data at scale has been shown to be quite valuable (Google, Amazon, LinkedIn, Twitter, etc.) however most of the “heavy lifting” which we perform in MapReduce workflows is essentially cleaning up data. DJ Patil explained this point quite eloquently in Data Jujitsu: “It’s impossible to overstress this: 80% of the work in any data project is in cleaning the data ... Work done up front in getting clean data will be amply repaid over the course of the project.”

Cleaning up the data allows for subsequent use of sampling techniques, dimensional reduction, and other practices which help alleviate some of the bottlenecks which might otherwise be encountered in Big Data. In other words, there are great use cases for formalisms which help demonstrate that “dirty” data at scale has been cleaned up. Those turn out to be quite valuable in practice.

However, TDD practices tend to be based on unit tests or mocks ... how does one write a quick unit test for a Godzilla-sized dataset?

The short answer is: you don’t. However, you can greatly reduce the need for writing unit test coverage by limiting the amount of custom code required. Hopefully we’ve shown that aspect of Cascading/Cascalog by now. Beyond that aspect, you can use sampling techniques to quantify the confidence for an app running correctly. You can also define system tests at scale in relatively simple ways. Furthermore, you can define contingencies for what to do when assumptions fail ... as they inevitably do, at scale.

Let’s discuss sampling ... generally speaking, large MapReduce workflows are relatively opaque processes which are difficult to observe. However, Cascalog provides techniques for observing portions of a workflow. One approach is to use Cascalog-Checkpoint, which forces intermediate data to be written out to HDFS. This is also important for performance reasons, i.e., forcing results to disk to avoid recomputing – e.g., when there are multiple uses for the output of a pipe downstream such as etl-docs-gen in Part 5. If the data is large, you can use fixed-sample to sample either before or after the data gets persisted to HDFS.
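To build intuition for what fixed-size sampling does, here is a sketch of the underlying idea, reservoir sampling, in plain Clojure. This is an illustration only, not Cascalog's actual implementation of fixed-sample:

```clojure
(defn reservoir-sample
  "Select n items uniformly at random from coll in a single pass.
   Keeps the first n items, then replaces slots with decreasing
   probability as more items stream by."
  [n coll]
  (first
    (reduce (fn [[res i] x]
              (if (< i n)
                [(conj res x) (inc i)]
                (let [j (rand-int (inc i))]
                  [(if (< j n) (assoc res j x) res) (inc i)])))
            [[] 0]
            coll)))

;; e.g. (reservoir-sample 3 (range 1000)) returns a vector of 3 elements
```

The single-pass property is what matters at scale: you never need to hold the full dataset, or even know its size, to get a uniform fixed-size sample.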

Next, let’s talk about system tests. Cascalog includes support for stream assertions. These provide mechanisms for asserting that the values in a tuple stream meet certain criteria – similar to the assert keyword in Clojure (in fact, it is implemented with one), or an assert not null in a unit test. We can assert patterns strictly as unit tests during development, then run testing against regression data. For performance reasons, we might use command line options to turn off assertions in production. Or keep them, if a use case requires that level of guarantees.

Lastly, what to do when assumptions fail? One lesson of working with data at scale is that the best assumptions will inevitably fail. Unexpected things happen, and 80% of the work will be cleaning up problems. Cascading defines failure traps which capture data that causes an Operation to fail, e.g., throw an Exception. For example, perhaps 99% of the cases in your log files can be rolled up into a set of standard reports... but 1% requires manual review. Great, process the 99% which work and shunt the 1% failure cases into a special file, marked “for manual review”. Keep in mind, however, that traps are intended for handling exceptional cases. If you know in advance how to categorize good vs. bad data, then use a filter instead of a trap.
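To make the filter-versus-trap distinction concrete: a known-bad-data check can be an ordinary Clojure predicate used as a filter clause in a query, while the trap catches only the truly unexpected. A minimal sketch – the valid-line? name and the tab-delimited format are hypothetical, not taken from the app:

```clojure
(defn valid-line?
  "True when a raw input line matches the expected 'docN<TAB>text' shape.
   Lines failing this predicate are known-bad data: filter them out
   deliberately rather than letting them blow up into a trap."
  [line]
  (boolean (re-matches #"doc\d+\t.*" line)))
```

In a Cascalog query this predicate would simply appear as another clause, e.g. (valid-line? ?line), leaving the trap free to catch records nobody anticipated.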

Meanwhile, a conceptual diagram for this implementation of TF-IDF in Cascalog is shown as:

Conceptual Workflow Diagram

Source

Download source for this example on GitHub. For quick reference, a log for this example and output are listed in a gist.

Let’s add a unit test and show how it fits into this example. In project.clj, add [midje-cascalog "0.4.0"] as a :dev dependency.

A little restructuring of the source directories is required – see our GitHub code repo, where it’s all set up properly. Then we add a unit test for our custom function to “scrub” tokens, which was created in Part 3 (then removed in Part 4, and now added back in for this illustration). This goes into a new file test/core_test.clj:

(ns impatient.core-test
  (:use impatient.core
        clojure.test
        midje.sweet))

(deftest scrub-text-test
  (fact
    (scrub-text "FoO BAR  ") => "foo bar"))
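For reference, a minimal scrub-text that satisfies this test could look like the following – an illustrative sketch, since the actual version lives in the Part 3 code:

```clojure
(require '[clojure.string :as str])

(defn scrub-text
  "Lower-case a string and trim surrounding whitespace."
  [text]
  (-> text str/trim str/lower-case))
```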

This is a particularly good place for a unit test. Scrubbing tokens is a likely point at which edge cases get encountered at scale. Similarly, let's write tests for some of our other custom functions to verify behaviours.

(deftest etl-docs-gen-test
  (let [rain [["doc1" "a b c"]]
        stop [["b" true]]]
    (fact
      (etl-docs-gen rain stop) => (produces [["doc1" "a"]
                                             ["doc1" "c"]]))))

Next, going back to the core.clj module, let’s add a sink tap for writing out trapped data in etl-docs-gen:

(:trap (hfs-textline "output/trap" :sinkmode :update))

Next we’ll modify the head of the existing pipe assembly for TF-IDF to incorporate a Stream Assertion. We use a Clojure assert to define the expected pattern for input data. Then wrap it in a filter to force validation of the data:

(defn assert-tuple
  "Helper function to add an assertion to a tuple stream."
  [pred msg x]
  (when (nil? (assert (pred x) msg))
    true))

In particular, we want to ensure that the input data structure is what we expect.

(def ^{:doc "assert doc-id is correct format"}
  assert-doc-id
  (partial assert-tuple #(re-seq #"doc\d+" %) "unexpected doc-id"))
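Putting the two definitions together, the assertion behaves like this at the REPL (the definitions are repeated here so the snippet is self-contained):

```clojure
(defn assert-tuple
  "Wrap a predicate in a Clojure assert so that a failing tuple
   throws an AssertionError, which a Cascading trap can capture."
  [pred msg x]
  (when (nil? (assert (pred x) msg))
    true))

(def ^{:doc "assert doc-id is of the form docN"}
  assert-doc-id
  (partial assert-tuple #(re-seq #"doc\d+" %) "unexpected doc-id"))

(assert-doc-id "doc42")    ;; => true, the tuple flows through the filter
;; (assert-doc-id "zoink") ;; throws AssertionError: unexpected doc-id
```

Because a failed assert throws rather than returning false, a bad tuple is routed to the trap instead of being silently filtered out.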

Next we’ll checkpoint the query execution in terms of data dependencies. That forces the tuples at each checkpoint step in the workflow to be persisted to HDFS:

(defn -main [in out stop tfidf & args]
  (workflow
    ["tmp/checkpoint"]
    etl-step ([:tmp-dirs etl-stage]
              (let [rain (hfs-delimited in :skip-header? true)
                    stop (hfs-delimited stop :skip-header? true)]
                (?- (hfs-delimited etl-stage)
                    (etl-docs-gen rain stop))))
    tf-step  ([:deps etl-step]
              (let [src (name-vars (hfs-delimited etl-stage :skip-header? true) ["?doc-id" "?word"])]
                (?- (hfs-delimited tfidf)
                    (TF-IDF src)))) 
    wrd-step ([:deps etl-step]
              (?- (hfs-delimited out)
                  (word-count (hfs-delimited etl-stage))))))

Don't forget to add cascalog-checkpoint as a dependency in project.clj, and to pull the workflow macro into your namespace from cascalog.checkpoint.

Modify the main method to make those changes, then build a JAR file. You should be good to go.

Here is the dot diagram from the Cascading version of this exercise, showing where the mapper and reducer phases run, along with the sections added since Part 5. These will be somewhat different from what we've done with Cascalog here.

TF-IDF Flow Diagram

Build

To build the sample app from the command line use:

lein uberjar

What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo — almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org

Test

To run the unit tests from the command line, use:

lein test

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to clear the output directory (Apache Hadoop insists, if you're running in standalone mode) and run the app:

rm -rf output
hadoop jar target/impatient.jar data/rain.txt output/wc data/en.stop output/tfidf output/trap output/check

The output log should include a warning, based on the stream assertion, which looks like this:

12/08/28 18:52:18 WARN stream.TrapHandler: exception trap on branch: 'dcb3c45e-74ae-41a9-b401-7a585a4dffef', for fields: [{1}:'?doc-id'] tuple: ['zoink']
cascading.pipe.OperatorException: [dcb3c45e-74ae-41a9-b40...][sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)] operator Each failed executing operation
	at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:68)
	at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:33)
	at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:67)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:93)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:86)
	at cascading.operation.Identity.operate(Identity.java:110)
        ...

That is expected behaviour. The data which caused this warning will get trapped for later inspection.

Output text gets stored in the output directory, which you can then verify:

more output/tfidf/part-00000
more output/trap/part-m-00001-00000 
more output/check/part-00000

Notice the data tuple in output/trap:

zoink   null

That did not match the regex doc\d+ which was specified by the stream assertion.

Here’s a log file from our run of the sample app, part 6. If your run looks terribly different, something is probably not set up correctly.

To run this same app on the Amazon AWS Elastic MapReduce service, based on their command line interface, use the following commands. Be sure to replace temp.cascading.org with your own S3 bucket name:

s3cmd put target/impatient.jar s3://temp.cascading.org/impatient/part6.jar
s3cmd put data/rain.txt s3://temp.cascading.org/impatient/
s3cmd put data/en.stop s3://temp.cascading.org/impatient/

elastic-mapreduce --create --name "TF-IDF" \
  --jar s3n://temp.cascading.org/impatient/part6.jar \
  --arg s3n://temp.cascading.org/impatient/rain.txt \
  --arg s3n://temp.cascading.org/impatient/out/wc \
  --arg s3n://temp.cascading.org/impatient/en.stop \
  --arg s3n://temp.cascading.org/impatient/out/tfidf \
  --arg s3n://temp.cascading.org/impatient/out/trap \
  --arg s3n://temp.cascading.org/impatient/out/check

Drop us a line on the cascalog-user email forum.
