Part 2
In our first installment of this series we showed how to create the simplest possible Cascalog application. If you haven't read that yet, it's probably best to start there.
Today's lesson takes the same app and stretches it a bit further. Undoubtedly you've seen Word Count before; it's the "Hello World" of MapReduce apps, and we'd feel remiss if we did not provide a Word Count example. Fortunately, this code is also one of the basic steps toward developing a TF-IDF implementation. How convenient. We'll also show how to use Cascading to generate a visualization of your MapReduce app.
Our example code in Part 1 of this series showed how to move data from point A to point B. It was simply a distributed file copy -- loading data via distributed tasks, an instance of the "L" in ETL.
That may seem overly simple, and it may seem like Cascalog/Cascading is overkill for that kind of work. However, moving important data from point A to point B reliably can be a crucial job to perform. This helps illustrate one of the key reasons to use Cascading.
Let's use an analogy of building a small ferris wheel. With a little bit of imagination and some background in welding, a person could cobble one together using old bicycle parts. In fact, those DIY ferris wheels show up at events such as Maker Faire. Starting out, a person might construct a little ferris wheel, just for a demo. It might not hold anything larger than hamsters, but it's not a hard problem. With a bit more skill, a person could probably build a somewhat larger instance, one that's big enough for small children to ride.
Let me ask this: how robust would a DIY ferris wheel need to be before you let your kids ride on it? That's precisely part of the challenge at an event like Maker Faire. Makers must be able to build a device such as a ferris wheel out of spare bicycle parts that is robust enough for strangers to let their kids ride it. Let's hope those welds were made using best practices and good materials, to avoid catastrophes.
That's a key reason why Cascading was created. When you need to move a few GB from point A to point B, it's probably simple enough to write a Bash script, or just use a single command-line copy. When your work requires some reshaping of the data, then a few lines of Python will probably work fine. Run that Python code from your Bash script and you're done. I've used that approach many times, when it fit the use case requirements. However, suppose you're not moving just GB around? Suppose you're moving TB, or PB? Bash scripts won't get you very far. Also think about this: suppose your app not only needs to move data from point A to point B, but it must run within the constraints of an enterprise IT shop. Millions of dollars, and potentially even some jobs, ride on your app performing correctly -- day in and day out. That's not unlike strangers trusting a ferris wheel; they want to make sure it wasn't just built out of spare bicycle parts by some amateur welder. Robustness is key.
Or taking this analogy a few steps in another interesting direction… Perhaps you're not only moving data and reshaping it a little, but you're applying some interesting machine learning algorithms, some natural language processing, resequencing genes… who knows. Those imply lots of resource use, lots of potential expense in case of failures. Or lots of customer exposure. You'll want to use an application framework which is significantly more robust than a bunch of scripts cobbled together.
With Cascalog/Cascading, you can package your entire MapReduce application, including its orchestration and testing, within a single JAR file. You define all of that within the context of one programming language -- whether that language is Java, Scala, Clojure, Python, Ruby, etc. That way your tests are included within a single program, not spread across several scripts written in different languages. Having a single JAR file define the app also allows for the typical tooling required in enterprise IT: unit tests, stream assertions, revision control, continuous integration, Maven repos, role-based configuration management, advanced schedulers, monitoring and notifications, etc.
Those are key reasons why we make Cascading, why people use it for robust MapReduce apps which run at scale.
Meanwhile, a conceptual diagram for this implementation of Word Count in Cascading is shown as:
Download the source for this example on GitHub. An output log for this example is listed in a gist. The input data stays the same as in the earlier code.
Previously we defined a simple query to connect the taps. This example shows a more complex query. We use a custom operation inside the query to split the document text into a token stream. We use a regex to split on word boundaries:
(defmapcatop split [line]
  "reads in a line of string and splits it by regex"
  (s/split line #"[\[\]\\\(\),.)\s]+"))
Out of that operator, we'll get a tuple stream of ?word to feed into the count -- in effect converting a horizontal wide-tuple into a vertical series of 1-tuples. You can change the regex to handle more complex cases of splitting tokens, without having to rewrite the custom operator.
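To make that concrete, here is a quick REPL check of just the split logic, using a made-up sentence and assuming s is aliased to clojure.string, as in the sample code:
;; each element of the resulting vector becomes its own 1-tuple of ?word
(s/split "rain shadows happen, near mountains." #"[\[\]\\\(\),.)\s]+")
;; => ["rain" "shadows" "happen" "near" "mountains"]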
Next, we use an aggregator to count the occurrences of each token:
(c/count ?count)
From that predicate, we'll have a resulting tuple stream of ?word and ?count for the output. So we connect up the query:
(?<- (hfs-delimited out)
     [?word ?count]
     ((hfs-delimited in :skip-header? true) _ ?line)
     (split ?line :> ?word)
     (c/count ?count))
Notice that we do not specify what to count; that is determined implicitly. In logic programming, we write rules and let the machine (in this case Cascalog) figure out which values satisfy those rules, returning them as results. In our query, we stated ?word and ?count as output. We already have ?word coming from split, so the flow of ?word is satisfied. To satisfy the output ?count, the c/count predicate performs a count grouped by ?word.
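To see that implicit group-by in action without touching HDFS, you can run the same query shape against an in-memory sequence of tuples at the REPL. This is only a sketch with made-up sentences, assuming the same namespace aliases as the sample code; the result in the comment is ours, not output from the sample app:
;; ??<- executes the query locally and returns the result tuples
(??<- [?word ?count]
      ([["rain shadow rain"] ["shadow"]] ?line)
      (split ?line :> ?word)
      (c/count ?count))
;; => should yield something like (["rain" 2] ["shadow" 2]), in some order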
Hopefully this will make more sense as we proceed through the tutorial and build on this example.
For now, place those source lines into a -main function, then build a JAR file. You should be good to go.
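Putting the pieces together, the main entry point might look roughly like the following sketch. The namespace name, the clojure.string alias, and the exact :use/:require forms are our assumptions here -- check the example source on GitHub for the definitive version.
(ns impatient.core
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            [cascalog.ops :as c])
  (:gen-class))

(defmapcatop split [line]
  ;; split a line of text into tokens, using the regex as the delimiter
  (s/split line #"[\[\]\\\(\),.)\s]+"))

(defn -main [in out & args]
  ;; in  = path to the input text, e.g. data/rain.txt
  ;; out = path for the word-count output, e.g. output/wc
  (?<- (hfs-delimited out)
       [?word ?count]
       ((hfs-delimited in :skip-header? true) _ ?line)
       (split ?line :> ?word)
       (c/count ?count)))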
We have annotated the flow diagram to show where the mapper and reducer phases are running:
If you want to read in more detail about the Cascalog API functions that were used, see the Cascalog Wiki and JavaDoc.
The build for this example is based on using Leiningen. To build the sample app from the command line use:
lein uberjar
What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo -- almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org
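For reference, a Leiningen project definition for an app like this might look roughly as follows. The project name, version numbers, and dependency coordinates here are illustrative assumptions; defer to the project.clj that ships with the example source.
(defproject impatient "0.1.0-SNAPSHOT"
  :description "Cascalog for the Impatient, Part 2 -- Word Count"
  ;; conjars.org hosts community builds of Cascading libraries and extensions
  :repositories {"conjars" "http://conjars.org/repo"}
  :dependencies [[org.clojure/clojure "1.4.0"]
                 [cascalog "1.10.0"]
                 [cascalog-more-taps "0.3.0"]]
  ;; Hadoop is provided by the cluster at runtime, so keep it out of the uberjar
  :profiles {:provided {:dependencies [[org.apache.hadoop/hadoop-core "1.0.3"]]}}
  :main impatient.core
  :aot [impatient.core]
  :uberjar-name "impatient.jar")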
Before running this sample app, you'll need to have a supported release of Apache Hadoop installed. Here's what was used to develop and test our example code:
$ hadoop version
Hadoop 1.0.3
Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists on this if you're running in standalone mode) and run the app:
rm -rf output
hadoop jar ./target/impatient.jar data/rain.txt output/wc
Output text gets stored in the partition file output/wc, which you can then verify:
more output/wc/part-00000
Again, here's a log file from our run of the sample app, part 2. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascalog-user email forum.
So that's our Word Count example. Twenty lines of yummy goodness. Stay tuned for the next installments of our Cascalog for the Impatient series.