# Sparkling Water

## Table of Contents
- Compiling examples
- Running examples
- Configuring variables
- Step-by-step weather example
- Running Sparkling Water on Hadoop
- Importing data from HDFS
The following demo applications are available:

- `CraigslistJobTitlesStreamingApp` - streaming application that predicts job category based on incoming job descriptions
- `CraigslistJobTitlesApp` - predicts job category based on posted job descriptions
- `ChicagoCrimeAppSmall` - builds a model predicting the probability of arrest for a given crime in Chicago, using data in the `smalldata` directory
- `ChicagoCrimeApp` - implementation of the Chicago Crime demo with setup for data stored on HDFS
- `CitiBikeSharingDemo` - predicts occupancy of Citi Bike stations in NYC
- `HamOrSpamDemo` - shows a spam detector built with Spark and H2O's DeepLearning
- `ProstateDemo` - runs K-means on the prostate dataset (see `smalldata/prostate.csv`)
- `DeepLearningDemo` - runs DeepLearning on a subset of the airlines dataset (see `smalldata/allyears2k_headers.csv.gz`)
- `AirlinesWithWeatherDemo` - joins flights data with weather data and runs Deep Learning
- `AirlinesWithWeatherDemo2` - new iteration of `AirlinesWithWeatherDemo`
Run these examples by typing:

```
bin/run-example.sh <name of demo>
```

or follow the text below.
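For example, `bin/run-example.sh DeepLearningDemo` runs the DeepLearning demo on the airlines subset listed above.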
The following example scripts are available:

- `chicagoCrimeSmallShell.script.scala` - demo showing the full source code of predicting arrest probability for a given crime; it covers the whole machine learning process, from loading and transforming data to building models and scoring incoming events
- `chicagoCrimeSmall.script.scala` - example of using `ChicagoCrimeApp` - creating the application and using it for scoring individual crime events
- `mlconf_2015_hamSpam.script.scala` - HamOrSpam application which detects spam messages; presented at MLconf 2015 NYC
- `strata2015_demo.scala` - NYC Citi Bike demo presented at Strata 2015 in San Jose
- `StrataAirlines.scala` - example of using flights and weather data to predict flight delays
Run these scripts by typing:

```
bin/sparkling-shell -i <path to file with demo script>
```
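For example, `bin/sparkling-shell -i examples/scripts/chicagoCrimeSmall.script.scala` (the `examples/scripts/` location is an assumption; use the actual path of the script in your checkout).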
## Compiling examples

To compile, use the top-level `gradlew`:

```
./gradlew assemble
```
## Running examples

To run a given example on a local cluster: the cluster is defined by the `MASTER` address `local-cluster[3,2,3072]`, which means that the cluster contains 3 worker nodes, each having 2 CPUs and 3GB of memory.

- Run `bin/run-example.sh <name of demo>`

To run an example against a standalone Spark cluster:

- Run the Spark cluster, for example via `bin/launch-spark-cloud.sh`
- Verify that Spark is running: the Spark UI on http://localhost:8080/ should show 3 worker nodes
- Export the `MASTER` address of the Spark master using `export MASTER="spark://localhost:7077"`
- Run `bin/run-example.sh <name of demo>`
- Observe the status of the application via the Spark UI on http://localhost:8080/
## Configuring variables

You can configure Sparkling Water using the following variables:

- `spark.h2o.cloud.timeout` - number of milliseconds to wait for cloud formation
- `spark.h2o.workers` - number of expected H2O workers; it should be the same as the number of Spark workers
- `spark.h2o.preserve.executors` - do not kill the executors when `sc.stop()` is called
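These are standard Spark properties, so they can be set wherever Spark configuration is accepted. A minimal sketch using Spark's `SparkConf` API (the values below are illustrative only):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values: wait up to 60 s for cloud formation, expect
// 3 H2O workers, and keep executors alive after sc.stop()
val conf = new SparkConf()
  .setAppName("sparkling-water-example")
  .set("spark.h2o.cloud.timeout", "60000")
  .set("spark.h2o.workers", "3")
  .set("spark.h2o.preserve.executors", "true")
val sc = new SparkContext(conf)
```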
## Step-by-step weather example

- Run Sparkling shell with an embedded cluster:

```
export SPARK_HOME="/path/to/spark/installation"
export MASTER="local-cluster[3,2,1024]"
bin/sparkling-shell
```

- To see the Sparkling shell (i.e., Spark driver) status, go to http://localhost:4040/.

- Create an H2O cloud using all 3 Spark workers:

```scala
import org.apache.spark.h2o._
val h2oContext = new H2OContext(sc).start()
import h2oContext._
```
- Load weather data for Chicago international airport (ORD), with help from the RDD API:

```scala
import org.apache.spark.examples.h2o._
val weatherDataFile = "examples/smalldata/Chicago_Ohare_International_Airport.csv"
val wrawdata = sc.textFile(weatherDataFile, 3).cache()
val weatherTable = wrawdata.map(_.split(",")).map(row => WeatherParse(row)).filter(!_.isWrongRow())
```
- Load airlines data using the H2O parser:

```scala
import java.io.File
val dataFile = "examples/smalldata/allyears2k_headers.csv.gz"
val airlinesData = new H2OFrame(new File(dataFile))
```
- Select flights destined for Chicago (ORD):

```scala
val airlinesTable : RDD[Airlines] = asRDD[Airlines](airlinesData)
val flightsToORD = airlinesTable.filter(f => f.Dest==Some("ORD"))
```
- Compute the number of these flights:

```scala
flightsToORD.count
```
- Use Spark SQL to join the flight data with the weather data:

```scala
import sqlContext.implicits._
flightsToORD.toDF.registerTempTable("FlightsToORD")
weatherTable.toDF.registerTempTable("WeatherORD")
```
- Perform the SQL JOIN on both tables:

```scala
val bigTable = sqlContext.sql(
  """SELECT
    |f.Year,f.Month,f.DayofMonth,
    |f.CRSDepTime,f.CRSArrTime,f.CRSElapsedTime,
    |f.UniqueCarrier,f.FlightNum,f.TailNum,
    |f.Origin,f.Distance,
    |w.TmaxF,w.TminF,w.TmeanF,w.PrcpIn,w.SnowIn,w.CDD,w.HDD,w.GDD,
    |f.ArrDelay
    |FROM FlightsToORD f
    |JOIN WeatherORD w
    |ON f.Year=w.Year AND f.Month=w.Month AND f.DayofMonth=w.Day""".stripMargin)
```
- Transform the first 3 columns containing date information into enum columns:

```scala
val bigDataFrame: H2OFrame = bigTable // implicit conversion from Spark DataFrame to H2OFrame
for (i <- 0 to 2) bigDataFrame.replace(i, bigDataFrame.vec(i).toEnum)
bigDataFrame.update(null)
```
- Run deep learning to produce a model estimating arrival delay:

```scala
import hex.deeplearning.DeepLearning
import hex.deeplearning.DeepLearningModel.DeepLearningParameters
import hex.deeplearning.DeepLearningModel.DeepLearningParameters.Activation

val dlParams = new DeepLearningParameters()
dlParams._train = bigDataFrame
dlParams._response_column = 'ArrDelay
dlParams._epochs = 5
dlParams._activation = Activation.RectifierWithDropout
dlParams._hidden = Array[Int](100, 100)

// Create a job
val dl = new DeepLearning(dlParams)
val dlModel = dl.trainModel.get
```
- Use the model to estimate the delay on the training data:

```scala
val predictionH2OFrame = dlModel.score(bigTable)('predict)
val predictionsFromModel = asDataFrame(predictionH2OFrame)(sqlContext).collect.map(row => if (row.isNullAt(0)) Double.NaN else row(0))
```
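Before switching to R, the residuals can also be inspected directly in the shell. A minimal sketch, assuming `bigTable` and `predictionH2OFrame` from the steps above are still in scope and that `ArrDelay` is numeric:

```scala
// Collect predictions and actual delays to the driver (fine for small data)
val predicted = asDataFrame(predictionH2OFrame)(sqlContext).collect.map(row =>
  if (row.isNullAt(0)) Double.NaN else row.getAs[Number](0).doubleValue)
val actual = bigTable.select("ArrDelay").collect.map(row =>
  if (row.isNullAt(0)) Double.NaN else row.getAs[Number](0).doubleValue)
// Residual = actual delay - predicted delay (NaN where either is missing)
val residuals = actual.zip(predicted).map { case (a, p) => a - p }
```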
- Generate R code producing a residual plot:

```scala
import org.apache.spark.examples.h2o.DemoUtils.residualPlotRCode
residualPlotRCode(predictionH2OFrame, 'predict, bigTable, 'ArrDelay)
```
- Execute the generated R code in RStudio:

```r
#
# R script for residual plot
#
# Import H2O library
library(h2o)
# Initialize H2O R-client
h = h2o.init()
# Fetch prediction and actual data, use remembered keys
pred = h2o.getFrame(h, "dframe_b5f449d0c04ee75fda1b9bc865b14a69")
act = h2o.getFrame(h, "frame_rdd_14_b429e8b43d2d8c02899ccb61b72c4e57")
# Select right columns
predDelay = pred$predict
actDelay = act$ArrDelay
# Make sure that number of rows is the same
nrow(actDelay) == nrow(predDelay)
# Compute residuals
residuals = predDelay - actDelay
# Plot residuals
compare = cbind(as.data.frame(actDelay$ArrDelay), as.data.frame(residuals$predict))
nrow(compare)
plot(compare[, 1:2])
```
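The two frame keys in the script above were remembered from one particular session; yours will differ. A minimal sketch of printing the actual keys from the Sparkling shell, assuming the frames above are still in scope (`_key` is the key field on H2O frames):

```scala
// Print the H2O frame keys to substitute into the R script above
println(predictionH2OFrame._key)
println(bigDataFrame._key)
```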
## Running Sparkling Water on Hadoop

Compatible Hadoop distributions: CDH4, CDH5, and HDP2.1.

- To install on your Hadoop cluster, clone the git repository and make a build:

```
git clone https://github.com/0xdata/sparkling-water.git
cd sparkling-water
./gradlew build
```

- Set `MASTER` to the address of the node where your Spark master is launched, and set `SPARK_HOME` to the location of your Spark installation. In the example below, the path for `SPARK_HOME` is the default location of Spark preinstalled on a CDH5 cluster. Please change `MASTER` below:

```
export MASTER="spark://mr-0xd9-precise1.0xdata.loc:7077"
export SPARK_HOME="/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.11/lib/spark"
```

- Launch Sparkling Shell:

```
./bin/sparkling-shell
```
## Importing data from HDFS

The initialization of H2O remains the same, with the exception of importing data from an HDFS path. Please change the path variable below to one suitable for your data.

```scala
import org.apache.spark.h2o._
import org.apache.spark.examples.h2o._
// Create H2O context
val h2oContext = new H2OContext(sc).start()
// Export H2O context into the environment
import h2oContext._
// URI to access the HDFS file
val path = "hdfs://mr-0xd6-precise1.0xdata.loc:8020/datasets/airlines_all.05p.csv"
val d = new java.net.URI(path)
val f = new H2OFrame(d)
```
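As a quick sanity check, you can confirm that the parse succeeded; a minimal sketch printing the imported frame's dimensions:

```scala
// Confirm the import by printing the frame's row and column counts
println(s"Imported ${f.numRows} rows and ${f.numCols} columns from $path")
```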