diff --git a/.gitignore b/.gitignore index d5149cea..cc1e7d75 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,4 @@ target *.iml .idea - +scalastyle-output.xml diff --git a/README.md b/README.md index 1f4f3ba0..7eae5ed0 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,12 @@ A Scala / Java library for interacting with time series data on Apache Spark. Scaladoc is available at http://cloudera.github.io/spark-timeseries. The aim here is to provide -* A set of abstractions for transforming and summarizing large time series data sets, similar to +* A set of abstractions for manipulating large time series data sets, similar to what's provided for smaller data sets in - [Pandas](http://pandas.pydata.org/pandas-docs/dev/timeseries.html) and - [Matlab](http://www.mathworks.com/help/matlab/time-series.html). + [Pandas](http://pandas.pydata.org/pandas-docs/dev/timeseries.html), + [Matlab](http://www.mathworks.com/help/matlab/time-series.html), and R's + [zoo](http://cran.r-project.org/web/packages/zoo/index.html) and + [xts](http://cran.r-project.org/web/packages/xts/index.html) packages. * Models, tests, and functions that enable dealing with time series from a statistical perspective, similar to what's provided in [StatsModels](http://statsmodels.sourceforge.net/devel/tsa.html) and a variety of Matlab and R packages. @@ -52,16 +54,24 @@ TimeSeriesRDDs then support efficient series-wise operations like slicing, imput val residuals = filled.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series)) -Statistical Functionality +Functionality -------------------------- -### Time Series +### Time Series Manipulation +* Aligning +* Slicing by date-time +* Missing value imputation +### Time Series Math and Stats + +* Exponentially weighted moving average * Autoregressive models * GARCH models * Missing data imputation * Augmented Dickey-Fuller test * Durbin-Watson test +* Breusch-Godfrey test +* Breusch-Pagan test ### General Prob / Stats diff --git a/pom.xml b/pom.xml index 97824250..863562a8 100644 --- a/pom.xml +++ b/pom.xml @@ -1,6 +1,6 @@ - + 4.0.0 com.cloudera.datascience sparktimeseries @@ -72,14 +74,43 @@ + org.apache.maven.plugins maven-surefire-plugin + 2.12 true + + + + org.scalastyle + scalastyle-maven-plugin + 0.7.0 + + false + false + true + true + ${basedir}/src/main/scala + ${basedir}/src/test/scala + ${basedir}/scalastyle-config.xml + ${project.basedir}/scalastyle-output.xml + UTF-8 + + + + compile + + check + + + + + org.scalatest @@ -160,6 +191,12 @@ + + org.apache.hadoop + hadoop-yarn-client + 2.6.0 + provided + org.scala-lang scala-library @@ -198,8 +235,7 @@ org.apache.spark spark-mllib_${scala.minor.version} - - 1.2.0 + ${spark.version} provided diff --git a/scalastyle-config.xml b/scalastyle-config.xml new file mode 100644 index 00000000..3aa09f33 --- /dev/null +++ b/scalastyle-config.xml @@ -0,0 +1,160 @@ + + + + + + + + + + + Scalastyle standard configuration + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala b/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala index 63f42d56..5765b464 100644 --- a/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala +++ 
b/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala @@ -25,7 +25,7 @@ import com.cloudera.sparkts.TimeSeriesRDD._ import com.github.nscala_time.time.Imports._ -import org.apache.commons.math3.random.RandomGenerator +import org.apache.commons.math3.random.{MersenneTwister, RandomGenerator} import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression import org.apache.spark.{SparkConf, SparkContext} @@ -33,10 +33,11 @@ import org.apache.spark.{SparkConf, SparkContext} object HistoricalValueAtRiskExample { def main(args: Array[String]): Unit = { // Read parameters - val numTrials = if (args.length > 0) args(0).toInt else 10000 - val parallelism = if (args.length > 1) args(1).toInt else 10 - val factorsDir = if (args.length > 2) args(2) else "factors/" - val instrumentsDir = if (args.length > 3) args(3) else "instruments/" + val factorsDir = if (args.length > 0) args(0) else "factors/" + val instrumentsDir = if (args.length > 1) args(1) else "instruments/" + val numTrials = if (args.length > 2) args(2).toInt else 10000 + val parallelism = if (args.length > 3) args(3).toInt else 10 + val horizon = if (args.length > 4) args(4).toInt else 1 // Initialize Spark val conf = new SparkConf().setMaster("local").setAppName("Historical VaR") @@ -66,13 +67,18 @@ object HistoricalValueAtRiskExample { // Fit an AR(1) + GARCH(1, 1) model to each factor val garchModels = factorReturns.mapValues(ARGARCH.fitModel(_)).toMap - val iidFactorReturns = factorReturns.mapSeriesWithLabel { case (symbol, series) => + val iidFactorReturns = factorReturns.mapSeriesWithKey { case (symbol, series) => val model = garchModels(symbol) model.removeTimeDependentEffects(series, DenseVector.zeros[Double](series.length)) } // Generate an RDD of simulations -// val rand = new MersenneTwister() + val seeds = sc.parallelize(0 until numTrials, parallelism) + seeds.map { seed => + val rand = new MersenneTwister(seed) + val factorPaths = simulatedFactorReturns(horizon, rand, iidFactorReturns, garchModels) + } + // val factorsDist = new FilteredHistoricalFactorDistribution(rand, iidFactorReturns.toArray, // garchModels.asInstanceOf[Array[TimeSeriesFilter]]) // val returns = simulationReturns(0L, factorsDist, numTrials, parallelism, sc, @@ -101,7 +107,7 @@ object HistoricalValueAtRiskExample { mat(i, ::) := iidSeries.data(rand.nextInt(iidSeries.data.rows), ::) } (0 until models.size).foreach { i => - models(iidSeries.labels(i)).addTimeDependentEffects(mat(::, i), mat(::, i)) + models(iidSeries.keys(i)).addTimeDependentEffects(mat(::, i), mat(::, i)) } mat } diff --git a/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala b/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala index 740c5241..90d39ed6 100644 --- a/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala +++ b/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala @@ -71,6 +71,6 @@ object SimpleTickDataExample { val iidRdd = slicedRdd.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series)) // Regress a stock against all the others - val samples = iidRdd.toSamples() + val samples = iidRdd.toInstants() } } diff --git a/src/main/scala/com/cloudera/sparkts/EWMA.scala b/src/main/scala/com/cloudera/sparkts/EWMA.scala new file mode 100644 index 00000000..1cd64217 --- /dev/null +++ b/src/main/scala/com/cloudera/sparkts/EWMA.scala @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. 
licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package com.cloudera.sparkts + +import breeze.linalg._ + +import org.apache.commons.math3.analysis.{MultivariateFunction, MultivariateVectorFunction} +import org.apache.commons.math3.optim.{InitialGuess, MaxEval, MaxIter, SimpleValueChecker} +import org.apache.commons.math3.optim.nonlinear.scalar.{GoalType, ObjectiveFunction, + ObjectiveFunctionGradient} +import org.apache.commons.math3.optim.nonlinear.scalar.gradient.NonLinearConjugateGradientOptimizer + +/** + * Fits an Exponentially Weighted Moving Average model (EWMA) (a.k.a. Simple Exponential Smoothing) to + * a time series. The model is defined as S_t = a * X_t + (1 - a) * S_{t - 1}, where a is the + * smoothing parameter, X is the original series, and S is the smoothed series. For more + * information, please see https://en.wikipedia.org/wiki/Exponential_smoothing. + */ +object EWMA { + /** + * Fits an EWMA model to a time series. Uses the first point in the time series as a starting + * value, and the sum of squared one-step-ahead errors as the objective function for choosing the + * smoothing parameter. The model is recursively defined as S_t = a * X_t + (1 - a) * S_{t-1}, where + * a is the smoothing parameter, X is the original series, and S is the smoothed series. + * Note that the optimization is performed as unbounded optimization, although in its formal + * definition the smoothing parameter is bounded (0 <= a <= 1), which corresponds to an inequality bounded + * optimization. 
Given this, the resulting smoothing parameter should always be sanity checked + * https://en.wikipedia.org/wiki/Exponential_smoothing + * @param ts the time series to which we want to fit an EWMA model + * @return EWMA model + */ + def fitModel(ts: Vector[Double]): EWMAModel = { + val optimizer = new NonLinearConjugateGradientOptimizer( + NonLinearConjugateGradientOptimizer.Formula.FLETCHER_REEVES, + new SimpleValueChecker(1e-6, 1e-6)) + val gradient = new ObjectiveFunctionGradient(new MultivariateVectorFunction() { + def value(params: Array[Double]): Array[Double] = { + val g = new EWMAModel(params(0)).gradient(ts) + Array(g) + } + }) + val objectiveFunction = new ObjectiveFunction(new MultivariateFunction() { + def value(params: Array[Double]): Double = { + new EWMAModel(params(0)).sse(ts) + } + }) + // optimization parameters + val initGuess = new InitialGuess(Array(.94)) + val maxIter = new MaxIter(10000) + val maxEval = new MaxEval(10000) + val goal = GoalType.MINIMIZE + // optimization step + val optimal = optimizer.optimize(objectiveFunction, goal, gradient, initGuess, maxIter, maxEval) + val params = optimal.getPoint + new EWMAModel(params(0)) + } +} + +class EWMAModel(val smoothing: Double) extends TimeSeriesModel { + + /** + * Calculates the SSE for a given timeseries ts given the smoothing parameter of the current model + * The forecast for the observation at period t + 1 is the smoothed value at time t + * Source: http://people.duke.edu/~rnau/411avg.htm + * @param ts the time series to fit a EWMA model to + * @return Sum Squared Error + */ + private[sparkts] def sse(ts: Vector[Double]): Double = { + val n = ts.length + val smoothed = new DenseVector(Array.fill(n)(0.0)) + addTimeDependentEffects(ts, smoothed) + + var i = 0 + var error = 0.0 + var sqrErrors = 0.0 + while (i < n - 1) { + error = ts(i + 1) - smoothed(i) + sqrErrors += error * error + i += 1 + } + + sqrErrors + } + + /** + * Calculates the gradient of the SSE cost function for our EWMA model + * @param ts + * @return gradient + */ + private[sparkts] def gradient(ts: Vector[Double]): Double = { + val n = ts.length + val smoothed = new DenseVector(Array.fill(n)(0.0)) + addTimeDependentEffects(ts, smoothed) + + var error = 0.0 + var prevSmoothed = ts(0) + var prevDSda = 0.0 // derivative of the EWMA function at time t - 1: (d S(t - 1)/ d smoothing) + var dSda = 0.0 // derivative of the EWMA function at time t: (d S(t) / d smoothing) + var dJda = 0.0 // derivative of our SSE cost function + var i = 0 + + while (i < n - 1) { + error = ts(i + 1) - smoothed(i) + dSda = ts(i) - prevSmoothed + (1 - smoothing) * prevDSda + dJda += error * dSda + prevDSda = dSda + prevSmoothed = smoothed(i) + i += 1 + } + 2 * dJda + } + + override def removeTimeDependentEffects(ts: Vector[Double], dest: Vector[Double] = null) + : Vector[Double] = { + dest(0) = ts(0) // by definition in our model S_0 = X_0 + + for (i <- 1 until ts.length) { + dest(i) = (ts(i) - (1 - smoothing) * ts(i - 1)) / smoothing + } + dest + } + + override def addTimeDependentEffects(ts: Vector[Double], dest: Vector[Double]): Vector[Double] = { + dest(0) = ts(0) // by definition in our model S_0 = X_0 + + for (i <- 1 until ts.length) { + dest(i) = smoothing * ts(i) + (1 - smoothing) * dest(i - 1) + } + dest + } +} diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala index bbafca3d..6a9a8c8c 100644 --- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala +++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala 
@@ -18,31 +18,102 @@ package com.cloudera.sparkts import breeze.linalg._ import breeze.plot._ +import org.apache.commons.math3.distribution.NormalDistribution + object EasyPlot { - def ezplot(vec: Vector[Double], style: Char): Unit = { + def ezplot(vec: Vector[Double], style: Char): Figure = { val f = Figure() val p = f.subplot(0) p += plot((0 until vec.length).map(_.toDouble).toArray, vec, style = style) + f } - def ezplot(vec: Vector[Double]): Unit = ezplot(vec, '-') + def ezplot(vec: Vector[Double]): Figure = ezplot(vec, '-') - def ezplot(arr: Array[Double], style: Char): Unit = { + def ezplot(arr: Array[Double], style: Char): Figure = { val f = Figure() val p = f.subplot(0) - p += plot((0 until arr.length).map(_.toDouble).toArray, arr, style = style) + p += plot(arr.indices.map(_.toDouble).toArray, arr, style = style) + f } - def ezplot(arr: Array[Double]): Unit = ezplot(arr, '-') + def ezplot(arr: Array[Double]): Figure = ezplot(arr, '-') - def ezplot(vecs: Seq[Vector[Double]], style: Char): Unit = { + def ezplot(vecs: Seq[Vector[Double]], style: Char): Figure = { val f = Figure() val p = f.subplot(0) val first = vecs.head vecs.foreach { vec => p += plot((0 until first.length).map(_.toDouble).toArray, vec, style) } + f } - def ezplot(vecs: Seq[Vector[Double]]): Unit = ezplot(vecs, '-') + def ezplot(vecs: Seq[Vector[Double]]): Figure = ezplot(vecs, '-') + + /** + * Autocorrelation function plot + * @param data array of data to analyze + * @param maxLag maximum lag for autocorrelation + * @param conf confidence bounds to display + */ + def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Figure = { + // calculate correlations and confidence bound + val autoCorrs = UnivariateTimeSeries.autocorr(data, maxLag) + val confVal = calcConfVal(conf, data.length) + + // Basic plot information + val f = Figure() + val p = f.subplot(0) + p.title = "Autocorrelation function" + p.xlabel = "Lag" + p.ylabel = "Autocorrelation" + drawCorrPlot(autoCorrs, confVal, p) + f + } + + /** + * Partial autocorrelation function plot + * @param data array of data to analyze + * @param maxLag maximum lag for partial autocorrelation function + * @param conf confidence bounds to display + */ + def pacfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Figure = { + // create AR(maxLag) model, retrieve coefficients and calculate confidence bound + val model = Autoregression.fitModel(new DenseVector(data), maxLag) + val pCorrs = model.coefficients // partial autocorrelations are the coefficients in AR(n) model + val confVal = calcConfVal(conf, data.length) + + // Basic plot information + val f = Figure() + val p = f.subplot(0) + p.title = "Partial autocorrelation function" + p.xlabel = "Lag" + p.ylabel = "Partial Autocorrelation" + drawCorrPlot(pCorrs, confVal, p) + f + } + + private[sparkts] def calcConfVal(conf:Double, n: Int): Double = { + val stdNormDist = new NormalDistribution(0, 1) + val pVal = (1 - conf) / 2.0 + stdNormDist.inverseCumulativeProbability(1 - pVal) / Math.sqrt(n) + } + + private[sparkts] def drawCorrPlot(corrs: Array[Double], confVal: Double, p: Plot): Unit = { + // make decimal ticks visible + p.setYAxisDecimalTickUnits() + // plot correlations as vertical lines + val verticalLines = corrs.zipWithIndex.map { case (corr, ix) => + (Array(ix.toDouble + 1, ix.toDouble + 1), Array(0, corr)) + } + verticalLines.foreach { case (xs, ys) => p += plot(xs, ys) } + // plot confidence intervals as horizontal lines + val n = corrs.length + Array(confVal, -1 * confVal).foreach { conf => + 
val xs = (0 to n).toArray.map(_.toDouble) + val ys = Array.fill(n + 1)(conf) + p += plot(xs, ys, '-', colorcode = "red") + } + } } diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeries.scala b/src/main/scala/com/cloudera/sparkts/TimeSeries.scala index 3be205ed..981d771b 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeries.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeries.scala @@ -20,17 +20,17 @@ import breeze.linalg._ import com.github.nscala_time.time.Imports._ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], - val labels: Array[String]) extends Serializable { + val keys: Array[String]) extends Serializable { def slice(range: Range): TimeSeries = { - new TimeSeries(index.slice(range), data(range, ::), labels) + new TimeSeries(index.slice(range), data(range, ::), keys) } def union(vec: Vector[Double], key: String): TimeSeries = { val mat = DenseMatrix.zeros[Double](data.rows, data.cols + 1) (0 until data.cols).foreach(c => mat(::, c to c) := data(::, c to c)) mat(::, -1 to -1) := vec - new TimeSeries(index, mat, labels :+ key) + new TimeSeries(index, mat, keys :+ key) } /** @@ -86,7 +86,7 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], def hasNext: Boolean = i < data.cols def next(): (String, Vector[Double]) = { i += 1 - (labels(i - 1), data(::, i - 1)) + (keys(i - 1), data(::, i - 1)) } } } @@ -99,15 +99,15 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], } /** - * Applies a transformation to each series that preserves the time index. Passes the label along + * Applies a transformation to each series that preserves the time index. Passes the key along * with each series. */ - def mapSeriesWithLabel(f: (String, Vector[Double]) => Vector[Double]): TimeSeries = { + def mapSeriesWithKey(f: (String, Vector[Double]) => Vector[Double]): TimeSeries = { val newData = new DenseMatrix[Double](index.size, data.cols) univariateKeyAndSeriesIterator().zipWithIndex.foreach { case ((key, series), index) => newData(::, index) := f(key, series) } - new TimeSeries(index, newData, labels) + new TimeSeries(index, newData, keys) } /** @@ -120,7 +120,7 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], univariateSeriesIterator().zipWithIndex.foreach { case (vec, index) => newData(::, index) := f(vec) } - new TimeSeries(newIndex, newData, labels) + new TimeSeries(newIndex, newData, keys) } def mapValues[U](f: (Vector[Double]) => U): Seq[(String, U)] = { @@ -128,13 +128,13 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], } /** - * Gets the first univariate series and its label. + * Gets the first univariate series and its key. 
*/ def head(): (String, Vector[Double]) = univariateKeyAndSeriesIterator().next } object TimeSeries { - def timeSeriesFromSamples(samples: Seq[(DateTime, Array[Double])], labels: Array[String]) + def timeSeriesFromSamples(samples: Seq[(DateTime, Array[Double])], keys: Array[String]) : TimeSeries = { val mat = new DenseMatrix[Double](samples.length, samples.head._2.length) val dts = new Array[Long](samples.length) @@ -143,7 +143,7 @@ object TimeSeries { dts(i) = dt.getMillis mat(i to i, ::) := new DenseVector[Double](values) } - new TimeSeries(new IrregularDateTimeIndex(dts), mat, labels) + new TimeSeries(new IrregularDateTimeIndex(dts), mat, keys) } } diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala index c7b46dd9..38f8200f 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala @@ -158,7 +158,7 @@ class TimeSeriesRDD(val index: DateTimeIndex, parent: RDD[(String, Vector[Double * In the returned RDD, the ordering of values within each record corresponds to the ordering of * the time series records in the original RDD. The records are ordered by time. */ - def toSamples(nPartitions: Int = -1): RDD[(DateTime, Vector[Double])] = { + def toInstants(nPartitions: Int = -1): RDD[(DateTime, Vector[Double])] = { val maxChunkSize = 20 val dividedOnMapSide = mapPartitionsWithIndex { case (partitionId, iter) => diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala index 6083bfb1..0b03faa0 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -22,6 +22,8 @@ import com.cloudera.finance.Util import org.apache.commons.math3.distribution.{ChiSquaredDistribution, NormalDistribution} import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression +import scala.collection.immutable.ListMap + /** * Adapted from statsmodels: * https://github.com/statsmodels/statsmodels/blob/master/statsmodels/tsa/stattools.py @@ -248,33 +250,171 @@ object TimeSeriesStatisticalTests { } /** - * Breusch-Godfrey test for serial correlation in a model - * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, - * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag - * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: - * Given an OLS model of the form y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t - * We estimate vector u_hat by obtaining residuals from the model fit - * We then calculate an auxiliary regression of the form: - * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... 
- * Our test statistic is then (R^2 of the auxiliary regression) * (# of obs - maxLag) - * @return The Breusch-Godfrey statistic and p value - */ - def bgtest(ts: Vector[Double], factors: Matrix[Double], maxLag: Int): (Double, Double) = { - // original regression model - val origOLS = new OLSMultipleLinearRegression() + * Breusch-Godfrey test for serial correlation in a model + * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, + * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag + * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: + * Given estimated residuals u_hat_t from an OLS model of the form + * y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t + * We calculate an auxiliary regression of the form: + * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... + * Our test statistic is then (# of obs - maxLag) * (R^2 of the auxiliary regression) + * @return The Breusch-Godfrey statistic and p value + */ + def bgtest(residuals: Vector[Double], factors: Matrix[Double], maxLag: Int): (Double, Double) = { + val origResiduals = residuals.toArray val origFactors = Util.matToRowArrs(factors) // X (wiki) - origOLS.newSampleData(ts.toArray, origFactors) // Y = A * X + u (wiki) - val resids = origOLS.estimateResiduals() // u_hat (wiki) - // auxiliary regression model - val lagResids = Lag.lagMatTrimBoth(resids, maxLag, false) // u_hat_lagged (wiki) + // auxiliary regression model + val lagResids = Lag.lagMatTrimBoth(origResiduals, maxLag, false) // u_hat_lagged (wiki) val nObs = lagResids.length - val dropLen = ts.size - nObs // drop x # of elements to run new regression + val dropLen = residuals.size - nObs // drop x # of elements to run new regression val auxOLS = new OLSMultipleLinearRegression() // auxiliary OLS for bg test - val auxFactors = origFactors.drop(dropLen).zip(lagResids).map {case (x, u_t) => x ++ u_t } - auxOLS.newSampleData(resids.drop(dropLen), auxFactors) // u_hat= A*X + P*u_hat_lagged + e (wiki) + val auxFactors = origFactors.drop(dropLen).zip(lagResids).map { case (x, u_t) => x ++ u_t } + auxOLS.newSampleData(origResiduals.drop(dropLen), auxFactors) // u_hat= A*X + P*u_hat_lagged + e val bgstat = nObs * auxOLS.calculateRSquared() (bgstat, 1 - new ChiSquaredDistribution(maxLag).cumulativeProbability(bgstat)) } - - + + /** + * Ljung-Box test for serial correlation in residuals up to lag `maxLag`. The null hypothesis + * is that values are independently distributed up to the given lag. The alternate hypothesis + * is that serial correlation is present. The test statistic follows a Chi-Squared distribution + * with `maxLag` degrees of freedom. See [[https://en.wikipedia.org/wiki/Ljung%E2%80%93Box_test]] + * for more information. + * @param residuals + * @return the test statistic and the p-value associated with it. 
+ */ + def lbtest(residuals: Vector[Double], maxLag: Int): (Double, Double) = { + val autoCorrs = UnivariateTimeSeries.autocorr(residuals, maxLag) + val n = residuals.length + val adjAutoCorrs = autoCorrs.toArray.zipWithIndex.map { case (p, k) => + (p * p) / (n - k - 1) + } + val testStatistic = n * (n + 2) * adjAutoCorrs.sum + val pValue = 1 - new ChiSquaredDistribution(maxLag).cumulativeProbability(testStatistic) + (testStatistic, pValue) + } + + /** + * Breusch-Pagan test for heteroskedasticity in a model + * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom + * and provides a test for a null hypothesis of homoscedasticity + * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Pagan_test + * Given a vector of estimated residuals (u) from an OLS model, we create an auxiliary regression + * that models the squared residuals (u^2) as a function of the original regressors (X) + * u^2 = beta * X + * We construct our test statistic as (# of observations) * R^2 of our auxiliary regression + * @return The Breusch-Pagan statistic and p value + */ + def bptest(residuals: Vector[Double], factors: Matrix[Double]): (Double, Double) = { + val residualsSquared = residuals.toArray.map(x => x * x) // u^2 + val origFactors = Util.matToRowArrs(factors) // X + val auxOLS = new OLSMultipleLinearRegression() // auxiliary OLS for bp test + auxOLS.newSampleData(residualsSquared, origFactors) // u^2 = beta * X + val bpstat = residuals.length * auxOLS.calculateRSquared() + val df = factors.cols // auxOLS uses intercept term, so (# of regressors - 1) = # factors cols + (bpstat, 1 - new ChiSquaredDistribution(df).cumulativeProbability(bpstat)) + } + + /** + * Critical values associated with KPSS test when testing with a simple constant + * Source: + * Kwiatkowski, D., Phillips, P., Schmidt, P., & Shin, Y. + * Testing the null hypothesis of stationarity against the alternative of a unit root. + * Journal of Econometrics, 159-178. + */ + private val kpssConstantCriticalValues = ListMap( + 0.10 -> 0.347, 0.05 -> 0.463, 0.025 -> 0.574, 0.01 -> 0.739 + ) + + /** + * Critical values associated with KPSS test when testing with a constant and time trend + * Source: + * Kwiatkowski, D., Phillips, P., Schmidt, P., & Shin, Y. + * Testing the null hypothesis of stationarity against the alternative of a unit root. + * Journal of Econometrics, 159-178. + */ + private val kpssConstantAndTrendCriticalValues = ListMap( + 0.10 -> 0.119, 0.05 -> 0.146, 0.025 -> 0.176, 0.01 -> 0.216 + ) + + /** + * Performs the KPSS stationarity test (Kwiatkowski, Phillips, Schmidt and Shin). The null + * hypothesis corresponds to stationarity (level stationarity if method = "c", or trend + * stationarity if method = "ct"). If method = "c", a regression of the form ts_i = alpha + + * error_i is fit. If method = "ct", a regression of the form ts_i = alpha + beta * i + error_i. + * The test then performs a check for the variance of the errors. The null hypothesis of + * stationarity corresponds to a null hypothesis of variance = 0 for the errors. + * For more information please see pg 129 of [[http://faculty.washington + * .edu/ezivot/econ584/notes/unitroot.pdf]]. For the original paper on the test, please see + * [[http://www.deu.edu.tr/userweb/onder.hanedar/dosyalar/kpss.pdf]]. 
+ * Finally, the current implementation follows R's tseries package implementation closely, which + * can be found at [[https://cran.r-project.org/web/packages/tseries/index.html]] + * @param ts time series to test for stationarity + * @param method "c" or "ct", short for fitting with a constant or a constant and a time trend + * @return the KPSS statistic and a map of critical values according to the method selected + */ + def kpsstest(ts: Vector[Double], method: String): (Double, Map[Double, Double]) = { + require(List("c", "ct").contains(method), "trend must be c or ct") + val n = ts.length + // constant + val c = Array.fill(n)(1.0) + // create appropriate regression matrix and obtain critical values + val (regressors, criticalValues) = if (method == "c") { + (c.map(Array(_)), kpssConstantCriticalValues) + } else { + // time trend + val t = Array.tabulate(n)(x => 1.0 + x.toDouble) + (Array(c, t).transpose, kpssConstantAndTrendCriticalValues) + } + val ols = new OLSMultipleLinearRegression() + ols.setNoIntercept(true) + ols.newSampleData(ts.toArray, regressors) + val residuals = ols.estimateResiduals() + // sum of (square of cumulative sum of errors) + val s2 = residuals.scanLeft(0.0)(_ + _).tail.map(math.pow(_, 2)).sum + // long run variance estimate using Newey-West estimator + // we follow the default lag used in kpss.test in R's tseries package + val lag = (3 * math.sqrt(n) / 13).toInt + val longRunVariance = neweyWestVarianceEstimator(residuals, lag) + val stat = (s2 / longRunVariance) / (n * n) + (stat, criticalValues) + } + + /** + * Estimates long run variance using the Newey-West estimator, which consists on calculating + * the variance between lagged values and applying a weight that decreases as the lag increases. + * We translate the C implementation used by R's tseries:kpss.test, found at + * https://github.com/cran/tseries/blob/master/src/ppsum.c + * However, we add our own comments for clarity. 
+ * See pg 87 of [[http://faculty.washington.edu/ezivot/econ584/notes/timeSeriesConcepts.pdf]] for + * more information + */ + private def neweyWestVarianceEstimator(errors: Array[Double], lag: Int): Double = { + val n = errors.length + var sumOfTerms = 0.0 + var cellContrib = 0.0 + var i = 0 + var j = 0 + + i = 1 + while (i <= lag) { + j = i + cellContrib = 0.0 + while (j < n) { + // covariance between values at time t and time t-i + cellContrib += errors(j) * errors(j - i) + j += 1 + } + // Newey-West weighing (decreasing weight for longer lags) + sumOfTerms += cellContrib * (1 - (i.toDouble / (lag + 1))) + i += 1 + } + // we multiply by 2 as we calculated the sum of top row in matrix (lag 0, lag 1), (lag 0, lag 2) + // etc but we need to also calculate first column (lag 1, lag 0), (lag 2, lag 0) etc + // we divide by n and obtain a partial estimate of the variance, just missing no lag variance + val partialEstVar = (sumOfTerms * 2) / n + // add no-lag variance + partialEstVar + (errors.map(math.pow(_, 2)).sum / n) + } } diff --git a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala index fb5fb93a..9b3024bc 100644 --- a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala +++ b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala @@ -18,6 +18,8 @@ package com.cloudera.sparkts import breeze.linalg._ import breeze.stats._ +import org.apache.commons.math3.analysis.interpolation.SplineInterpolator + object UnivariateTimeSeries { def autocorr(ts: Array[Double], numLags: Int): Array[Double] = { autocorr(new DenseVector(ts), numLags).toDenseVector.data @@ -125,6 +127,7 @@ object UnivariateTimeSeries { case "nearest" => fillNearest(ts) case "next" => fillNext(ts) case "previous" => fillPrevious(ts) + case "spline" => fillSpline(ts) case _ => throw new UnsupportedOperationException() } } @@ -168,9 +171,9 @@ object UnivariateTimeSeries { } /** - * fills in NaN with the previously available not NaN, scanning from left to right. - * 1 NaN NaN 2 Nan -> 1 1 1 2 2 - */ + * fills in NaN with the previously available not NaN, scanning from left to right. + * 1 NaN NaN 2 Nan -> 1 1 1 2 2 + */ def fillPrevious(values: Vector[Double]): DenseVector[Double] = { val result = new DenseVector(values.toArray) var filler = Double.NaN // initial value, maintains invariant @@ -188,9 +191,9 @@ object UnivariateTimeSeries { } /** - * fills in NaN with the next available not NaN, scanning from right to left. - * 1 NaN NaN 2 Nan -> 1 2 2 2 NaN - */ + * fills in NaN with the next available not NaN, scanning from right to left. + * 1 NaN NaN 2 Nan -> 1 2 2 2 NaN + */ def fillNext(values: Vector[Double]): DenseVector[Double] = { val result = new DenseVector(values.toArray) var filler = Double.NaN // initial value, maintains invariant @@ -203,14 +206,13 @@ object UnivariateTimeSeries { result } - def fillWithDefault(values: Array[Double], filler: Double): Array[Double] = { fillWithDefault(new DenseVector(values), filler).data } /** - * fills in NaN with a default value - */ + * fills in NaN with a default value + */ def fillWithDefault(values: Vector[Double], filler: Double): DenseVector[Double] = { val result = new DenseVector(values.toArray) var i = 0 @@ -246,5 +248,83 @@ object UnivariateTimeSeries { result } + def fillSpline(values: Array[Double]): Array[Double] = { + fillSpline(new DenseVector(values)).data + } + + /** + * Fill in NaN values using a natural cubic spline. 
+ * @param values Vector to interpolate + * @return Interpolated vector + */ + def fillSpline(values: Vector[Double]): DenseVector[Double]= { + val result = new DenseVector(values.toArray) + val interp = new SplineInterpolator() + val knotsAndValues = values.toArray.zipWithIndex.filter(!_._1.isNaN) + // Note that the type of unzip is missed up in scala 10.4 as per + // https://issues.scala-lang.org/browse/SI-8081 + // given that this project is using scala 10.4, we cannot use unzip, so unpack manually + val knotsX = knotsAndValues.map(_._2.toDouble) + val knotsY = knotsAndValues.map(_._1) + val filler = interp.interpolate(knotsX, knotsY) + + // values that we can interpolate between, others need to be filled w/ other function + var i = knotsX(0).toInt + val end = knotsX.last.toInt + + while (i < end) { + result(i) = filler.value(i.toDouble) + i += 1 + } + result + } + def ar(values: Vector[Double], maxLag: Int): ARModel = Autoregression.fitModel(values, maxLag) + + /** + * Down sample by taking every nth element starting from offset phase + * @param values Vector to down sample + * @param n take every nth element + * @param phase offset from starting index + * @return downsampled vector with appropriate length + */ + def downsample(values: Vector[Double], n: Int, phase: Int = 0) = { + val origLen = values.length + val newLen = Math.ceil((values.length - phase) / n.toDouble).toInt + val sampledValues = new DenseVector(Array.fill(newLen)(0.0)) + var i = phase + var j = 0 + + while (j < newLen) { + sampledValues(j) = values(i) + i += n + j += 1 + } + sampledValues + } + + /** + * Up sample by inserting n - 1 elements into the original values vector, starting at index phase + * @param values the original data vector + * @param n the number of insertions between elements + * @param phase the offset to begin + * @param useZero fill with zeros rather than NaN + * @return upsampled vector filled with zeros or NaN, as specified by user + */ + def upsample(values: Vector[Double], n: Int, phase: Int = 0, useZero: Boolean = false) = { + val filler = if (useZero) 0 else Double.NaN + val origLen = values.length + val newLen = origLen * n + val sampledValues = new DenseVector(Array.fill(newLen)(filler)) + var i = phase + var j = 0 + + while (j < origLen) { + sampledValues(i) = values(j) + i += n + j += 1 + } + sampledValues + } } + diff --git a/src/test/scala/com/cloudera/sparkts/EWMASuite.scala b/src/test/scala/com/cloudera/sparkts/EWMASuite.scala new file mode 100644 index 00000000..2e1106b2 --- /dev/null +++ b/src/test/scala/com/cloudera/sparkts/EWMASuite.scala @@ -0,0 +1,67 @@ +/** + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ + +package com.cloudera.sparkts + +import breeze.linalg._ + +import org.scalatest.{FunSuite, ShouldMatchers} + +class EWMASuite extends FunSuite with ShouldMatchers { + test("adding time dependent effects") { + val orig = new DenseVector((1 to 10).toArray.map(_.toDouble)) + + val m1 = new EWMAModel(0.2) + val smoothed1 = new DenseVector(Array.fill(10)(0.0)) + m1.addTimeDependentEffects(orig, smoothed1) + + smoothed1(0) should be (orig(0)) + smoothed1(1) should be (m1.smoothing * orig(1) + (1 - m1.smoothing) * smoothed1(0)) + round2Dec(smoothed1.toArray.last) should be (6.54) + + val m2 = new EWMAModel(0.6) + val smoothed2 = new DenseVector(Array.fill(10)(0.0)) + m2.addTimeDependentEffects(orig, smoothed2) + + smoothed2(0) should be (orig(0)) + smoothed2(1) should be (m2.smoothing * orig(1) + (1 - m2.smoothing) * smoothed2(0)) + round2Dec(smoothed2.toArray.last) should be (9.33) + } + + test("removing time dependent effects") { + val smoothed = new DenseVector(Array(1.0, 1.2, 1.56, 2.05, 2.64, 3.31, 4.05, 4.84, 5.67, 6.54)) + + val m1 = new EWMAModel(0.2) + val orig1 = new DenseVector(Array.fill(10)(0.0)) + m1.removeTimeDependentEffects(smoothed, orig1) + + round2Dec(orig1(0)) should be (1.0) + orig1.toArray.last.toInt should be(10) + } + + test("fitting EWMA model") { + // We reproduce the example in ch 7.1 from + // https://www.otexts.org/fpp/7/1 + val oil = Array(446.7, 454.5, 455.7, 423.6, 456.3, 440.6, 425.3, 485.1, 506.0, 526.8, + 514.3, 494.2) + val model = EWMA.fitModel(new DenseVector(oil)) + val truncatedSmoothing = (model.smoothing * 100.0).toInt + truncatedSmoothing should be (89) // approximately 0.89 + } + + private def round2Dec(x: Double): Double = { + (x * 100).round / 100.00 + } +} diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala index f008edcb..7b78d922 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala @@ -60,7 +60,7 @@ class TimeSeriesRDDSuite extends FunSuite with LocalSparkContext with ShouldMatc rdd.filterEndingAfter(start).count() should be (3) } - test("toSamples") { + test("toInstants") { val conf = new SparkConf().setMaster("local").setAppName(getClass.getName) TimeSeriesKryoRegistrator.registerKryoClasses(conf) sc = new SparkContext(conf) @@ -71,7 +71,7 @@ class TimeSeriesRDDSuite extends FunSuite with LocalSparkContext with ShouldMatc val index = uniform(start, 4, 1.days) val rdd = sc.parallelize(labels.zip(seriesVecs.map(_.asInstanceOf[Vector[Double]])), 3) val tsRdd = new TimeSeriesRDD(index, rdd) - val samples = tsRdd.toSamples().collect() + val samples = tsRdd.toInstants().collect() samples should be (Array( (start, new DenseVector((0.0 until 20.0 by 4.0).toArray)), (start + 1.days, new DenseVector((1.0 until 20.0 by 4.0).toArray)), diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala index ae095d5b..72063d4c 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala @@ -3,6 +3,7 @@ * * Cloudera, Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"). You may not use this file except in + * Version 2.0 (the "License"). You may not use this file except in * compliance with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -15,34 +16,132 @@ package com.cloudera.sparkts -import scala.Double.NaN - import breeze.linalg._ import com.cloudera.sparkts.TimeSeriesStatisticalTests._ +import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression import org.apache.commons.math3.random.MersenneTwister import org.scalatest.{FunSuite, ShouldMatchers} class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { test("breusch-godfrey") { - // Replicating the example provided by R package lmtest for bgtest - val rand = new MersenneTwister(5L) + // Replicating the example provided by R package lmtest for bgtest + val rand = new MersenneTwister(5L) val n = 100 val coef = 0.5 // coefficient for lagged series val x = Array.fill(n / 2)(Array(1.0, -1.0)).flatten - val y1 = x.map(_ + 1 + rand.nextDouble) // stationary - // AR(1), recursive filter with 0.5 coef - val y2 = y1.scanLeft(0.0) { case (prior, curr) => prior * coef + curr }.tail + // stationary series + val y1 = x.map(_ + 1 + rand.nextGaussian()) + // AR(1) series, recursive filter with coef coefficient + val y2 = y1.scanLeft(0.0) { case (prior, curr) => prior * coef + curr }.tail + val pthreshold = 0.05 - + + val OLS1 = new OLSMultipleLinearRegression() + OLS1.newSampleData(y1, x.map(Array(_))) + val resids1 = OLS1.estimateResiduals() + + val OLS2 = new OLSMultipleLinearRegression() + OLS2.newSampleData(y2, x.map(Array(_))) + val resids2 = OLS2.estimateResiduals() + // there should be no evidence of serial correlation - bgtest(new DenseVector(y1), new DenseMatrix(x.length, 1, x), 1)._2 should be > pthreshold - bgtest(new DenseVector(y1), new DenseMatrix(x.length, 1, x), 4)._2 should be > pthreshold + bgtest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x), 1)._2 should be > pthreshold + bgtest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x), 4)._2 should be > pthreshold // there should be evidence of serial correlation - bgtest(new DenseVector(y2), new DenseMatrix(x.length, 1, x), 1)._2 should be < pthreshold - bgtest(new DenseVector(y2), new DenseMatrix(x.length, 1, x), 4)._2 should be < pthreshold + bgtest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x), 1)._2 should be < pthreshold + bgtest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x), 4)._2 should be < pthreshold + } + + test("breusch-pagan") { + // Replicating the example provided by R package lmtest for bptest + val rand = new MersenneTwister(5L) + val n = 100 + val x = Array.fill(n / 2)(Array(-1.0, 1.0)).flatten + + // homoskedastic residuals with variance 1 throughout + val err1 = Array.fill(n)(rand.nextGaussian) + // heteroskedastic residuals with alternating variance of 1 and 4 + val varFactor = 2 + val err2 = err1.zipWithIndex.map { case (x, i) => if(i % 2 == 0) x * varFactor else x } + + // generate dependent variables + val y1 = x.zip(err1).map { case (xi, ei) => xi + ei + 1 } + val y2 = x.zip(err2).map { case (xi, ei) => xi + ei + 1 } + + // create models and calculate residuals + val OLS1 = new OLSMultipleLinearRegression() + OLS1.newSampleData(y1, x.map(Array(_))) + val resids1 = OLS1.estimateResiduals() + + val OLS2 = new OLSMultipleLinearRegression() + OLS2.newSampleData(y2, x.map(Array(_))) + val resids2 = OLS2.estimateResiduals() + + val pthreshold = 0.05 + // there should be no evidence of heteroskedasticity + bptest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x))._2 should be > pthreshold + // there should be evidence of 
heteroskedasticity + bptest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x))._2 should be < pthreshold } + test("ljung-box test") { + val rand = new MersenneTwister(5L) + val n = 100 + val indep = Array.fill(n)(rand.nextGaussian) + val vecIndep = new DenseVector(indep) + val (stat1, pval1) = lbtest(vecIndep, 1) + pval1 > 0.05 + + // serially correlated + val coef = 0.3 + val dep = indep.scanLeft(0.0) { case (prior, curr) => prior * coef + curr }.tail + val vecDep = new DenseVector(dep) + val (stat2, pval2) = lbtest(vecDep, 2) + pval2 < 0.05 + } + + test("KPSS test: R equivalence") { + // Note that we only test the statistic, as in contrast with the R tseries implementation + // we do not calculate a p-value, but rather return a map of appropriate critical values + // R's tseries linearly interpolates between critical values. + + // version.string R version 3.2.0 (2015-04-16) + // > set.seed(10) + // > library(tseries) + // > v <- rnorm(20) + // > kpss.test(v, "Level") + // + // KPSS Test for Level Stationarity + // + // data: v + // KPSS Level = 0.27596, Truncation lag parameter = 1, p-value = 0.1 + // + // Warning message: + // In kpss.test(v, "Level") : p-value greater than printed p-value + // > kpss.test(v, "Trend") + // + // KPSS Test for Trend Stationarity + // + // data: v + // KPSS Trend = 0.05092, Truncation lag parameter = 1, p-value = 0.1 + // + // Warning message: + // In kpss.test(v, "Trend") : p-value greater than printed p-value + + val arr = Array(0.0187461709418264, -0.184252542069064, -1.37133054992251, -0.599167715783718, + 0.294545126567508, 0.389794300700167, -1.20807617542949, -0.363676017470862, + -1.62667268170309, -0.256478394123992, 1.10177950308713, 0.755781508027337, + -0.238233556018718, 0.98744470341339, 0.741390128383824, 0.0893472664958216, + -0.954943856152377, -0.195150384667239, 0.92552126209408, 0.482978524836611 + ) + val dv = new DenseVector(arr) + val cTest = kpsstest(dv, "c")._1 + val ctTest = kpsstest(dv, "ct")._1 + + cTest should be (0.2759 +- 1e-4) + ctTest should be (0.05092 +- 1e-4) + } } diff --git a/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala b/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala index e486d9f4..29cd26cc 100644 --- a/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala @@ -46,4 +46,55 @@ class UnivariateTimeSeriesSuite extends FunSuite with ShouldMatchers { arAutocorr(1) should be > 0.0 arAutocorr(2) should be > 0.0 } + + test("upsampling") { + // replicating upsampling examples + // from http://www.mathworks.com/help/signal/ref/upsample.html?searchHighlight=upsample + val y = new DenseVector(Array(1.0, 2.0, 3.0, 4.0)) + val yUp1 = upsample(y, 3, useZero = true).toArray + yUp1 should be (Array(1.0, 0.0, 0.0, 2.0, 0.0, 0.0, 3.0, 0.0, 0.0, 4.0, 0.0, 0.0)) + + val yUp2 = upsample(y, 3, useZero = true, phase = 2).toArray + yUp2 should be (Array(0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 0.0, 3.0, 0.0, 0.0, 4.0)) + } + + test("downsampling") { + // replicating downsampling examples + // from http://www.mathworks.com/help/signal/ref/downsample.html?searchHighlight=downsample + val y = new DenseVector((1 to 10).toArray.map(_.toDouble)) + val yDown1 = downsample(y, 3).toArray + yDown1 should be (Array(1.0, 4.0, 7.0, 10.0)) + + val yDown2 = downsample(y, 3, phase = 2).toArray + yDown2 should be (Array(3.0, 6.0, 9.0)) + + } + + test("signal reconstruction with spline") { + // If we have a frequent signal, 
downsample it (at a rate that doesn't cause aliasing) + // and we upsample, and apply a filter (interpolation), then the result should be fairly + // close to the original signal. In our case, we drop NAs that are not filled by interpolation + // (i.e no extrapolation) + + val y = (1 to 1000).toArray.map(_.toDouble / 100.0).map(Math.sin(_)) + val vy = new DenseVector(y) + val lessFreq = downsample(vy, 100) + val moreFreq = upsample(lessFreq, 100) + + // work on copies + val splineY = fillSpline(new DenseVector(moreFreq.toArray)).toArray + val lineY = fillLinear(new DenseVector(moreFreq.toArray)).toArray + + val MSE = (est: Array[Double], obs: Array[Double]) => { + val errs = est.zip(obs).filter(!_._1.isNaN).map { case (yhat, y) => (yhat - y) * (yhat - y) } + errs.sum / errs.length + } + + val sE = MSE(splineY, y) + val lE = MSE(lineY, y) + + // a cubic spline should be better than linear interpolation + sE should be < lE + } } + diff --git a/src/test/scala/com/cloudera/sparkts/YarnAllocatorSuite.scala b/src/test/scala/com/cloudera/sparkts/YarnAllocatorSuite.scala new file mode 100644 index 00000000..fbb8859a --- /dev/null +++ b/src/test/scala/com/cloudera/sparkts/YarnAllocatorSuite.scala @@ -0,0 +1,86 @@ +package com.cloudera.sparkts + +import org.scalatest.FunSuite +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.util.Random +import org.apache.hadoop.yarn.api.records.{Priority, Resource} + +class YarnAllocatorSuite extends FunSuite { + test("benchmark") { + val map = new HashMap[String, Int] + val numNodes = 1000 + val numTasks = 10000 + val rand = new Random(100) + var i = 0 + while (i < numTasks) { + val taskLocations = (0 until 3).map(x => rand.nextInt(numNodes).toString) + taskLocations.foreach { taskLoc => + if (map.contains(taskLoc)) { + map(taskLoc) += 1 + } else { + map(taskLoc) = 1 + } + } + i += 1 + } + println("done generating tasks") + val amClient = AMRMClient.createAMRMClient[ContainerRequest]() + var existing = new ArrayBuffer[ContainerRequest](0) + println("done creating amrmclient") + existing = regenerateContainerRequests(amClient, existing, map) + val warmupRounds = 1000 + val warmupStart = System.currentTimeMillis + i = 0 + while (i < warmupRounds) { + existing = regenerateContainerRequests(amClient, existing, map) + i += 1 + } + val warmupTime = System.currentTimeMillis - warmupStart + println(s"warmup time: $warmupTime") + val actualRounds = 1000 + val actualStart = System.currentTimeMillis + i = 0 + while (i < actualRounds) { + existing = regenerateContainerRequests(amClient, existing, map) + i += 1 + } + val actualTime = System.currentTimeMillis - actualStart + println(s"actual time: $actualTime") + } + + val _resource = Resource.newInstance(1024, 1) + val _priority = Priority.newInstance(1) + + def regenerateContainerRequests( + amClient: AMRMClient[ContainerRequest], + existingRequests: Seq[ContainerRequest], + prefs: HashMap[String, Int]): ArrayBuffer[ContainerRequest] = { + var i = 0 + while (i < existingRequests.length) { + amClient.removeContainerRequest(existingRequests(i)) + i += 1 + } + val prefsClone = new HashMap[String, Int] + prefsClone ++= prefs + val newRequests = new ArrayBuffer[ContainerRequest] + while (!prefsClone.isEmpty) { + val locs = prefsClone.keySet.toArray + var j = 0 + while (j < locs.length) { + val num = prefsClone(locs(j)) + if (num == 1) { + prefsClone.remove(locs(j)) + } else { + 
prefsClone(locs(j)) = num - 1 + } + j += 1 + } + val request = new ContainerRequest(_resource, null, locs, _priority) + newRequests += request + amClient.addContainerRequest(request) + } + newRequests + } +}
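
A brief usage sketch of the EWMA API introduced above (EWMA.fitModel and EWMAModel in com.cloudera.sparkts). The input values are illustrative only; the fitted smoothing parameter depends on the optimizer.

    import breeze.linalg.DenseVector
    import com.cloudera.sparkts.EWMA

    // illustrative data only -- any univariate series works
    val ts = new DenseVector(Array(446.7, 454.5, 455.7, 423.6, 456.3, 440.6, 425.3, 485.1))

    // estimate the smoothing parameter by minimizing the sum of squared one-step-ahead errors
    val model = EWMA.fitModel(ts)
    println(s"fitted smoothing parameter: ${model.smoothing}")

    // smooth the series: S_t = a * X_t + (1 - a) * S_{t-1}, with S_0 = X_0
    val smoothed = new DenseVector(Array.fill(ts.length)(0.0))
    model.addTimeDependentEffects(ts, smoothed)

    // invert the smoothing to recover (approximately) the original series
    val recovered = new DenseVector(Array.fill(ts.length)(0.0))
    model.removeTimeDependentEffects(smoothed, recovered)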
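
A similar sketch for the new residual diagnostics (bgtest, bptest, lbtest) and the KPSS test. The regression data is made up, and the tests are fed OLS residuals rather than the raw dependent variable, as the signatures above expect.

    import breeze.linalg.{DenseMatrix, DenseVector}
    import org.apache.commons.math3.random.MersenneTwister
    import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression
    import com.cloudera.sparkts.TimeSeriesStatisticalTests._

    // made-up regression data: y = 1 + 0.5 * x + noise
    val rand = new MersenneTwister(42)
    val x = Array.tabulate(100)(_.toDouble)
    val y = x.map(xi => 1.0 + 0.5 * xi + rand.nextGaussian())

    // fit the original model and extract its residuals
    val ols = new OLSMultipleLinearRegression()
    ols.newSampleData(y, x.map(Array(_)))
    val residuals = new DenseVector(ols.estimateResiduals())
    val factors = new DenseMatrix(x.length, 1, x)

    val (bgStat, bgPValue) = bgtest(residuals, factors, 4)   // serial correlation up to lag 4
    val (bpStat, bpPValue) = bptest(residuals, factors)      // heteroskedasticity
    val (lbStat, lbPValue) = lbtest(residuals, 4)            // Ljung-Box on the residuals
    val (kpssStat, criticalValues) = kpsstest(new DenseVector(y), "c")  // level stationarity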
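
And a sketch of the resampling and interpolation helpers (downsample, upsample, fillSpline) together with the autocorrelation plot. The signal is illustrative; acfPlot returns a breeze-viz Figure.

    import breeze.linalg.DenseVector
    import com.cloudera.sparkts.EasyPlot
    import com.cloudera.sparkts.UnivariateTimeSeries._

    // illustrative signal: a slow sine wave sampled at 500 points
    val dense = new DenseVector(Array.tabulate(500)(i => math.sin(i / 25.0)))

    val sparse = downsample(dense, 10)    // keep every 10th observation
    val padded = upsample(sparse, 10)     // reinsert the gaps as NaN
    // cubic-spline interpolation over the gaps; note it only fills between the first
    // and last non-NaN points, so trailing gaps remain NaN
    val rebuilt = fillSpline(padded)

    // autocorrelation plot of the original signal with 95% confidence bounds
    val fig = EasyPlot.acfPlot(dense.toArray, 20)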