From 6c2e2a6645ac01cff7fbcce57b1a4645ba83b8df Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Fri, 12 Jun 2015 11:40:01 -0700 Subject: [PATCH 01/42] reworked bgtest to take residuals, added bptest --- .../sparkts/TimeSeriesStatisticalTests.scala | 40 ++++++++---- .../TimeSeriesStatisticalTestsSuite.scala | 63 ++++++++++++++++--- 2 files changed, 83 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala index 6083bfb1..fd1d9d9d 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -252,29 +252,45 @@ object TimeSeriesStatisticalTests { * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: - * Given an OLS model of the form y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t - * We estimate vector u_hat by obtaining residuals from the model fit - * We then calculate an auxiliary regression of the form: + * Given residuals from an OLS model of the form y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t + * We calculate an auxiliary regression of the form: * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... - * Our test statistic is then (R^2 of the auxiliary regression) * (# of obs - maxLag) + * Our test statistic is then (# of obs - maxLag) * (R^2 of the auxiliary regression) * @return The Breusch-Godfrey statistic and p value */ - def bgtest(ts: Vector[Double], factors: Matrix[Double], maxLag: Int): (Double, Double) = { + def bgtest(residuals: Vector[Double], factors: Matrix[Double], maxLag: Int): (Double, Double) = { // original regression model - val origOLS = new OLSMultipleLinearRegression() + val origResiduals = residuals.toArray val origFactors = Util.matToRowArrs(factors) // X (wiki) - origOLS.newSampleData(ts.toArray, origFactors) // Y = A * X + u (wiki) - val resids = origOLS.estimateResiduals() // u_hat (wiki) // auxiliary regression model - val lagResids = Lag.lagMatTrimBoth(resids, maxLag, false) // u_hat_lagged (wiki) + val lagResids = Lag.lagMatTrimBoth(origResiduals, maxLag, false) // u_hat_lagged (wiki) val nObs = lagResids.length - val dropLen = ts.size - nObs // drop x # of elements to run new regression + val dropLen = residuals.size - nObs // drop x # of elements to run new regression val auxOLS = new OLSMultipleLinearRegression() // auxiliary OLS for bg test val auxFactors = origFactors.drop(dropLen).zip(lagResids).map {case (x, u_t) => x ++ u_t } - auxOLS.newSampleData(resids.drop(dropLen), auxFactors) // u_hat= A*X + P*u_hat_lagged + e (wiki) + auxOLS.newSampleData(origResiduals.drop(dropLen), auxFactors) // u_hat= A*X + P*u_hat_lagged + e val bgstat = nObs * auxOLS.calculateRSquared() (bgstat, 1 - new ChiSquaredDistribution(maxLag).cumulativeProbability(bgstat)) } - + /** + * Breusch-Pagan test for heteroskedasticity in a model + * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom + * and provides a test for a null hypothesis of homoscedasticity + * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Pagan_test + * Given a vector of estimated residuals (u) from an OLS model, we create a an auxiliary regression + * that models the squared residuals (u^2) as a function of the original 
regressors (X) + * u^2 = beta * X + * We construct our test statistic as (# of observations) * R^2 of our auxiliary regression + * @return The Breusch-Pagan statistic and p value + */ + def bptest(residuals: Vector[Double], factors: Matrix[Double]): (Double, Double) = { + val residualsSquared = residuals.toArray.map(x => x * x) // u^2 + val origFactors = Util.matToRowArrs(factors) // X + val auxOLS = new OLSMultipleLinearRegression() // auxiliary OLS for bp test + auxOLS.newSampleData(residualsSquared, origFactors) // u^2 = beta * X + val bpstat = residuals.length * auxOLS.calculateRSquared() + val df = factors.cols // auxOLS uses intercept term, so (# of regressors - 1) = # factors cols + (bpstat, 1 - new ChiSquaredDistribution(df).cumulativeProbability(bpstat)) + } } diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala index ae095d5b..abf4f2c7 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala @@ -3,6 +3,7 @@ * * Cloudera, Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"). You may not use this file except in + * Version 2.0 (the "License"). You may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -15,6 +16,8 @@ package com.cloudera.sparkts +import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression + import scala.Double.NaN import breeze.linalg._ @@ -31,18 +34,62 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { val rand = new MersenneTwister(5L) val n = 100 val coef = 0.5 // coefficient for lagged series + val x = Array.fill(n / 2)(Array(1.0, -1.0)).flatten - val y1 = x.map(_ + 1 + rand.nextDouble) // stationary - // AR(1), recursive filter with 0.5 coef - val y2 = y1.scanLeft(0.0) { case (prior, curr) => prior * coef + curr }.tail + // stationary series + val y1 = x.map(_ + 1 + rand.nextGaussian()) + // AR(1) series, recursive filter with coef coefficient + val y2 = y1.scanLeft(0.0) { case (prior, curr) => prior * coef + curr }.tail + val pthreshold = 0.05 - + + val OLS1 = new OLSMultipleLinearRegression() + OLS1.newSampleData(y1, x.map(Array(_))) + val resids1 = OLS1.estimateResiduals() + + val OLS2 = new OLSMultipleLinearRegression() + OLS2.newSampleData(y2, x.map(Array(_))) + val resids2 = OLS2.estimateResiduals() + // there should be no evidence of serial correlation - bgtest(new DenseVector(y1), new DenseMatrix(x.length, 1, x), 1)._2 should be > pthreshold - bgtest(new DenseVector(y1), new DenseMatrix(x.length, 1, x), 4)._2 should be > pthreshold + bgtest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x), 1)._2 should be > pthreshold + bgtest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x), 4)._2 should be > pthreshold // there should be evidence of serial correlation - bgtest(new DenseVector(y2), new DenseMatrix(x.length, 1, x), 1)._2 should be < pthreshold - bgtest(new DenseVector(y2), new DenseMatrix(x.length, 1, x), 4)._2 should be < pthreshold + bgtest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x), 1)._2 should be < pthreshold + bgtest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x), 4)._2 should be < pthreshold } + + test("breusch-pagan") { + // Replicating the example provided by R package lmtest for bptest + val rand = new 
MersenneTwister(5L) + val n = 100 + val x = Array.fill(n / 2)(Array(-1.0, 1.0)).flatten + + // homoscedastic residuals with variance 1 throughout + val err1 = Array.fill(n)(rand.nextGaussian) + // heteroskedastic residuals with alternating variance of 1 and 4 + val varFactor = 2 + val err2 = err1.zipWithIndex.map { case (x, i) => if(i % 2 == 0) x * varFactor else x } + + // generate dependent variables + val y1 = x.zip(err1).map { case (xi, ei) => xi + ei + 1 } + val y2 = x.zip(err2).map { case (xi, ei) => xi + ei + 1 } + + // create models and calculate residuals + val OLS1 = new OLSMultipleLinearRegression() + OLS1.newSampleData(y1, x.map(Array(_))) + val resids1 = OLS1.estimateResiduals() + + val OLS2 = new OLSMultipleLinearRegression() + OLS2.newSampleData(y2, x.map(Array(_))) + val resids2 = OLS2.estimateResiduals() + + val pthreshold = 0.05 + // there should be no evidence of heteroskedasticity + bptest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x))._2 should be > pthreshold + // there should be evidence of heteroskedasticity + bptest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x))._2 should be < pthreshold + + } } From 75e993292899b0fafec9285a13052c7934c645ba Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Fri, 12 Jun 2015 15:04:03 -0700 Subject: [PATCH 02/42] cleaned up after adding bptest/bgtest --- .../cloudera/sparkts/TimeSeriesStatisticalTests.scala | 9 +++++---- .../sparkts/TimeSeriesStatisticalTestsSuite.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala index fd1d9d9d..cff41d63 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -252,7 +252,8 @@ object TimeSeriesStatisticalTests { * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: - * Given residuals from an OLS model of the form y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t + * Given estimated residuals u_hat_t from an OLS model of the form + * y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t * We calculate an auxiliary regression of the form: * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... 
* Our test statistic is then (# of obs - maxLag) * (R^2 of the auxiliary regression) @@ -262,12 +263,12 @@ object TimeSeriesStatisticalTests { // original regression model val origResiduals = residuals.toArray val origFactors = Util.matToRowArrs(factors) // X (wiki) - // auxiliary regression model + // auxiliary regression model val lagResids = Lag.lagMatTrimBoth(origResiduals, maxLag, false) // u_hat_lagged (wiki) val nObs = lagResids.length val dropLen = residuals.size - nObs // drop x # of elements to run new regression val auxOLS = new OLSMultipleLinearRegression() // auxiliary OLS for bg test - val auxFactors = origFactors.drop(dropLen).zip(lagResids).map {case (x, u_t) => x ++ u_t } + val auxFactors = origFactors.drop(dropLen).zip(lagResids).map { case (x, u_t) => x ++ u_t } auxOLS.newSampleData(origResiduals.drop(dropLen), auxFactors) // u_hat= A*X + P*u_hat_lagged + e val bgstat = nObs * auxOLS.calculateRSquared() (bgstat, 1 - new ChiSquaredDistribution(maxLag).cumulativeProbability(bgstat)) @@ -278,7 +279,7 @@ object TimeSeriesStatisticalTests { * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom * and provides a test for a null hypothesis of homoscedasticity * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Pagan_test - * Given a vector of estimated residuals (u) from an OLS model, we create a an auxiliary regression + * Given a vector of estimated residuals (u) from an OLS model, we create an auxiliary regression * that models the squared residuals (u^2) as a function of the original regressors (X) * u^2 = beta * X * We construct our test statistic as (# of observations) * R^2 of our auxiliary regression diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala index abf4f2c7..4363c231 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala @@ -68,7 +68,7 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { // homoscedastic residuals with variance 1 throughout val err1 = Array.fill(n)(rand.nextGaussian) // heteroskedastic residuals with alternating variance of 1 and 4 - val varFactor = 2 + val varFactor = 4 val err2 = err1.zipWithIndex.map { case (x, i) => if(i % 2 == 0) x * varFactor else x } // generate dependent variables From 7fdbafa169006efdf1800b1ee19ce32fd5f23eb9 Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Mon, 15 Jun 2015 15:26:04 -0700 Subject: [PATCH 03/42] cleaned up additional style issues --- .../sparkts/TimeSeriesStatisticalTests.scala | 43 +++++++++---------- .../sparkts/UnivariateTimeSeries.scala | 19 ++++---- .../TimeSeriesStatisticalTestsSuite.scala | 24 ++++------- 3 files changed, 39 insertions(+), 47 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala index cff41d63..c22f7938 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -248,19 +248,18 @@ object TimeSeriesStatisticalTests { } /** - * Breusch-Godfrey test for serial correlation in a model - * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, - * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag - * From 
http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: - * Given estimated residuals u_hat_t from an OLS model of the form - * y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t - * We calculate an auxiliary regression of the form: - * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... - * Our test statistic is then (# of obs - maxLag) * (R^2 of the auxiliary regression) - * @return The Breusch-Godfrey statistic and p value - */ + * Breusch-Godfrey test for serial correlation in a model + * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, + * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag + * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: + * Given estimated residuals u_hat_t from an OLS model of the form + * y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t + * We calculate an auxiliary regression of the form: + * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... + * Our test statistic is then (# of obs - maxLag) * (R^2 of the auxiliary regression) + * @return The Breusch-Godfrey statistic and p value + */ def bgtest(residuals: Vector[Double], factors: Matrix[Double], maxLag: Int): (Double, Double) = { - // original regression model val origResiduals = residuals.toArray val origFactors = Util.matToRowArrs(factors) // X (wiki) // auxiliary regression model @@ -275,16 +274,16 @@ object TimeSeriesStatisticalTests { } /** - * Breusch-Pagan test for heteroskedasticity in a model - * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom - * and provides a test for a null hypothesis of homoscedasticity - * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Pagan_test - * Given a vector of estimated residuals (u) from an OLS model, we create an auxiliary regression - * that models the squared residuals (u^2) as a function of the original regressors (X) - * u^2 = beta * X - * We construct our test statistic as (# of observations) * R^2 of our auxiliary regression - * @return The Breusch-Pagan statistic and p value - */ + * Breusch-Pagan test for heteroskedasticity in a model + * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom + * and provides a test for a null hypothesis of homoscedasticity + * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Pagan_test + * Given a vector of estimated residuals (u) from an OLS model, we create an auxiliary regression + * that models the squared residuals (u^2) as a function of the original regressors (X) + * u^2 = beta * X + * We construct our test statistic as (# of observations) * R^2 of our auxiliary regression + * @return The Breusch-Pagan statistic and p value + */ def bptest(residuals: Vector[Double], factors: Matrix[Double]): (Double, Double) = { val residualsSquared = residuals.toArray.map(x => x * x) // u^2 val origFactors = Util.matToRowArrs(factors) // X diff --git a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala index fb5fb93a..9466f668 100644 --- a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala +++ b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala @@ -168,9 +168,9 @@ object UnivariateTimeSeries { } /** - * fills in NaN with the previously available not NaN, scanning from left to right. - * 1 NaN NaN 2 Nan -> 1 1 1 2 2 - */ + * fills in NaN with the previously available not NaN, scanning from left to right. 
+ * 1 NaN NaN 2 Nan -> 1 1 1 2 2 + */ def fillPrevious(values: Vector[Double]): DenseVector[Double] = { val result = new DenseVector(values.toArray) var filler = Double.NaN // initial value, maintains invariant @@ -188,9 +188,9 @@ object UnivariateTimeSeries { } /** - * fills in NaN with the next available not NaN, scanning from right to left. - * 1 NaN NaN 2 Nan -> 1 2 2 2 NaN - */ + * fills in NaN with the next available not NaN, scanning from right to left. + * 1 NaN NaN 2 Nan -> 1 2 2 2 NaN + */ def fillNext(values: Vector[Double]): DenseVector[Double] = { val result = new DenseVector(values.toArray) var filler = Double.NaN // initial value, maintains invariant @@ -203,14 +203,13 @@ object UnivariateTimeSeries { result } - def fillWithDefault(values: Array[Double], filler: Double): Array[Double] = { fillWithDefault(new DenseVector(values), filler).data } /** - * fills in NaN with a default value - */ + * fills in NaN with a default value + */ def fillWithDefault(values: Vector[Double], filler: Double): DenseVector[Double] = { val result = new DenseVector(values.toArray) var i = 0 @@ -247,4 +246,4 @@ object UnivariateTimeSeries { } def ar(values: Vector[Double], maxLag: Int): ARModel = Autoregression.fitModel(values, maxLag) -} +} \ No newline at end of file diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala index 4363c231..aea1a84f 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala @@ -16,25 +16,21 @@ package com.cloudera.sparkts -import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression - -import scala.Double.NaN - import breeze.linalg._ import com.cloudera.sparkts.TimeSeriesStatisticalTests._ +import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression import org.apache.commons.math3.random.MersenneTwister import org.scalatest.{FunSuite, ShouldMatchers} class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { test("breusch-godfrey") { - // Replicating the example provided by R package lmtest for bgtest - val rand = new MersenneTwister(5L) + // Replicating the example provided by R package lmtest for bgtest + val rand = new MersenneTwister(5L) val n = 100 val coef = 0.5 // coefficient for lagged series - val x = Array.fill(n / 2)(Array(1.0, -1.0)).flatten // stationary series val y1 = x.map(_ + 1 + rand.nextGaussian()) @@ -64,17 +60,17 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { val rand = new MersenneTwister(5L) val n = 100 val x = Array.fill(n / 2)(Array(-1.0, 1.0)).flatten - - // homoscedastic residuals with variance 1 throughout + + // homoskedastic residuals with variance 1 throughout val err1 = Array.fill(n)(rand.nextGaussian) // heteroskedastic residuals with alternating variance of 1 and 4 - val varFactor = 4 + val varFactor = 2 val err2 = err1.zipWithIndex.map { case (x, i) => if(i % 2 == 0) x * varFactor else x } - + // generate dependent variables val y1 = x.zip(err1).map { case (xi, ei) => xi + ei + 1 } val y2 = x.zip(err2).map { case (xi, ei) => xi + ei + 1 } - + // create models and calculate residuals val OLS1 = new OLSMultipleLinearRegression() OLS1.newSampleData(y1, x.map(Array(_))) @@ -83,13 +79,11 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { val OLS2 = new OLSMultipleLinearRegression() OLS2.newSampleData(y2, 
x.map(Array(_))) val resids2 = OLS2.estimateResiduals() - + val pthreshold = 0.05 // there should be no evidence of heteroskedasticity bptest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x))._2 should be > pthreshold // there should be evidence of heteroskedasticity bptest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x))._2 should be < pthreshold - } - } From e7ed3ae8e678ec4a8a887b047f4b07dff62638f5 Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Mon, 15 Jun 2015 17:35:28 -0700 Subject: [PATCH 04/42] added EWMA functionality and tests --- .../scala/com/cloudera/sparkts/EWMA.scala | 88 +++++++++++++++++++ .../com/cloudera/sparkts/EWMASuite.scala | 68 ++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 src/main/scala/com/cloudera/sparkts/EWMA.scala create mode 100644 src/test/scala/com/cloudera/sparkts/EWMASuite.scala diff --git a/src/main/scala/com/cloudera/sparkts/EWMA.scala b/src/main/scala/com/cloudera/sparkts/EWMA.scala new file mode 100644 index 00000000..09e7dfad --- /dev/null +++ b/src/main/scala/com/cloudera/sparkts/EWMA.scala @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package com.cloudera.sparkts + +import breeze.linalg._ + +import org.apache.commons.math3.analysis.UnivariateFunction +import org.apache.commons.math3.optim.MaxEval +import org.apache.commons.math3.optim.nonlinear.scalar.GoalType +import org.apache.commons.math3.optim.univariate.{BrentOptimizer, UnivariateObjectiveFunction, SearchInterval} + +object EWMA { + /** + * Fits an EWMA model to a time series. Uses the first point in the time series as a starting + * value. 
Uses sum squared error as an objective function to optimize to find smoothing parameter + * The model for EWMA is recursively defined as S_t = (1 - a) * X_t + a * S_{t-1}, where + * a is the smoothing parameter, X is the original series, and S is the smoothed series + * https://en.wikipedia.org/wiki/Exponential_smoothing + * @param ts the time series to which we want to fit a EWMA model + * @return EWMAmodel + */ + def fitModel(ts: Vector[Double]): EWMAModel = { + val objectiveFunction = new UnivariateObjectiveFunction(new UnivariateFunction() { + def value(param: Double): Double = { + new EWMAModel(param).sse(ts) + } + }) + + val maxEval = new MaxEval(10000) + val paramBounds = new SearchInterval(0.0, 1.0) + val optimizer = new BrentOptimizer(1e-6, 1e-6) + val optimal = optimizer.optimize(maxEval, objectiveFunction, GoalType.MINIMIZE, paramBounds) + val param = optimal.getPoint + new EWMAModel(param) + } +} + +class EWMAModel(val smoothing: Double) extends TimeSeriesModel { + + /** + * Calculates the SSE for a given timeseries ts given the smoothing parameter of the current model + * The forecast for the observation at period t + 1 is the smoothed value at time t + * Source: http://people.duke.edu/~rnau/411avg.htm + * @param ts the time series to fit a EWMA model to + * @return Sum Squared Error + */ + private[sparkts] def sse(ts: Vector[Double]): Double = { + val n = ts.length + val smoothed = new DenseVector(Array.fill(n)(0.0)) + addTimeDependentEffects(ts, smoothed) + val tsArr = ts.toArray + val smoothedArr = smoothed.toArray + val sqrErrors = smoothedArr.zip(tsArr.tail).map { case (yhat, y) => (yhat - y) * (yhat - y) } + sum(sqrErrors) + } + + override def removeTimeDependentEffects(ts: Vector[Double], dest: Vector[Double] = null) + : Vector[Double] = { + dest(0) = ts(0) // by definition in our model S_0 = X_0 + + for (i <- 1 until ts.length) { + dest(i) = (ts(i) - (1 - smoothing) * ts(i - 1)) / smoothing + } + dest + } + + override def addTimeDependentEffects(ts: Vector[Double], dest: Vector[Double]): Vector[Double] = { + dest(0) = ts(0) // by definition in our model S_0 = X_0 + + for (i <- 1 until ts.length) { + dest(i) = smoothing * ts(i) + (1 - smoothing) * dest(i - 1) + } + dest + } +} diff --git a/src/test/scala/com/cloudera/sparkts/EWMASuite.scala b/src/test/scala/com/cloudera/sparkts/EWMASuite.scala new file mode 100644 index 00000000..f0a39379 --- /dev/null +++ b/src/test/scala/com/cloudera/sparkts/EWMASuite.scala @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ + +package com.cloudera.sparkts + +import breeze.linalg._ + +import org.scalatest.{FunSuite, ShouldMatchers} + + +class EWMASuite extends FunSuite with ShouldMatchers { + test("adding time dependent effects") { + val orig = new DenseVector((1 to 10).toArray.map(_.toDouble)) + + val m1 = new EWMAModel(0.2) + val smoothed1 = new DenseVector(Array.fill(10)(0.0)) + m1.addTimeDependentEffects(orig, smoothed1) + + smoothed1(0) should be (orig(0)) + smoothed1(1) should be (m1.smoothing * orig(1) + (1 - m1.smoothing) * smoothed1(0)) + round2Dec(smoothed1.toArray.last) should be (6.54) + + val m2 = new EWMAModel(0.6) + val smoothed2 = new DenseVector(Array.fill(10)(0.0)) + m2.addTimeDependentEffects(orig, smoothed2) + + smoothed2(0) should be (orig(0)) + smoothed2(1) should be (m2.smoothing * orig(1) + (1 - m2.smoothing) * smoothed2(0)) + round2Dec(smoothed2.toArray.last) should be (9.33) + } + + test("removing time dependent effects") { + val smoothed = new DenseVector(Array(1.0, 1.2, 1.56, 2.05, 2.64, 3.31, 4.05, 4.84, 5.67, 6.54)) + + val m1 = new EWMAModel(0.2) + val orig1 = new DenseVector(Array.fill(10)(0.0)) + m1.removeTimeDependentEffects(smoothed, orig1) + + round2Dec(orig1(0)) should be (1.0) + orig1.toArray.last.toInt should be(10) + } + + test("fitting EWMA model") { + // We reproduce the example in ch 7.1 from + // https://www.otexts.org/fpp/7/1 + val oil = Array(446.7, 454.5, 455.7, 423.6, 456.3, 440.6, 425.3, 485.1, 506.0, 526.8, + 514.3, 494.2) + val model = EWMA.fitModel(new DenseVector(oil)) + val truncatedSmoothing = (model.smoothing * 100.0).toInt + truncatedSmoothing should be (89) // approximately 0.89 + } + + private def round2Dec(x: Double): Double = { + (x * 100).round / 100.00 + } +} From dcc429c23d0c3acb679c9cadac1820e2b70e3e92 Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Mon, 15 Jun 2015 15:26:04 -0700 Subject: [PATCH 05/42] cleaned up additional style issues --- .../sparkts/TimeSeriesStatisticalTests.scala | 43 +++++++++---------- .../sparkts/UnivariateTimeSeries.scala | 19 ++++---- .../TimeSeriesStatisticalTestsSuite.scala | 26 +++++------ 3 files changed, 40 insertions(+), 48 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala index cff41d63..c22f7938 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -248,19 +248,18 @@ object TimeSeriesStatisticalTests { } /** - * Breusch-Godfrey test for serial correlation in a model - * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, - * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag - * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: - * Given estimated residuals u_hat_t from an OLS model of the form - * y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t - * We calculate an auxiliary regression of the form: - * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... 
- * Our test statistic is then (# of obs - maxLag) * (R^2 of the auxiliary regression) - * @return The Breusch-Godfrey statistic and p value - */ + * Breusch-Godfrey test for serial correlation in a model + * The statistic asymptotically follows an X^2 distribution with maxLag degrees of freedom, + * and provides a test for the null hypothesis of lack of serial correlation up to degree maxLag + * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Godfrey_test: + * Given estimated residuals u_hat_t from an OLS model of the form + * y_t = a0 + a1 * x1_t + a2 * x2_t + ... + u_t + * We calculate an auxiliary regression of the form: + * u_hat_t = a0 + a1 * x1_t + a2 * x2_t + ... + p1 * u_hat_t-1 + p2 * u_hat_t-2 ... + * Our test statistic is then (# of obs - maxLag) * (R^2 of the auxiliary regression) + * @return The Breusch-Godfrey statistic and p value + */ def bgtest(residuals: Vector[Double], factors: Matrix[Double], maxLag: Int): (Double, Double) = { - // original regression model val origResiduals = residuals.toArray val origFactors = Util.matToRowArrs(factors) // X (wiki) // auxiliary regression model @@ -275,16 +274,16 @@ object TimeSeriesStatisticalTests { } /** - * Breusch-Pagan test for heteroskedasticity in a model - * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom - * and provides a test for a null hypothesis of homoscedasticity - * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Pagan_test - * Given a vector of estimated residuals (u) from an OLS model, we create an auxiliary regression - * that models the squared residuals (u^2) as a function of the original regressors (X) - * u^2 = beta * X - * We construct our test statistic as (# of observations) * R^2 of our auxiliary regression - * @return The Breusch-Pagan statistic and p value - */ + * Breusch-Pagan test for heteroskedasticity in a model + * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom + * and provides a test for a null hypothesis of homoscedasticity + * From http://en.wikipedia.org/wiki/Breusch%E2%80%93Pagan_test + * Given a vector of estimated residuals (u) from an OLS model, we create an auxiliary regression + * that models the squared residuals (u^2) as a function of the original regressors (X) + * u^2 = beta * X + * We construct our test statistic as (# of observations) * R^2 of our auxiliary regression + * @return The Breusch-Pagan statistic and p value + */ def bptest(residuals: Vector[Double], factors: Matrix[Double]): (Double, Double) = { val residualsSquared = residuals.toArray.map(x => x * x) // u^2 val origFactors = Util.matToRowArrs(factors) // X diff --git a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala index fb5fb93a..9466f668 100644 --- a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala +++ b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala @@ -168,9 +168,9 @@ object UnivariateTimeSeries { } /** - * fills in NaN with the previously available not NaN, scanning from left to right. - * 1 NaN NaN 2 Nan -> 1 1 1 2 2 - */ + * fills in NaN with the previously available not NaN, scanning from left to right. 
+ * 1 NaN NaN 2 Nan -> 1 1 1 2 2 + */ def fillPrevious(values: Vector[Double]): DenseVector[Double] = { val result = new DenseVector(values.toArray) var filler = Double.NaN // initial value, maintains invariant @@ -188,9 +188,9 @@ object UnivariateTimeSeries { } /** - * fills in NaN with the next available not NaN, scanning from right to left. - * 1 NaN NaN 2 Nan -> 1 2 2 2 NaN - */ + * fills in NaN with the next available not NaN, scanning from right to left. + * 1 NaN NaN 2 Nan -> 1 2 2 2 NaN + */ def fillNext(values: Vector[Double]): DenseVector[Double] = { val result = new DenseVector(values.toArray) var filler = Double.NaN // initial value, maintains invariant @@ -203,14 +203,13 @@ object UnivariateTimeSeries { result } - def fillWithDefault(values: Array[Double], filler: Double): Array[Double] = { fillWithDefault(new DenseVector(values), filler).data } /** - * fills in NaN with a default value - */ + * fills in NaN with a default value + */ def fillWithDefault(values: Vector[Double], filler: Double): DenseVector[Double] = { val result = new DenseVector(values.toArray) var i = 0 @@ -247,4 +246,4 @@ object UnivariateTimeSeries { } def ar(values: Vector[Double], maxLag: Int): ARModel = Autoregression.fitModel(values, maxLag) -} +} \ No newline at end of file diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala index 4363c231..698e4f43 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala @@ -16,25 +16,21 @@ package com.cloudera.sparkts -import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression - -import scala.Double.NaN - import breeze.linalg._ import com.cloudera.sparkts.TimeSeriesStatisticalTests._ +import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression import org.apache.commons.math3.random.MersenneTwister import org.scalatest.{FunSuite, ShouldMatchers} class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { test("breusch-godfrey") { - // Replicating the example provided by R package lmtest for bgtest - val rand = new MersenneTwister(5L) + // Replicating the example provided by R package lmtest for bgtest + val rand = new MersenneTwister(5L) val n = 100 val coef = 0.5 // coefficient for lagged series - val x = Array.fill(n / 2)(Array(1.0, -1.0)).flatten // stationary series val y1 = x.map(_ + 1 + rand.nextGaussian()) @@ -64,17 +60,17 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { val rand = new MersenneTwister(5L) val n = 100 val x = Array.fill(n / 2)(Array(-1.0, 1.0)).flatten - - // homoscedastic residuals with variance 1 throughout + + // homoskedastic residuals with variance 1 throughout val err1 = Array.fill(n)(rand.nextGaussian) // heteroskedastic residuals with alternating variance of 1 and 4 - val varFactor = 4 + val varFactor = 2 val err2 = err1.zipWithIndex.map { case (x, i) => if(i % 2 == 0) x * varFactor else x } - + // generate dependent variables val y1 = x.zip(err1).map { case (xi, ei) => xi + ei + 1 } val y2 = x.zip(err2).map { case (xi, ei) => xi + ei + 1 } - + // create models and calculate residuals val OLS1 = new OLSMultipleLinearRegression() OLS1.newSampleData(y1, x.map(Array(_))) @@ -83,13 +79,11 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { val OLS2 = new OLSMultipleLinearRegression() OLS2.newSampleData(y2, 
x.map(Array(_)))
     val resids2 = OLS2.estimateResiduals()
-
+
     val pthreshold = 0.05
     // there should be no evidence of heteroskedasticity
     bptest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x))._2 should be > pthreshold
     // there should be evidence of heteroskedasticity
     bptest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x))._2 should be < pthreshold
-
-  }
-
+  }
 }

From 5cfc79bafb70fc9c1757fbe3247d9c972efbc608 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Mon, 15 Jun 2015 17:55:28 -0700
Subject: [PATCH 06/42] Add Hadoop dependency

---
 pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pom.xml b/pom.xml
index 97824250..f7b3bfc2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,6 +160,12 @@
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>2.6.0</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>

From 1aff3f4cd45687fe5b5be1c16ef0be0ff6ec9c34 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Mon, 15 Jun 2015 17:55:48 -0700
Subject: [PATCH 07/42] Add seed generation in historical var example

---
 .../HistoricalValueAtRiskExample.scala        | 18 ++--
 .../cloudera/sparkts/YarnAllocatorSuite.scala | 86 +++++++++++++++++++
 2 files changed, 98 insertions(+), 6 deletions(-)
 create mode 100644 src/test/scala/com/cloudera/sparkts/YarnAllocatorSuite.scala

diff --git a/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala b/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala
index 63f42d56..f9030d03 100644
--- a/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala
+++ b/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala
@@ -25,7 +25,7 @@ import com.cloudera.sparkts.TimeSeriesRDD._
 
 import com.github.nscala_time.time.Imports._
 
-import org.apache.commons.math3.random.RandomGenerator
+import org.apache.commons.math3.random.{MersenneTwister, RandomGenerator}
 import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -33,10 +33,11 @@ object HistoricalValueAtRiskExample {
   def main(args: Array[String]): Unit = {
     // Read parameters
-    val numTrials = if (args.length > 0) args(0).toInt else 10000
-    val parallelism = if (args.length > 1) args(1).toInt else 10
-    val factorsDir = if (args.length > 2) args(2) else "factors/"
-    val instrumentsDir = if (args.length > 3) args(3) else "instruments/"
+    val factorsDir = if (args.length > 0) args(0) else "factors/"
+    val instrumentsDir = if (args.length > 1) args(1) else "instruments/"
+    val numTrials = if (args.length > 2) args(2).toInt else 10000
+    val parallelism = if (args.length > 3) args(3).toInt else 10
+    val horizon = if (args.length > 4) args(4).toInt else 1
 
     // Initialize Spark
     val conf = new SparkConf().setMaster("local").setAppName("Historical VaR")
@@ -72,7 +73,12 @@ object HistoricalValueAtRiskExample {
     }
 
     // Generate an RDD of simulations
-//    val rand = new MersenneTwister()
+    val seeds = sc.parallelize(0 until numTrials, parallelism)
+    seeds.map { seed =>
+      val rand = new MersenneTwister(seed)
+      val factorPaths = simulatedFactorReturns(horizon, rand, iidFactorReturns, garchModels)
+    }
+
 //    val factorsDist = new FilteredHistoricalFactorDistribution(rand, iidFactorReturns.toArray,
 //      garchModels.asInstanceOf[Array[TimeSeriesFilter]])
 //    val returns = simulationReturns(0L, factorsDist, numTrials, parallelism, sc,
diff --git a/src/test/scala/com/cloudera/sparkts/YarnAllocatorSuite.scala
b/src/test/scala/com/cloudera/sparkts/YarnAllocatorSuite.scala new file mode 100644 index 00000000..fbb8859a --- /dev/null +++ b/src/test/scala/com/cloudera/sparkts/YarnAllocatorSuite.scala @@ -0,0 +1,86 @@ +package com.cloudera.sparkts + +import org.scalatest.FunSuite +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.util.Random +import org.apache.hadoop.yarn.api.records.{Priority, Resource} + +class YarnAllocatorSuite extends FunSuite { + test("benchmark") { + val map = new HashMap[String, Int] + val numNodes = 1000 + val numTasks = 10000 + val rand = new Random(100) + var i = 0 + while (i < numTasks) { + val taskLocations = (0 until 3).map(x => rand.nextInt(numNodes).toString) + taskLocations.foreach { taskLoc => + if (map.contains(taskLoc)) { + map(taskLoc) += 1 + } else { + map(taskLoc) = 1 + } + } + i += 1 + } + println("done generating tasks") + val amClient = AMRMClient.createAMRMClient[ContainerRequest]() + var existing = new ArrayBuffer[ContainerRequest](0) + println("done creating amrmclient") + existing = regenerateContainerRequests(amClient, existing, map) + val warmupRounds = 1000 + val warmupStart = System.currentTimeMillis + i = 0 + while (i < warmupRounds) { + existing = regenerateContainerRequests(amClient, existing, map) + i += 1 + } + val warmupTime = System.currentTimeMillis - warmupStart + println(s"warmup time: $warmupTime") + val actualRounds = 1000 + val actualStart = System.currentTimeMillis + i = 0 + while (i < actualRounds) { + existing = regenerateContainerRequests(amClient, existing, map) + i += 1 + } + val actualTime = System.currentTimeMillis - actualStart + println(s"actual time: $actualTime") + } + + val _resource = Resource.newInstance(1024, 1) + val _priority = Priority.newInstance(1) + + def regenerateContainerRequests( + amClient: AMRMClient[ContainerRequest], + existingRequests: Seq[ContainerRequest], + prefs: HashMap[String, Int]): ArrayBuffer[ContainerRequest] = { + var i = 0 + while (i < existingRequests.length) { + amClient.removeContainerRequest(existingRequests(i)) + i += 1 + } + val prefsClone = new HashMap[String, Int] + prefsClone ++= prefs + val newRequests = new ArrayBuffer[ContainerRequest] + while (!prefsClone.isEmpty) { + val locs = prefsClone.keySet.toArray + var j = 0 + while (j < locs.length) { + val num = prefsClone(locs(j)) + if (num == 1) { + prefsClone.remove(locs(j)) + } else { + prefsClone(locs(j)) = num - 1 + } + j += 1 + } + val request = new ContainerRequest(_resource, null, locs, _priority) + newRequests += request + amClient.addContainerRequest(request) + } + newRequests + } +} From e1487d0b20b2ed19fa72980ead0568400565996c Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 15 Jun 2015 17:57:22 -0700 Subject: [PATCH 08/42] Convert tab to spaces --- .../com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala index aea1a84f..698e4f43 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala @@ -85,5 +85,5 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers { bptest(new DenseVector(resids1), new DenseMatrix(x.length, 1, x))._2 should be > 
pthreshold // there should be evidence of heteroskedasticity bptest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x))._2 should be < pthreshold - } + } } From db6a71678d516e6c9fa6a509e1ee43ee15da2b7e Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Wed, 17 Jun 2015 14:56:20 -0700 Subject: [PATCH 09/42] changed SSE calculation to use while loop rather than map + sum --- src/main/scala/com/cloudera/sparkts/EWMA.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/EWMA.scala b/src/main/scala/com/cloudera/sparkts/EWMA.scala index 09e7dfad..ff47f59d 100644 --- a/src/main/scala/com/cloudera/sparkts/EWMA.scala +++ b/src/main/scala/com/cloudera/sparkts/EWMA.scala @@ -61,10 +61,17 @@ class EWMAModel(val smoothing: Double) extends TimeSeriesModel { val n = ts.length val smoothed = new DenseVector(Array.fill(n)(0.0)) addTimeDependentEffects(ts, smoothed) - val tsArr = ts.toArray - val smoothedArr = smoothed.toArray - val sqrErrors = smoothedArr.zip(tsArr.tail).map { case (yhat, y) => (yhat - y) * (yhat - y) } - sum(sqrErrors) + + var i = 0 + var error = 0.0 + var sqrErrors = 0.0 + while (i < n - 1) { + error = ts(i + 1) - smoothed(i) + sqrErrors += error * error + i += 1 + } + + sqrErrors } override def removeTimeDependentEffects(ts: Vector[Double], dest: Vector[Double] = null) From fd45327240ff2742cefaafae7e9725c875c6d0f4 Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Wed, 17 Jun 2015 16:19:48 -0700 Subject: [PATCH 10/42] changed EWMA fitting to use gradient descent --- .../scala/com/cloudera/sparkts/EWMA.scala | 67 +++++++++++++++---- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/EWMA.scala b/src/main/scala/com/cloudera/sparkts/EWMA.scala index ff47f59d..66843a44 100644 --- a/src/main/scala/com/cloudera/sparkts/EWMA.scala +++ b/src/main/scala/com/cloudera/sparkts/EWMA.scala @@ -17,10 +17,14 @@ package com.cloudera.sparkts import breeze.linalg._ -import org.apache.commons.math3.analysis.UnivariateFunction -import org.apache.commons.math3.optim.MaxEval +import org.apache.commons.math3.analysis.{MultivariateFunction, MultivariateVectorFunction} +import org.apache.commons.math3.optim.{MaxIter, InitialGuess, SimpleValueChecker, MaxEval} import org.apache.commons.math3.optim.nonlinear.scalar.GoalType -import org.apache.commons.math3.optim.univariate.{BrentOptimizer, UnivariateObjectiveFunction, SearchInterval} + + +import org.apache.commons.math3.optim.nonlinear.scalar.{ObjectiveFunction, +ObjectiveFunctionGradient} +import org.apache.commons.math3.optim.nonlinear.scalar.gradient.NonLinearConjugateGradientOptimizer object EWMA { /** @@ -33,18 +37,29 @@ object EWMA { * @return EWMAmodel */ def fitModel(ts: Vector[Double]): EWMAModel = { - val objectiveFunction = new UnivariateObjectiveFunction(new UnivariateFunction() { - def value(param: Double): Double = { - new EWMAModel(param).sse(ts) + val optimizer = new NonLinearConjugateGradientOptimizer( + NonLinearConjugateGradientOptimizer.Formula.FLETCHER_REEVES, + new SimpleValueChecker(1e-6, 1e-6)) + val gradient = new ObjectiveFunctionGradient(new MultivariateVectorFunction() { + def value(params: Array[Double]): Array[Double] = { + val g = new EWMAModel(params(0)).gradient(ts) + Array(g) } }) - + val objectiveFunction = new ObjectiveFunction(new MultivariateFunction() { + def value(params: Array[Double]): Double = { + new EWMAModel(params(0)).sse(ts) + } + }) + // optimization parameters + val initGuess = new 
InitialGuess(Array(.94)) + val maxIter = new MaxIter(10000) val maxEval = new MaxEval(10000) - val paramBounds = new SearchInterval(0.0, 1.0) - val optimizer = new BrentOptimizer(1e-6, 1e-6) - val optimal = optimizer.optimize(maxEval, objectiveFunction, GoalType.MINIMIZE, paramBounds) - val param = optimal.getPoint - new EWMAModel(param) + val goal = GoalType.MINIMIZE + // optimization step + val optimal = optimizer.optimize(objectiveFunction, goal, gradient, initGuess, maxIter, maxEval) + val params = optimal.getPoint + new EWMAModel(params(0)) } } @@ -74,6 +89,34 @@ class EWMAModel(val smoothing: Double) extends TimeSeriesModel { sqrErrors } + /** + * Calculates the gradient of the SSE cost function for our EWMA model + * @param ts + * @return gradient + */ + private[sparkts] def gradient(ts: Vector[Double]): Double = { + val n = ts.length + val smoothed = new DenseVector(Array.fill(n)(0.0)) + addTimeDependentEffects(ts, smoothed) + + var error = 0.0 + var prevSmoothed = ts(0) + var prevDSda = 0.0 // derivative of the EWMA function at time t - 1: (d S(t - 1)/ d smoothing) + var dSda = 0.0 // derivative of the EWMA function at time t: (d S(t) / d smoothing) + var dJda = 0.0 // derivative of our SSE cost function + var i = 0 + + while (i < n - 1) { + error = ts(i + 1) - smoothed(i) + dSda = ts(i) - prevSmoothed + (1 - smoothing) * prevDSda + dJda += error * dSda + prevDSda = dSda + prevSmoothed = smoothed(i) + i += 1 + } + 2 * dJda + } + override def removeTimeDependentEffects(ts: Vector[Double], dest: Vector[Double] = null) : Vector[Double] = { dest(0) = ts(0) // by definition in our model S_0 = X_0 From 7adc5d0b2a60d5f00528e8650261b2d851dcaa5e Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Wed, 17 Jun 2015 16:28:34 -0700 Subject: [PATCH 11/42] style changes --- src/main/scala/com/cloudera/sparkts/EWMA.scala | 15 +++++++++------ .../scala/com/cloudera/sparkts/EWMASuite.scala | 1 - 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/EWMA.scala b/src/main/scala/com/cloudera/sparkts/EWMA.scala index 66843a44..8aaf6dcb 100644 --- a/src/main/scala/com/cloudera/sparkts/EWMA.scala +++ b/src/main/scala/com/cloudera/sparkts/EWMA.scala @@ -18,14 +18,17 @@ package com.cloudera.sparkts import breeze.linalg._ import org.apache.commons.math3.analysis.{MultivariateFunction, MultivariateVectorFunction} -import org.apache.commons.math3.optim.{MaxIter, InitialGuess, SimpleValueChecker, MaxEval} -import org.apache.commons.math3.optim.nonlinear.scalar.GoalType - - -import org.apache.commons.math3.optim.nonlinear.scalar.{ObjectiveFunction, +import org.apache.commons.math3.optim.{InitialGuess, MaxEval, MaxIter, SimpleValueChecker} +import org.apache.commons.math3.optim.nonlinear.scalar.{GoalType, ObjectiveFunction, ObjectiveFunctionGradient} import org.apache.commons.math3.optim.nonlinear.scalar.gradient.NonLinearConjugateGradientOptimizer +/** + * Fits an Exponentially Weight Moving Average model (EWMA) (aka. Simple Exponential Smoothing) to + * a time series. The model is defined as S_t = (1 - a) * X_t + a * S_{t - 1}, where a is the + * smoothing parameter, X is the original series, and S is the smoothed series. For more + * information, please see https://en.wikipedia.org/wiki/Exponential_smoothing. + */ object EWMA { /** * Fits an EWMA model to a time series. 
Uses the first point in the time series as a starting
   * value. Uses sum squared error as an objective function to optimize to find smoothing parameter
   * The model for EWMA is recursively defined as S_t = (1 - a) * X_t + a * S_{t-1}, where
   * a is the smoothing parameter, X is the original series, and S is the smoothed series
   * https://en.wikipedia.org/wiki/Exponential_smoothing
   * @param ts the time series to which we want to fit a EWMA model
-  * @return EWMAmodel
+  * @return EWMA model
   */
  def fitModel(ts: Vector[Double]): EWMAModel = {
    val optimizer = new NonLinearConjugateGradientOptimizer(
diff --git a/src/test/scala/com/cloudera/sparkts/EWMASuite.scala b/src/test/scala/com/cloudera/sparkts/EWMASuite.scala
index f0a39379..2e1106b2 100644
--- a/src/test/scala/com/cloudera/sparkts/EWMASuite.scala
+++ b/src/test/scala/com/cloudera/sparkts/EWMASuite.scala
@@ -19,7 +19,6 @@
 import breeze.linalg._
 
 import org.scalatest.{FunSuite, ShouldMatchers}
 
-
 class EWMASuite extends FunSuite with ShouldMatchers {
   test("adding time dependent effects") {
     val orig = new DenseVector((1 to 10).toArray.map(_.toDouble))

From 88bd843f81226bdfac0fe468a85ecf947e664ae6 Mon Sep 17 00:00:00 2001
From: "jose.cambronero"
Date: Wed, 17 Jun 2015 16:43:14 -0700
Subject: [PATCH 12/42] added note stating unbounded optimization

---
 src/main/scala/com/cloudera/sparkts/EWMA.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/cloudera/sparkts/EWMA.scala b/src/main/scala/com/cloudera/sparkts/EWMA.scala
index 8aaf6dcb..1cd64217 100644
--- a/src/main/scala/com/cloudera/sparkts/EWMA.scala
+++ b/src/main/scala/com/cloudera/sparkts/EWMA.scala
@@ -20,7 +20,7 @@ import breeze.linalg._
 import org.apache.commons.math3.analysis.{MultivariateFunction, MultivariateVectorFunction}
 import org.apache.commons.math3.optim.{InitialGuess, MaxEval, MaxIter, SimpleValueChecker}
 import org.apache.commons.math3.optim.nonlinear.scalar.{GoalType, ObjectiveFunction,
-ObjectiveFunctionGradient}
+  ObjectiveFunctionGradient}
 import org.apache.commons.math3.optim.nonlinear.scalar.gradient.NonLinearConjugateGradientOptimizer
 
 /**
@@ -35,8 +35,11 @@ object EWMA {
    * value. Uses sum squared error as an objective function to optimize to find smoothing parameter
    * The model for EWMA is recursively defined as S_t = (1 - a) * X_t + a * S_{t-1}, where
    * a is the smoothing parameter, X is the original series, and S is the smoothed series
+   * Note that the optimization is performed as unbounded optimization, although in its formal
+   * definition the smoothing parameter is <= 1, which corresponds to an inequality bounded
+   * optimization. Given this, the resulting smoothing parameter should always be sanity checked
    * https://en.wikipedia.org/wiki/Exponential_smoothing
-   * @param ts the time series to which we want to fit a EWMA model
+   * @param ts the time series to which we want to fit an EWMA model
    * @return EWMA model
    */
  def fitModel(ts: Vector[Double]): EWMAModel = {
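The EWMA pieces assembled by patches 04 through 12 compose into a small fit-and-smooth workflow. A minimal usage sketch (an editorial illustration: the `EWMASketch` wrapper and the println are invented here; `EWMA.fitModel`, `addTimeDependentEffects`, and the oil series come from the patches and tests above):

    import breeze.linalg.DenseVector
    import com.cloudera.sparkts.EWMA

    object EWMASketch extends App {
      // oil series from the "fitting EWMA model" test (https://www.otexts.org/fpp/7/1)
      val oil = new DenseVector(Array(446.7, 454.5, 455.7, 423.6, 456.3, 440.6, 425.3, 485.1,
        506.0, 526.8, 514.3, 494.2))
      val model = EWMA.fitModel(oil)   // smoothing parameter found by conjugate gradient
      // the optimization is unbounded, so sanity-check the result (the test expects ~0.89)
      println(s"smoothing = ${model.smoothing}")
      val smoothed = new DenseVector(Array.fill(oil.length)(0.0))
      // note: the implementation computes S_t = a * X_t + (1 - a) * S_{t-1} with a = smoothing,
      // the complement of the roles stated in the class scaladoc
      model.addTimeDependentEffects(oil, smoothed)
    }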
From e942aae37d87ff20b8a4aad0d2b301635759e027 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Thu, 18 Jun 2015 18:02:46 -0700
Subject: [PATCH 13/42] Document more functionality in README

---
 README.md | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 1f4f3ba0..8ea0397e 100644
--- a/README.md
+++ b/README.md
@@ -52,16 +52,24 @@ TimeSeriesRDDs then support efficient series-wise operations like slicing, imput
     val residuals = filled.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))
 
-Statistical Functionality
+Functionality
 --------------------------
 
-### Time Series
+### Time Series Manipulation
+* Aligning
+* Slicing by date-time
+* Missing value imputation
 
+### Time Series Math and Stats
+
+* Exponentially weighted moving average
 * Autoregressive models
 * GARCH models
 * Missing data imputation
 * Augmented Dickey-Fuller test
 * Durbin-Watson test
+* Breusch-Godfrey test
+* Breusch-Pagan test
 
 ### General Prob / Stats

From d151a6fea73765e33407cf28b2afa8b5cc910197 Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Mon, 22 Jun 2015 20:12:56 -0400
Subject: [PATCH 14/42] Copyright bump

---
 pom.xml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index f7b3bfc2..db916778 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,6 +1,6 @@
-
+
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.cloudera.datascience</groupId>
 <artifactId>sparktimeseries</artifactId>

From 3bd0c28bb03ee7513e0135aae8422c86f7186d7b Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Mon, 22 Jun 2015 22:21:29 -0400
Subject: [PATCH 15/42] Quick find & replace: toSamples -> toInstants

---
 .../com/cloudera/finance/examples/SimpleTickDataExample.scala | 2 +-
 src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala       | 2 +-
 src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala  | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala b/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala
index 740c5241..90d39ed6 100644
--- a/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala
+++ b/src/main/scala/com/cloudera/finance/examples/SimpleTickDataExample.scala
@@ -71,6 +71,6 @@ object SimpleTickDataExample {
     val iidRdd = slicedRdd.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))
 
     // Regress a stock against all the others
-    val samples = iidRdd.toSamples()
+    val samples = iidRdd.toInstants()
   }
 }
diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala
index c7b46dd9..38f8200f 100644
--- a/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala
+++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala
@@ -158,7 +158,7 @@ class TimeSeriesRDD(val index: DateTimeIndex, parent: RDD[(String, Vector[Double
   * In the returned RDD, the ordering of values within each record corresponds to the ordering of
   * the time series records in the original RDD. The records are ordered by time.
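   * For example (hypothetical keys, an illustration added in editing): series "a" -> [1.0, 2.0]
   * and "b" -> [3.0, 4.0] over a two-instant index yield the records (t0, [1.0, 3.0]) and
   * (t1, [2.0, 4.0]).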
*/ - def toSamples(nPartitions: Int = -1): RDD[(DateTime, Vector[Double])] = { + def toInstants(nPartitions: Int = -1): RDD[(DateTime, Vector[Double])] = { val maxChunkSize = 20 val dividedOnMapSide = mapPartitionsWithIndex { case (partitionId, iter) => diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala index f008edcb..7b78d922 100644 --- a/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesRDDSuite.scala @@ -60,7 +60,7 @@ class TimeSeriesRDDSuite extends FunSuite with LocalSparkContext with ShouldMatc rdd.filterEndingAfter(start).count() should be (3) } - test("toSamples") { + test("toInstants") { val conf = new SparkConf().setMaster("local").setAppName(getClass.getName) TimeSeriesKryoRegistrator.registerKryoClasses(conf) sc = new SparkContext(conf) @@ -71,7 +71,7 @@ class TimeSeriesRDDSuite extends FunSuite with LocalSparkContext with ShouldMatc val index = uniform(start, 4, 1.days) val rdd = sc.parallelize(labels.zip(seriesVecs.map(_.asInstanceOf[Vector[Double]])), 3) val tsRdd = new TimeSeriesRDD(index, rdd) - val samples = tsRdd.toSamples().collect() + val samples = tsRdd.toInstants().collect() samples should be (Array( (start, new DenseVector((0.0 until 20.0 by 4.0).toArray)), (start + 1.days, new DenseVector((1.0 until 20.0 by 4.0).toArray)), From 5a4aa29140b93d218592c4aa4a95f44da002167a Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Wed, 24 Jun 2015 19:59:49 -0700 Subject: [PATCH 16/42] added up/down sampling and cubic spline interpolation --- .../sparkts/UnivariateTimeSeries.scala | 83 ++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala index 9466f668..551d7378 100644 --- a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala +++ b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala @@ -18,6 +18,8 @@ package com.cloudera.sparkts import breeze.linalg._ import breeze.stats._ +import org.apache.commons.math3.analysis.interpolation.SplineInterpolator + object UnivariateTimeSeries { def autocorr(ts: Array[Double], numLags: Int): Array[Double] = { autocorr(new DenseVector(ts), numLags).toDenseVector.data @@ -125,6 +127,7 @@ object UnivariateTimeSeries { case "nearest" => fillNearest(ts) case "next" => fillNext(ts) case "previous" => fillPrevious(ts) + case "spline" => fillSpline(ts) case _ => throw new UnsupportedOperationException() } } @@ -245,5 +248,83 @@ object UnivariateTimeSeries { result } + def fillSpline(values: Array[Double]): Array[Double] = { + fillSpline(new DenseVector(values)).data + } + + /** + * Fill in NA values using a natural cubic spline. 
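+   * Only interior NaNs (those between the first and last non-NaN knots) are filled in; leading
+   * and trailing NaNs are left untouched, since the spline is not used to extrapolate.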
+ * @param values Vector to interpolate
+ * @return Interpolated vector
+ */
+ def fillSpline(values: Vector[Double]): DenseVector[Double] = {
+ val result = new DenseVector(values.toArray)
+ val interp = new SplineInterpolator()
+ val knotsAndValues = values.toArray.zipWithIndex.filter(!_._1.isNaN)
+ // Note that the type of unzip is messed up in Scala 2.10.4 as per
+ // https://issues.scala-lang.org/browse/SI-8081
+ // given that this project is using Scala 2.10.4, we cannot use unzip, so unpack manually
+ val knotsX = knotsAndValues.map(_._2.toDouble)
+ val knotsY = knotsAndValues.map(_._1)
+ val filler = interp.interpolate(knotsX, knotsY)
+
+ // values that we can interpolate between, others need to be filled w/ other function
+ var i = knotsX(0).toInt
+ val end = knotsX.last.toInt
+
+ while (i < end) {
+ result(i) = filler.value(i.toDouble)
+ i += 1
+ }
+ result
+ }
+
 def ar(values: Vector[Double], maxLag: Int): ARModel = Autoregression.fitModel(values, maxLag)
-}
\ No newline at end of file
+
+ /**
+ * Down sample by taking every nth element starting from offset phase
+ * @param values
+ * @param n
+ * @param phase
+ * @return
+ */
+ def downSample(values: Vector[Double], n: Int, phase: Int = 0) = {
+ val origLen = values.length
+ val newLen = Math.ceil((values.length - phase) / n.toDouble).toInt
+ val sampledValues = new DenseVector(Array.fill(newLen)(0.0))
+ var i = phase
+ var j = 0
+
+ while (j < newLen) {
+ sampledValues(j) = values(i)
+ i += n
+ j += 1
+ }
+ sampledValues
+ }
+
+ /**
+ * Up sample by inserting n - 1 elements into the original values vector, starting at index phase
+ * @param values the original data vector
+ * @param n the number of insertions between elements
+ * @param phase the offset to begin
+ * @param useZero
+ * @return
+ */
+ def upSample(values: Vector[Double], n: Int, phase: Int = 0, useZero: Boolean = false) = {
+ val filler = if (useZero) 0 else Double.NaN
+ val origLen = values.length
+ val newLen = origLen * n
+ val sampledValues = new DenseVector(Array.fill(newLen)(filler))
+ var i = phase
+ var j = 0
+
+ while (j < origLen) {
+ sampledValues(i) = values(j)
+ i += n
+ j += 1
+ }
+ sampledValues
+ }
+}
+
From 4b98b632e8b7606ec221b2802d5f05e9a7ef0df1 Mon Sep 17 00:00:00 2001
From: "jose.cambronero"
Date: Wed, 24 Jun 2015 20:27:13 -0700
Subject: [PATCH 17/42] added up/down sampling tests
---
 .../sparkts/UnivariateTimeSeriesSuite.scala | 51 +++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala b/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala
index e486d9f4..46423b2b 100644
--- a/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala
+++ b/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala
@@ -46,4 +46,55 @@ class UnivariateTimeSeriesSuite extends FunSuite with ShouldMatchers {
 arAutocorr(1) should be > 0.0
 arAutocorr(2) should be > 0.0
 }
+
+ test("upsampling") {
+ // replicating upsampling examples
+ // from http://www.mathworks.com/help/signal/ref/upsample.html?searchHighlight=upsample
+ val y = new DenseVector(Array(1.0, 2.0, 3.0, 4.0))
+ val yUp1 = upSample(y, 3, useZero = true).toArray
+ yUp1 should be (Array(1.0, 0.0, 0.0, 2.0, 0.0, 0.0, 3.0, 0.0, 0.0, 4.0, 0.0, 0.0))
+
+ val yUp2 = upSample(y, 3, useZero = true, phase = 2).toArray
+ yUp2 should be (Array(0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 0.0, 3.0, 0.0, 0.0, 4.0))
+ }
+
+ test("downsampling") {
+ // replicating downsampling examples
+ // from
http://www.mathworks.com/help/signal/ref/downsample.html?searchHighlight=downsample
+ val y = new DenseVector((1 to 10).toArray.map(_.toDouble))
+ val yDown1 = downSample(y, 3).toArray
+ yDown1 should be (Array(1.0, 4.0, 7.0, 10.0))
+
+ val yDown2 = downSample(y, 3, phase = 2).toArray
+ yDown2 should be (Array(3.0, 6.0, 9.0))
+
+ }
+
+ test("signal reconstruction with spline") {
+ // If we take a densely sampled signal, downsample it (at a rate that doesn't cause
+ // aliasing), then upsample and apply an interpolating filter, the result should be fairly
+ // close to the original signal. In our case, we drop NaNs that are not filled by
+ // interpolation (i.e. no extrapolation)
+
+ val y = (1 to 1000).toArray.map(_.toDouble / 100.0).map(Math.sin(_))
+ val vy = new DenseVector(y)
+ val lessFreq = downSample(vy, 100)
+ val moreFreq = upSample(lessFreq, 100)
+
+ // work on copies
+ val splineY = fillSpline(new DenseVector(moreFreq.toArray)).toArray
+ val lineY = fillLinear(new DenseVector(moreFreq.toArray)).toArray
+
+ val MSE = (est: Array[Double], obs: Array[Double]) => {
+ val errs = est.zip(obs).filter(!_._1.isNaN).map { case (yhat, y) => (yhat - y) * (yhat - y) }
+ errs.sum / errs.length
+ }
+
+ val sE = MSE(splineY, y)
+ val lE = MSE(lineY, y)
+
+ // a cubic spline should be better than linear interpolation
+ sE should be < lE
+ }
 }
+
From 48d925139a9c0355a7eb1dba9f4944a5f5585cf7 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Fri, 26 Jun 2015 00:33:14 -0700
Subject: [PATCH 18/42] Harmonize MLlib dependency version
---
 pom.xml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index db916778..ec9366fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -206,8 +206,7 @@
 org.apache.spark
 spark-mllib_${scala.minor.version}
-
- 1.2.0
+ ${spark.version}
 provided

From dd80326e7ed7423add76c4ceeb637f38559a3d98 Mon Sep 17 00:00:00 2001
From: "jose.cambronero"
Date: Fri, 26 Jun 2015 17:11:05 -0700
Subject: [PATCH 19/42] started on acf plot
---
 .../scala/com/cloudera/sparkts/EasyPlot.scala | 34 +++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
index bbafca3d..6083d6fd 100644
--- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
+++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
@@ -18,6 +18,9 @@ package com.cloudera.sparkts
 import breeze.linalg._
 import breeze.plot._

+import org.apache.commons.math3.distribution.NormalDistribution
+
+
 object EasyPlot {
 def ezplot(vec: Vector[Double], style: Char): Unit = {
 val f = Figure()
@@ -45,4 +48,35 @@ object EasyPlot {
 }

 def ezplot(vecs: Seq[Vector[Double]]): Unit = ezplot(vecs, '-')
+
+ def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = {
+ val autoCorrs = UnivariateTimeSeries.autocorr(data, maxLag)
+ val nCorrs = autoCorrs.length
+ val nSample = data.length
+ val stdNormDist = new NormalDistribution(0, 1)
+
+ val pVal = (1 - conf) / 2.0
+ val confVal = stdNormDist.inverseCumulativeProbability(1 - pVal) / Math.sqrt(nSample)
+ val f = Figure()
+ val p = f.subplot(0)
+
+ // Basic information
+ p.title = "Auto correlation function"
+ p.xlabel = "Lag"
+ p.ylabel = "Correlation"
+
+ // plot correlations as vertical lines
+ val verticalLines = autoCorrs.zipWithIndex.map { case (corr, ix) =>
+ (Array(ix.toDouble, ix.toDouble), Array(0, corr))
+ }
+
+ verticalLines.foreach { case (xs, ys) => p += plot(xs, ys) }
+
+ // plot confidence intervals
+ Array(confVal, -1 * confVal).foreach { conf =>
val xs = (1 to nCorrs).toArray.map(_.toDouble) + val ys = Array.fill(nCorrs)(conf) + p += plot(xs, ys, '-', colorcode = "red") + } + } } From 1c595b0fe6a26877d4727b1eb358cb459720e25f Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 26 Jun 2015 20:52:17 -0700 Subject: [PATCH 20/42] Mention R's equivalents in README --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 8ea0397e..7eae5ed0 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,12 @@ A Scala / Java library for interacting with time series data on Apache Spark. Scaladoc is available at http://cloudera.github.io/spark-timeseries. The aim here is to provide -* A set of abstractions for transforming and summarizing large time series data sets, similar to +* A set of abstractions for manipulating large time series data sets, similar to what's provided for smaller data sets in - [Pandas](http://pandas.pydata.org/pandas-docs/dev/timeseries.html) and - [Matlab](http://www.mathworks.com/help/matlab/time-series.html). + [Pandas](http://pandas.pydata.org/pandas-docs/dev/timeseries.html), + [Matlab](http://www.mathworks.com/help/matlab/time-series.html), and R's + [zoo](http://cran.r-project.org/web/packages/zoo/index.html) and + [xts](http://cran.r-project.org/web/packages/xts/index.html) packages. * Models, tests, and functions that enable dealing with time series from a statistical perspective, similar to what's provided in [StatsModels](http://statsmodels.sourceforge.net/devel/tsa.html) and a variety of Matlab and R packages. From 0655b07a0d9e6e79444c5e99130e4227cf48d19d Mon Sep 17 00:00:00 2001 From: Jose Cambronero Date: Sun, 28 Jun 2015 15:07:59 -0700 Subject: [PATCH 21/42] acf plot done --- src/main/scala/com/cloudera/sparkts/EasyPlot.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala index 6083d6fd..dccbe320 100644 --- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala +++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala @@ -65,17 +65,19 @@ object EasyPlot { p.xlabel = "Lag" p.ylabel = "Correlation" + // make decimal ticks visible + p.setYAxisDecimalTickUnits() + // plot correlations as vertical lines val verticalLines = autoCorrs.zipWithIndex.map { case (corr, ix) => - (Array(ix.toDouble, ix.toDouble), Array(0, corr)) + (Array(ix.toDouble + 1, ix.toDouble + 1), Array(0, corr)) } - verticalLines.foreach { case (xs, ys) => p += plot(xs, ys) } - // plot confidence intervals + // plot confidence intervals as horizontal lines Array(confVal, -1 * confVal).foreach { conf => - val xs = (1 to nCorrs).toArray.map(_.toDouble) - val ys = Array.fill(nCorrs)(conf) + val xs = (0 to nCorrs).toArray.map(_.toDouble) + val ys = Array.fill(nCorrs + 1)(conf) p += plot(xs, ys, '-', colorcode = "red") } } From 432b10d8702c17a6d5a86e5c50ffadc0ac5ad555 Mon Sep 17 00:00:00 2001 From: Jose Cambronero Date: Sun, 28 Jun 2015 15:24:52 -0700 Subject: [PATCH 22/42] added pacf, refactored acf --- .../scala/com/cloudera/sparkts/EasyPlot.scala | 53 +++++++++++++------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala index dccbe320..94269d41 100644 --- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala +++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala @@ -33,7 +33,7 @@ object EasyPlot { def ezplot(arr: Array[Double], style: Char): Unit = 
{ val f = Figure() val p = f.subplot(0) - p += plot((0 until arr.length).map(_.toDouble).toArray, arr, style = style) + p += plot(arr.indices.map(_.toDouble).toArray, arr, style = style) } def ezplot(arr: Array[Double]): Unit = ezplot(arr, '-') @@ -50,35 +50,54 @@ object EasyPlot { def ezplot(vecs: Seq[Vector[Double]]): Unit = ezplot(vecs, '-') def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = { + // calculate correlations and confidence bound val autoCorrs = UnivariateTimeSeries.autocorr(data, maxLag) - val nCorrs = autoCorrs.length - val nSample = data.length - val stdNormDist = new NormalDistribution(0, 1) + val confVal = calcConfVal(conf, data.length) - val pVal = (1 - conf) / 2.0 - val confVal = stdNormDist.inverseCumulativeProbability(1 - pVal) / Math.sqrt(nSample) + // Basic plot information val f = Figure() val p = f.subplot(0) + p.title = "Autocorrelation function" + p.xlabel = "Lag" + p.ylabel = "Autocorrelation" + drawCorrPlot(autoCorrs, confVal, p) + } - // Basic information - p.title = "Auto correlation function" + def pacfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = { + // create AR(maxLag) model, retrieve coefficients and calculate confidence bound + val model = Autoregression.fitModel(new DenseVector(data), maxLag) + val pCorrs = model.coefficients // partial autocorrelations are the coefficients in AR(n) model + val confVal = calcConfVal(conf, data.length) + + // Basic plot information + val f = Figure() + val p = f.subplot(0) + p.title = "Partial autocorrelation function" p.xlabel = "Lag" - p.ylabel = "Correlation" + p.ylabel = "Partial Autocorrelation" + drawCorrPlot(pCorrs, confVal, p) + } + private[sparkts] def calcConfVal(conf:Double, n: Int) = { + val stdNormDist = new NormalDistribution(0, 1) + val pVal = (1 - conf) / 2.0 + stdNormDist.inverseCumulativeProbability(1 - pVal) / Math.sqrt(n) + } + + private[sparkts] def drawCorrPlot(corrs: Array[Double], confVal: Double, p: Plot) = { // make decimal ticks visible p.setYAxisDecimalTickUnits() - // plot correlations as vertical lines - val verticalLines = autoCorrs.zipWithIndex.map { case (corr, ix) => + val verticalLines = corrs.zipWithIndex.map { case (corr, ix) => (Array(ix.toDouble + 1, ix.toDouble + 1), Array(0, corr)) - } + } verticalLines.foreach { case (xs, ys) => p += plot(xs, ys) } - // plot confidence intervals as horizontal lines + val n = corrs.length Array(confVal, -1 * confVal).foreach { conf => - val xs = (0 to nCorrs).toArray.map(_.toDouble) - val ys = Array.fill(nCorrs + 1)(conf) - p += plot(xs, ys, '-', colorcode = "red") - } + val xs = (0 to n).toArray.map(_.toDouble) + val ys = Array.fill(n + 1)(conf) + p += plot(xs, ys, '-', colorcode = "red") + } } } From 3c316ee07274e250e45ccb48fc3750f13f6ce356 Mon Sep 17 00:00:00 2001 From: Jose Cambronero Date: Sun, 28 Jun 2015 15:35:52 -0700 Subject: [PATCH 23/42] added scaladocs to acf/pacf --- .DS_Store | Bin 0 -> 6148 bytes Jose_notes_on_possible_contribs.txt | 34 ++++++++++++++++++ src/.DS_Store | Bin 0 -> 6148 bytes src/main/.DS_Store | Bin 0 -> 6148 bytes src/main/scala/.DS_Store | Bin 0 -> 6148 bytes src/main/scala/com/.DS_Store | Bin 0 -> 6148 bytes src/main/scala/com/cloudera/.DS_Store | Bin 0 -> 6148 bytes src/main/scala/com/cloudera/finance/.DS_Store | Bin 0 -> 6148 bytes .../scala/com/cloudera/sparkts/EasyPlot.scala | 12 +++++++ src/test/.DS_Store | Bin 0 -> 6148 bytes src/test/scala/.DS_Store | Bin 0 -> 6148 bytes src/test/scala/com/.DS_Store | Bin 0 -> 6148 bytes src/test/scala/com/cloudera/.DS_Store 
| Bin 0 -> 6148 bytes
 13 files changed, 46 insertions(+)
 create mode 100644 .DS_Store
 create mode 100644 Jose_notes_on_possible_contribs.txt
 create mode 100644 src/.DS_Store
 create mode 100644 src/main/.DS_Store
 create mode 100644 src/main/scala/.DS_Store
 create mode 100644 src/main/scala/com/.DS_Store
 create mode 100644 src/main/scala/com/cloudera/.DS_Store
 create mode 100644 src/main/scala/com/cloudera/finance/.DS_Store
 create mode 100644 src/test/.DS_Store
 create mode 100644 src/test/scala/.DS_Store
 create mode 100644 src/test/scala/com/.DS_Store
 create mode 100644 src/test/scala/com/cloudera/.DS_Store

diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..1a9e4c01fdac5a6ce83e086f1cc11070556b4a4e
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/Jose_notes_on_possible_contribs.txt b/Jose_notes_on_possible_contribs.txt
new file mode 100644
index 00000000..a67aabf8
--- /dev/null
+++ b/Jose_notes_on_possible_contribs.txt
@@ -0,0 +1,34 @@
+Autoregression.scala
+1 - factor out code for removeTimeDependentEffects and addTimeDependentEffects
+ - time comparison
+ - can we replace while with fold?
+
+Garch.scala
+2 - Robust standard error calculations
+3 - smart initialization of GARCH models
+4 - EGARCH implementation....(ambitious)
+
+TimeSeries.scala
+1 - seems to me that in union we don't need to have c to c, can simplify? check implications
+2 - timeseries filter ?
+3 - apply with row as date, y as only those labels?
+4 - should differences(0) return a vector of 0.0 of the same length (this seems because of breeze.linalg._)?
and shouldn't it be index.size, for the slice, not index.size - 1
+
+
+UnivariateTimeSeries.scala
+added fillNext (as fill forward)
+added fillPrev (as fill backward)
+added fillWithDefault
+
+TimeSeriesStatisticalTests.scala
+added breusch godfrey test
+-->need to add RSE (using huber white sandwich)
+
+add tests
+
+issues:
+
+
+val src = Source.fromFile("/Users/josecambronero/r_data_ar.csv")
+val data = src.getLines().map(_.split(",")).drop(1).map(x => x.map(_.toDouble)).toArray
+val Array(x, y, y2) = data.transpose //x, y(stationary), y2(AR(1))

diff --git a/src/.DS_Store b/src/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..826581b9848302a54b0a030682b1525186c82db1
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/src/main/.DS_Store b/src/main/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..c048deca1778a2f54964b38e39f8b88a2d411d9c
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/src/main/scala/.DS_Store b/src/main/scala/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..26c2b2d58090713ff1876b3ed59722a6e83f111b
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/src/main/scala/com/.DS_Store b/src/main/scala/com/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..db61868fe2726716535841ae21fb0f25f9b7d244
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/src/main/scala/com/cloudera/finance/.DS_Store b/src/main/scala/com/cloudera/finance/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..f385f51b9d182727ed3d63ed03dcec53aa72fb66
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
index 94269d41..47aad13c 100644
--- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
+++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
@@ -49,6 +49,12 @@ object EasyPlot {

 def ezplot(vecs: Seq[Vector[Double]]): Unit = ezplot(vecs, '-')

+ /**
+ * Autocorrelation function plot
+ * @param data array of data to analyze
+ * @param maxLag maximum lag for autocorrelation
+ * @param conf confidence bounds to display
+ */
 def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = {
 // calculate correlations and confidence bound
 val autoCorrs = UnivariateTimeSeries.autocorr(data, maxLag)
 val confVal = calcConfVal(conf, data.length)
@@ -63,6 +69,12 @@ object EasyPlot {
 drawCorrPlot(autoCorrs, confVal, p)
 }

+ /**
+ * Partial autocorrelation function plot
+ * @param data array of data to analyze
+ * @param maxLag maximum lag for partial autocorrelation function
+ * @param conf confidence bounds to display
+ */
 def pacfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = {
 // create AR(maxLag) model, retrieve coefficients and calculate confidence bound
 val model = Autoregression.fitModel(new DenseVector(data), maxLag)

diff --git a/src/test/.DS_Store b/src/test/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..c048deca1778a2f54964b38e39f8b88a2d411d9c
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/src/test/scala/.DS_Store b/src/test/scala/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..26c2b2d58090713ff1876b3ed59722a6e83f111b
GIT binary patch
literal 6148 [binary data omitted]
literal 0 HcmV?d00001

diff --git a/src/test/scala/com/.DS_Store b/src/test/scala/com/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..db61868fe2726716535841ae21fb0f25f9b7d244
GIT binary patch
literal 6148 [binary data omitted; remaining .DS_Store binary patch likewise omitted]

From: Jose Cambronero
Date: Sun, 28 Jun 2015 15:35:52 -0700
Subject: [PATCH 24/42] added scaladocs to acf/pacf
---
 src/main/scala/com/cloudera/sparkts/EasyPlot.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
index 94269d41..47aad13c 100644
--- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
+++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
@@ -49,6 +49,12 @@ object EasyPlot {

 def ezplot(vecs: Seq[Vector[Double]]): Unit = ezplot(vecs, '-')

+ /**
+ * Autocorrelation function plot
+ * @param data array of data to analyze
+ * @param maxLag maximum lag for autocorrelation
+ * @param conf confidence bounds to display
+ */
 def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = {
 // calculate correlations and confidence bound
 val autoCorrs = UnivariateTimeSeries.autocorr(data, maxLag)
 val confVal = calcConfVal(conf, data.length)
@@ -63,6 +69,12 @@ object EasyPlot {
 drawCorrPlot(autoCorrs, confVal, p)
 }

+ /**
+ * Partial autocorrelation function plot
+ * @param data array of data to analyze
+ * @param maxLag maximum lag for partial autocorrelation function
+ * @param conf confidence bounds to display
+ */
 def pacfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = {
 // create AR(maxLag) model, retrieve coefficients and calculate confidence bound
 val model = Autoregression.fitModel(new DenseVector(data), maxLag)

From 315ff273ba981396b60a5e4ec4a9faa91ecee1c5 Mon Sep 17 00:00:00 2001
From: "jose.cambronero"
Date: Mon, 29 Jun 2015 18:10:14 -0700
Subject: [PATCH 25/42] changed up/downSample function names to up/downsample
 (along with relevant tests), added missing type to EasyPlot functions
---
 src/main/scala/com/cloudera/sparkts/EasyPlot.scala | 5 ++---
 .../com/cloudera/sparkts/UnivariateTimeSeries.scala | 4 ++--
 .../cloudera/sparkts/UnivariateTimeSeriesSuite.scala | 12 ++++++------
 3 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
index 47aad13c..d95ed63a 100644
--- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
+++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
@@ -20,7 +20,6 @@ import breeze.plot._

 import org.apache.commons.math3.distribution.NormalDistribution

-
 object EasyPlot {
 def ezplot(vec: Vector[Double], style: Char): Unit = {
 val f = Figure()
@@ -90,13 +89,13 @@ object EasyPlot {
 drawCorrPlot(pCorrs, confVal, p)
 }

- private[sparkts] def calcConfVal(conf:Double, n: Int) = {
+ private[sparkts] def calcConfVal(conf:Double, n: Int): Double = {
 val stdNormDist = new NormalDistribution(0, 1)
 val pVal = (1 - conf) / 2.0
 stdNormDist.inverseCumulativeProbability(1 - pVal) / Math.sqrt(n)
 }

- private[sparkts] def drawCorrPlot(corrs: Array[Double], confVal: Double, p: Plot) = {
+ private[sparkts] def drawCorrPlot(corrs: Array[Double], confVal: Double, p: Plot): Unit = {
 // make decimal ticks visible
 p.setYAxisDecimalTickUnits()
 // plot correlations as vertical lines
diff --git a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala
b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala index 551d7378..27bcacf8 100644 --- a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala +++ b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala @@ -288,7 +288,7 @@ object UnivariateTimeSeries { * @param phase * @return */ - def downSample(values: Vector[Double], n: Int, phase: Int = 0) = { + def downsample(values: Vector[Double], n: Int, phase: Int = 0) = { val origLen = values.length val newLen = Math.ceil((values.length - phase) / n.toDouble).toInt val sampledValues = new DenseVector(Array.fill(newLen)(0.0)) @@ -311,7 +311,7 @@ object UnivariateTimeSeries { * @param useZero * @return */ - def upSample(values: Vector[Double], n: Int, phase: Int = 0, useZero: Boolean = false) = { + def upsample(values: Vector[Double], n: Int, phase: Int = 0, useZero: Boolean = false) = { val filler = if (useZero) 0 else Double.NaN val origLen = values.length val newLen = origLen * n diff --git a/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala b/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala index 46423b2b..29cd26cc 100644 --- a/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala +++ b/src/test/scala/com/cloudera/sparkts/UnivariateTimeSeriesSuite.scala @@ -51,10 +51,10 @@ class UnivariateTimeSeriesSuite extends FunSuite with ShouldMatchers { // replicating upsampling examples // from http://www.mathworks.com/help/signal/ref/upsample.html?searchHighlight=upsample val y = new DenseVector(Array(1.0, 2.0, 3.0, 4.0)) - val yUp1 = upSample(y, 3, useZero = true).toArray + val yUp1 = upsample(y, 3, useZero = true).toArray yUp1 should be (Array(1.0, 0.0, 0.0, 2.0, 0.0, 0.0, 3.0, 0.0, 0.0, 4.0, 0.0, 0.0)) - val yUp2 = upSample(y, 3, useZero = true, phase = 2).toArray + val yUp2 = upsample(y, 3, useZero = true, phase = 2).toArray yUp2 should be (Array(0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 0.0, 3.0, 0.0, 0.0, 4.0)) } @@ -62,10 +62,10 @@ class UnivariateTimeSeriesSuite extends FunSuite with ShouldMatchers { // replicating downsampling examples // from http://www.mathworks.com/help/signal/ref/downsample.html?searchHighlight=downsample val y = new DenseVector((1 to 10).toArray.map(_.toDouble)) - val yDown1 = downSample(y, 3).toArray + val yDown1 = downsample(y, 3).toArray yDown1 should be (Array(1.0, 4.0, 7.0, 10.0)) - val yDown2 = downSample(y, 3, phase = 2).toArray + val yDown2 = downsample(y, 3, phase = 2).toArray yDown2 should be (Array(3.0, 6.0, 9.0)) } @@ -78,8 +78,8 @@ class UnivariateTimeSeriesSuite extends FunSuite with ShouldMatchers { val y = (1 to 1000).toArray.map(_.toDouble / 100.0).map(Math.sin(_)) val vy = new DenseVector(y) - val lessFreq = downSample(vy, 100) - val moreFreq = upSample(lessFreq, 100) + val lessFreq = downsample(vy, 100) + val moreFreq = upsample(lessFreq, 100) // work on copies val splineY = fillSpline(new DenseVector(moreFreq.toArray)).toArray From 87418e8183ad7b202ed6f15adfef9c8d9394bd24 Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Wed, 1 Jul 2015 12:23:00 -0700 Subject: [PATCH 26/42] added scaladoc comments for up/down sampling --- .../cloudera/sparkts/UnivariateTimeSeries.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala index 27bcacf8..9b3024bc 100644 --- a/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala +++ 
b/src/main/scala/com/cloudera/sparkts/UnivariateTimeSeries.scala @@ -253,7 +253,7 @@ object UnivariateTimeSeries { } /** - * Fill in NA values using a natural cubic spline. + * Fill in NaN values using a natural cubic spline. * @param values Vector to interpolate * @return Interpolated vector */ @@ -283,10 +283,10 @@ object UnivariateTimeSeries { /** * Down sample by taking every nth element starting from offset phase - * @param values - * @param n - * @param phase - * @return + * @param values Vector to down sample + * @param n take every nth element + * @param phase offset from starting index + * @return downsampled vector with appropriate length */ def downsample(values: Vector[Double], n: Int, phase: Int = 0) = { val origLen = values.length @@ -308,8 +308,8 @@ object UnivariateTimeSeries { * @param values the original data vector * @param n the number of insertions between elements * @param phase the offset to begin - * @param useZero - * @return + * @param useZero fill with zeros rather than NaN + * @return upsampled vector filled with zeros or NaN, as specified by user */ def upsample(values: Vector[Double], n: Int, phase: Int = 0, useZero: Boolean = false) = { val filler = if (useZero) 0 else Double.NaN From bec7089e3dc8196ac989ed99ea8f617fe344b0bf Mon Sep 17 00:00:00 2001 From: Chris Dalzell Date: Thu, 9 Jul 2015 22:24:32 -0400 Subject: [PATCH 27/42] Renamed labels to keys --- .../HistoricalValueAtRiskExample.scala | 4 ++-- .../com/cloudera/sparkts/TimeSeries.scala | 22 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala b/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala index f9030d03..5765b464 100644 --- a/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala +++ b/src/main/scala/com/cloudera/finance/examples/HistoricalValueAtRiskExample.scala @@ -67,7 +67,7 @@ object HistoricalValueAtRiskExample { // Fit an AR(1) + GARCH(1, 1) model to each factor val garchModels = factorReturns.mapValues(ARGARCH.fitModel(_)).toMap - val iidFactorReturns = factorReturns.mapSeriesWithLabel { case (symbol, series) => + val iidFactorReturns = factorReturns.mapSeriesWithKey { case (symbol, series) => val model = garchModels(symbol) model.removeTimeDependentEffects(series, DenseVector.zeros[Double](series.length)) } @@ -107,7 +107,7 @@ object HistoricalValueAtRiskExample { mat(i, ::) := iidSeries.data(rand.nextInt(iidSeries.data.rows), ::) } (0 until models.size).foreach { i => - models(iidSeries.labels(i)).addTimeDependentEffects(mat(::, i), mat(::, i)) + models(iidSeries.keys(i)).addTimeDependentEffects(mat(::, i), mat(::, i)) } mat } diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeries.scala b/src/main/scala/com/cloudera/sparkts/TimeSeries.scala index 3be205ed..981d771b 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeries.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeries.scala @@ -20,17 +20,17 @@ import breeze.linalg._ import com.github.nscala_time.time.Imports._ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], - val labels: Array[String]) extends Serializable { + val keys: Array[String]) extends Serializable { def slice(range: Range): TimeSeries = { - new TimeSeries(index.slice(range), data(range, ::), labels) + new TimeSeries(index.slice(range), data(range, ::), keys) } def union(vec: Vector[Double], key: String): TimeSeries = { val mat = DenseMatrix.zeros[Double](data.rows, 
data.cols + 1) (0 until data.cols).foreach(c => mat(::, c to c) := data(::, c to c)) mat(::, -1 to -1) := vec - new TimeSeries(index, mat, labels :+ key) + new TimeSeries(index, mat, keys :+ key) } /** @@ -86,7 +86,7 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], def hasNext: Boolean = i < data.cols def next(): (String, Vector[Double]) = { i += 1 - (labels(i - 1), data(::, i - 1)) + (keys(i - 1), data(::, i - 1)) } } } @@ -99,15 +99,15 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], } /** - * Applies a transformation to each series that preserves the time index. Passes the label along + * Applies a transformation to each series that preserves the time index. Passes the key along * with each series. */ - def mapSeriesWithLabel(f: (String, Vector[Double]) => Vector[Double]): TimeSeries = { + def mapSeriesWithKey(f: (String, Vector[Double]) => Vector[Double]): TimeSeries = { val newData = new DenseMatrix[Double](index.size, data.cols) univariateKeyAndSeriesIterator().zipWithIndex.foreach { case ((key, series), index) => newData(::, index) := f(key, series) } - new TimeSeries(index, newData, labels) + new TimeSeries(index, newData, keys) } /** @@ -120,7 +120,7 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], univariateSeriesIterator().zipWithIndex.foreach { case (vec, index) => newData(::, index) := f(vec) } - new TimeSeries(newIndex, newData, labels) + new TimeSeries(newIndex, newData, keys) } def mapValues[U](f: (Vector[Double]) => U): Seq[(String, U)] = { @@ -128,13 +128,13 @@ class TimeSeries(val index: DateTimeIndex, val data: DenseMatrix[Double], } /** - * Gets the first univariate series and its label. + * Gets the first univariate series and its key. */ def head(): (String, Vector[Double]) = univariateKeyAndSeriesIterator().next } object TimeSeries { - def timeSeriesFromSamples(samples: Seq[(DateTime, Array[Double])], labels: Array[String]) + def timeSeriesFromSamples(samples: Seq[(DateTime, Array[Double])], keys: Array[String]) : TimeSeries = { val mat = new DenseMatrix[Double](samples.length, samples.head._2.length) val dts = new Array[Long](samples.length) @@ -143,7 +143,7 @@ object TimeSeries { dts(i) = dt.getMillis mat(i to i, ::) := new DenseVector[Double](values) } - new TimeSeries(new IrregularDateTimeIndex(dts), mat, labels) + new TimeSeries(new IrregularDateTimeIndex(dts), mat, keys) } } From 1684baf6f853420caa12a265d8cfd1973b8662ea Mon Sep 17 00:00:00 2001 From: Chris Dalzell Date: Tue, 14 Jul 2015 22:40:56 -0400 Subject: [PATCH 28/42] Added scalastyle plugin config --- pom.xml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pom.xml b/pom.xml index ec9366fd..359293f4 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,32 @@ true + + + + org.scalastyle + scalastyle-maven-plugin + 0.7.0 + + false + false + true + true + ${basedir}/src/main/scala + ${basedir}/src/test/scala + ${basedir}/lib/scalastyle_config.xml + ${project.basedir}/scalastyle-output.xml + UTF-8 + + + + + check + + + + + org.scalatest From 84923bf387ac9a4882ab9ee0f4a710483fc04f34 Mon Sep 17 00:00:00 2001 From: Chris Dalzell Date: Tue, 14 Jul 2015 22:55:18 -0400 Subject: [PATCH 29/42] Gave maven-surefire-plugin a version since Maven was cranky about it not having one --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index 359293f4..eb11ea71 100644 --- a/pom.xml +++ b/pom.xml @@ -74,10 +74,12 @@ + org.apache.maven.plugins maven-surefire-plugin + 2.12 true From 
67f90dd2ca4887eaebcc0f62700fd5c074fed99c Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Wed, 15 Jul 2015 10:44:23 -0400
Subject: [PATCH 30/42] Ignoring scalastyle-output.xml
---
 .gitignore | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index d5149cea..cc1e7d75 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,4 @@ target
 *.iml
 .idea
-
+scalastyle-output.xml

From e096f805bee69fec4a160e2d062251b2be628aa4 Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Wed, 15 Jul 2015 10:45:12 -0400
Subject: [PATCH 31/42] Scalastyle now runs as part of mvn compile
---
 pom.xml | 3 +-
 scalastyle-config.xml | 142 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 144 insertions(+), 1 deletion(-)
 create mode 100644 scalastyle-config.xml

diff --git a/pom.xml b/pom.xml
index eb11ea71..863562a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,12 +97,13 @@
 true
 ${basedir}/src/main/scala
 ${basedir}/src/test/scala
- ${basedir}/lib/scalastyle_config.xml
+ ${basedir}/scalastyle-config.xml
 ${project.basedir}/scalastyle-output.xml
 UTF-8
+ compile
 check
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
new file mode 100644
index 00000000..42ef15ff
--- /dev/null
+++ b/scalastyle-config.xml
@@ -0,0 +1,142 @@
+ Scalastyle standard configuration
+ [remaining 141 lines of check definitions; XML elements stripped during extraction]
\ No newline at end of file

From a0a7bd6c66d12bba986a71c0c7ba81bccf272578 Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Wed, 15 Jul 2015 14:04:50 -0400
Subject: [PATCH 32/42] Added notes about how to disable scalastyle when
 needed, also gave the config xml the indentation formatting IntelliJ craved
---
 scalastyle-config.xml | 259 ++++++++++++++++--------------
 1 file changed, 133 insertions(+), 126 deletions(-)

diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 42ef15ff..7c05ea15 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -1,14 +1,21 @@
 [whitespace-only re-indentation of the config; XML elements stripped during extraction]
\ No newline at end of file

From 4f9ac31299e5285032ba976b4fba73011e253718 Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Wed, 15 Jul 2015 15:12:54 -0400
Subject: [PATCH 33/42] Adding notes about how to switch off checking for a specific rule
---
 scalastyle-config.xml | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7c05ea15..1f1ed0f5 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -5,6 +5,14 @@
+ [XML comment on how to disable scalastyle for a rule; stripped during extraction]
 Scalastyle standard configuration

From fefaaa058a9ac3a6213fe05e6c595e1a04420c2e Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Wed, 15 Jul 2015 15:25:50 -0400
Subject: [PATCH 34/42] Assignment to val is not considered to be a magic number
---
 scalastyle-config.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git
a/scalastyle-config.xml b/scalastyle-config.xml
index 1f1ed0f5..94deff02 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -10,7 +10,7 @@
 IDs and such can be found here: http://www.scalastyle.org/rules-0.7.0.html -->
- [magic-number check parameter; XML stripped during extraction]
+ [magic-number check parameter; XML stripped during extraction]

From 95529d0965da4896dc3d454ae11127e5e25795b9 Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Thu, 16 Jul 2015 19:29:35 -0400
Subject: [PATCH 35/42] Added the Cloudera file header
---
 scalastyle-config.xml | 29 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 94deff02..0dc52c8c 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -23,21 +23,20 @@ var notAtAllAMagicNumber = 1234
- [previous header comment; XML stripped during extraction]
+ [Cloudera file header; XML stripped during extraction]

From f6842181b504a02ce6096c0d7a47e77716b94c0e Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Tue, 21 Jul 2015 19:02:57 -0400
Subject: [PATCH 36/42] Added newline at end of file
---
 scalastyle-config.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 0dc52c8c..3abf027e 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -153,4 +153,4 @@
- [closing line; XML stripped during extraction]
\ No newline at end of file
+ [closing line; XML stripped during extraction]

From 73f4d35cac983eca5481ed06726955fcd25983e1 Mon Sep 17 00:00:00 2001
From: Chris Dalzell
Date: Fri, 24 Jul 2015 18:50:23 -0400
Subject: [PATCH 37/42] Disabled a couple of checks
---
 scalastyle-config.xml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 3abf027e..3aa09f33 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -74,11 +74,13 @@ var notAtAllAMagicNumber = 1234
+ [disabled check; XML stripped during extraction]
@@ -144,7 +146,9 @@ var notAtAllAMagicNumber = 1234
+ [disabled check; XML stripped during extraction]

From 09fdd401101cbbfe632f85850a3136c1acbca1df Mon Sep 17 00:00:00 2001
From: Jose Cambronero
Date: Sun, 2 Aug 2015 21:18:58 -0700
Subject: [PATCH 38/42] added ljung-box test, commonly used with arima to check residuals for serial corr
---
 .../sparkts/TimeSeriesStatisticalTests.scala | 22 ++++++++++++++++++-
 .../TimeSeriesStatisticalTestsSuite.scala | 16 ++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala
index c22f7938..1d2721fa 100644
--- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala
+++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala
@@ -272,7 +272,27 @@ object TimeSeriesStatisticalTests {
 val bgstat = nObs * auxOLS.calculateRSquared()
 (bgstat, 1 - new ChiSquaredDistribution(maxLag).cumulativeProbability(bgstat))
 }
-
+
+ /**
+ * Ljung-Box test for serial correlation in residuals up to lag `maxLag`. The null hypothesis
+ * is that values are independently distributed up to the given lag. The alternate hypothesis
+ * is that serial correlation is present. The test statistic follows a Chi-Squared distribution
+ * with `maxLag` degrees of freedom. See [[https://en.wikipedia.org/wiki/Ljung%E2%80%93Box_test]]
+ * for more information.
+ * @param residuals the residuals to check for serial correlation
+ * @return the test statistic and the p-value associated with it.
+ */
+ def lbtest(residuals: Vector[Double], maxLag: Int): (Double, Double) = {
+ val autoCorrs = UnivariateTimeSeries.autocorr(residuals, maxLag)
+ val n = residuals.length
+ val adjAutoCorrs = autoCorrs.toArray.zipWithIndex.map { case (p, k) =>
+ (p * p) / (n - k - 1)
+ }
+ val testStatistic = n * (n + 2) * adjAutoCorrs.sum
+ val pValue = 1 - new ChiSquaredDistribution(maxLag).cumulativeProbability(testStatistic)
+ (testStatistic, pValue)
+ }
+
 /**
 * Breusch-Pagan test for heteroskedasticity in a model
 * The statistic follows a X^2 distribution with (# of regressors - 1) degrees of freedom
diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala
index 698e4f43..d80b6edb 100644
--- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala
+++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala
@@ -86,4 +86,20 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers {
 // there should be evidence of heteroskedasticity
 bptest(new DenseVector(resids2), new DenseMatrix(x.length, 1, x))._2 should be < pthreshold
 }
+
+ test("ljung-box test") {
+ val rand = new MersenneTwister(5L)
+ val n = 100
+ val indep = Array.fill(n)(rand.nextGaussian)
+ val vecIndep = new DenseVector(indep)
+ val (stat1, pval1) = lbtest(vecIndep, 1)
+ pval1 should be > 0.05
+
+ // serially correlated
+ val coef = 0.3
+ val dep = indep.scanLeft(0.0) { case (prior, curr) => prior * coef + curr }.tail
+ val vecDep = new DenseVector(dep)
+ val (stat2, pval2) = lbtest(vecDep, 2)
+ pval2 should be < 0.05
+ }
 }

From a5cdfa4504cb968a158ec98c5445082024d11ab1 Mon Sep 17 00:00:00 2001
From: "jose.cambronero"
Date: Tue, 4 Aug 2015 16:20:40 -0700
Subject: [PATCH 39/42] modified easyplot functions to return figure, so that user can 'saveas'
---
 .../scala/com/cloudera/sparkts/EasyPlot.scala | 21 ++++++++++++-------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
index d95ed63a..6a9a8c8c 100644
--- a/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
+++ b/src/main/scala/com/cloudera/sparkts/EasyPlot.scala
@@ -21,32 +21,35 @@ import breeze.plot._
 import org.apache.commons.math3.distribution.NormalDistribution

 object EasyPlot {
- def ezplot(vec: Vector[Double], style: Char): Unit = {
+ def ezplot(vec: Vector[Double], style: Char): Figure = {
 val f = Figure()
 val p = f.subplot(0)
 p += plot((0 until vec.length).map(_.toDouble).toArray, vec, style = style)
+ f
 }

- def ezplot(vec: Vector[Double]): Unit = ezplot(vec, '-')
+ def ezplot(vec: Vector[Double]): Figure = ezplot(vec, '-')

- def ezplot(arr: Array[Double], style: Char): Unit = {
+ def ezplot(arr: Array[Double], style: Char): Figure = {
 val f = Figure()
 val p = f.subplot(0)
 p += plot(arr.indices.map(_.toDouble).toArray, arr, style = style)
+ f
 }

- def ezplot(arr: Array[Double]): Unit = ezplot(arr, '-')
+ def ezplot(arr: Array[Double]): Figure = ezplot(arr, '-')

- def ezplot(vecs: Seq[Vector[Double]], style: Char): Unit = {
+ def ezplot(vecs: Seq[Vector[Double]], style: Char): Figure = {
 val f = Figure()
 val p = f.subplot(0)
 val first = vecs.head
 vecs.foreach { vec =>
 p += plot((0 until first.length).map(_.toDouble).toArray, vec, style)
 }
+ f
 }

- def ezplot(vecs: Seq[Vector[Double]]): Unit = ezplot(vecs, '-')
+ def ezplot(vecs: Seq[Vector[Double]]): Figure = ezplot(vecs, '-')

 /**
 * Autocorrelation function plot
 * @param data array of data to analyze
@@ -54,7 +57,7
@@ object EasyPlot { * @param maxLag maximum lag for autocorrelation * @param conf confidence bounds to display */ - def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = { + def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Figure = { // calculate correlations and confidence bound val autoCorrs = UnivariateTimeSeries.autocorr(data, maxLag) val confVal = calcConfVal(conf, data.length) @@ -66,6 +69,7 @@ object EasyPlot { p.xlabel = "Lag" p.ylabel = "Autocorrelation" drawCorrPlot(autoCorrs, confVal, p) + f } /** @@ -74,7 +78,7 @@ object EasyPlot { * @param maxLag maximum lag for partial autocorrelation function * @param conf confidence bounds to display */ - def pacfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = { + def pacfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Figure = { // create AR(maxLag) model, retrieve coefficients and calculate confidence bound val model = Autoregression.fitModel(new DenseVector(data), maxLag) val pCorrs = model.coefficients // partial autocorrelations are the coefficients in AR(n) model @@ -87,6 +91,7 @@ object EasyPlot { p.xlabel = "Lag" p.ylabel = "Partial Autocorrelation" drawCorrPlot(pCorrs, confVal, p) + f } private[sparkts] def calcConfVal(conf:Double, n: Int): Double = { From c2405ab57616edfdfea2fd321600d206c2c0948a Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Fri, 7 Aug 2015 16:53:35 -0700 Subject: [PATCH 40/42] added kpss stationarity test --- .../sparkts/TimeSeriesStatisticalTests.scala | 104 +++++++++++++++++- .../TimeSeriesStatisticalTestsSuite.scala | 42 +++++++ 2 files changed, 145 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala index 1d2721fa..db939fde 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -22,6 +22,8 @@ import com.cloudera.finance.Util import org.apache.commons.math3.distribution.{ChiSquaredDistribution, NormalDistribution} import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression +import scala.collection.immutable.ListMap + /** * Adapted from statsmodels: * https://github.com/statsmodels/statsmodels/blob/master/statsmodels/tsa/stattools.py @@ -312,5 +314,105 @@ object TimeSeriesStatisticalTests { val bpstat = residuals.length * auxOLS.calculateRSquared() val df = factors.cols // auxOLS uses intercept term, so (# of regressors - 1) = # factors cols (bpstat, 1 - new ChiSquaredDistribution(df).cumulativeProbability(bpstat)) - } + } + + /** + * Critical values associated with KPSS test when testing with a simple constant + * Source: + * Kwiatkowski, D., Phillips, P., Schmidt, P., & Shin, Y. + * Testing the null hypothesis of stationarity against the alternative of a unit root. + * Journal of Econometrics, 159-178. + */ + private val kpssConstantCriticalValues = ListMap(0.10 -> 0.347, 0.05 -> 0.463, 0.025 -> 0.574, + 0.01 -> 0.739 + ) + + /** + * Critical values associated with KPSS test when testing with a constant and time trend + * Source: + * Kwiatkowski, D., Phillips, P., Schmidt, P., & Shin, Y. + * Testing the null hypothesis of stationarity against the alternative of a unit root. + * Journal of Econometrics, 159-178. 
+ */
+ private val kpssConstantAndTrendCriticalValues = ListMap(0.10 -> 0.119, 0.05 -> 0.146,
+ 0.025 -> 0.176, 0.01 -> 0.216
+ )
+
+ /**
+ * Performs the KPSS stationarity test (Kwiatkowski, Phillips, Schmidt and Shin). The null
+ * hypothesis corresponds to stationarity (level stationarity if method = "c", or trend
+ * stationarity if method = "ct"). If method = "c", a regression of the form ts_i = alpha +
+ * error_i is fit. If method = "ct", a regression of the form ts_i = alpha + beta * i + error_i
+ * is fit. The test then performs a check for the variance of the errors. The null hypothesis of
+ * stationarity corresponds to a null hypothesis of variance = 0 for the errors.
+ * For more information please see pg 129 of
+ * [[http://faculty.washington.edu/ezivot/econ584/notes/unitroot.pdf]]. For the original paper
+ * on the test, please see [[http://www.deu.edu.tr/userweb/onder.hanedar/dosyalar/kpss.pdf]].
+ * Finally, the current implementation follows R's tseries package implementation closely, which
+ * can be found at [[https://cran.r-project.org/web/packages/tseries/index.html]]
+ * @param ts time series to test for stationarity
+ * @param method "c" or "ct", short for fitting with a constant or a constant and a time trend
+ * @return the KPSS statistic and a map of critical values according to the method selected
+ */
+ def kpsstest(ts: Vector[Double], method: String): (Double, Map[Double, Double]) = {
+ require(List("c", "ct").contains(method), "method must be c or ct")
+ val n = ts.length
+ // constant
+ val c = Array.fill(n)(1.0)
+ // create appropriate regression matrix and obtain critical values
+ val (regressors, criticalValues) = if (method == "c") {
+ (c.map(Array(_)), kpssConstantCriticalValues)
+ } else {
+ // time trend
+ val t = Array.tabulate(n)(x => 1.0 + x.toDouble)
+ (Array(c, t).transpose, kpssConstantAndTrendCriticalValues)
+ }
+ val ols = new OLSMultipleLinearRegression()
+ ols.setNoIntercept(true)
+ ols.newSampleData(ts.toArray, regressors)
+ val residuals = ols.estimateResiduals()
+ // sum of (square of cumulative sum of errors)
+ val s2 = residuals.scanLeft(0.0)(_ + _).tail.map(math.pow(_, 2)).sum
+ // long run variance estimate using Newey-West estimator
+ // we follow the default lag used in kpss.test in R's tseries package
+ val lag = (3 * math.sqrt(n) / 13).toInt
+ val longRunVariance = neweyWestVarianceEstimator(residuals, lag)
+ val stat = (s2 / longRunVariance) / (n * n)
+ (stat, criticalValues)
+ }
+
+ /**
+ * Estimates long run variance using the Newey-West estimator, which consists of calculating
+ * the variance between lagged values and applying a weight that decreases as the lag increases.
+ * We translate the C implementation used by R's tseries:kpss.test, found at
+ * https://github.com/cran/tseries/blob/master/src/ppsum.c
+ * However, we add our own comments for clarity.
+ */
+ private def neweyWestVarianceEstimator(errors: Array[Double], lag: Int): Double = {
+ val n = errors.length
+ var sumOfTerms = 0.0
+ var cellContrib = 0.0
+ var i = 0
+ var j = 0
+
+ i = 1
+ while (i <= lag) {
+ j = i
+ cellContrib = 0.0
+ while (j < n) {
+ // covariance between values at time t and time t-i
+ cellContrib += errors(j) * errors(j - i)
+ j += 1
+ }
+ // Newey-West weighting (decreasing weight for longer lags)
+ sumOfTerms += cellContrib * (1 - (i.toDouble / (lag + 1)))
+ i += 1
+ }
+ // we multiply by 2 as we calculated the sum of top row in matrix (lag 0, lag 1), (lag 0, lag 2)
+ // etc but we need to also calculate first column (lag 1, lag 0), (lag 2, lag 0) etc
+ // We divide by n and obtain a partial estimate of the variance, just missing no lag variance
+ val partialEstVar = (sumOfTerms * 2) / n
+ // add no-lag variance
+ partialEstVar + (errors.map(math.pow(_, 2)).sum / n)
+ }
 }

diff --git a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala
index d80b6edb..72063d4c 100644
--- a/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala
+++ b/src/test/scala/com/cloudera/sparkts/TimeSeriesStatisticalTestsSuite.scala
@@ -102,4 +102,46 @@ class TimeSeriesStatisticalTestsSuite extends FunSuite with ShouldMatchers {
 val (stat2, pval2) = lbtest(vecDep, 2)
 pval2 should be < 0.05
 }
+
+ test("KPSS test: R equivalence") {
+ // Note that we only test the statistic, as in contrast with the R tseries implementation
+ // we do not calculate a p-value, but rather return a map of appropriate critical values
+ // R's tseries linearly interpolates between critical values.
+
+ // version.string R version 3.2.0 (2015-04-16)
+ // > set.seed(10)
+ // > library(tseries)
+ // > v <- rnorm(20)
+ // > kpss.test(v, "Level")
+ //
+ // KPSS Test for Level Stationarity
+ //
+ // data: v
+ // KPSS Level = 0.27596, Truncation lag parameter = 1, p-value = 0.1
+ //
+ // Warning message:
+ // In kpss.test(v, "Level") : p-value greater than printed p-value
+ // > kpss.test(v, "Trend")
+ //
+ // KPSS Test for Trend Stationarity
+ //
+ // data: v
+ // KPSS Trend = 0.05092, Truncation lag parameter = 1, p-value = 0.1
+ //
+ // Warning message:
+ // In kpss.test(v, "Trend") : p-value greater than printed p-value
+
+ val arr = Array(0.0187461709418264, -0.184252542069064, -1.37133054992251, -0.599167715783718,
+ 0.294545126567508, 0.389794300700167, -1.20807617542949, -0.363676017470862,
+ -1.62667268170309, -0.256478394123992, 1.10177950308713, 0.755781508027337,
+ -0.238233556018718, 0.98744470341339, 0.741390128383824, 0.0893472664958216,
+ -0.954943856152377, -0.195150384667239, 0.92552126209408, 0.482978524836611
+ )
+ val dv = new DenseVector(arr)
+ val cTest = kpsstest(dv, "c")._1
+ val ctTest = kpsstest(dv, "ct")._1
+
+ cTest should be (0.2759 +- 1e-4)
+ ctTest should be (0.05092 +- 1e-4)
+ }
 }

From 8b5c18813cd20a6ab5b6101e88f90c6f0e877d23 Mon Sep 17 00:00:00 2001
From: "jose.cambronero"
Date: Mon, 10 Aug 2015 14:03:37 -0700
Subject: [PATCH 41/42] added source for newey west
---
 .../scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala
index db939fde..278649b6 100644
--- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala
+++
b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -387,6 +387,8 @@ object TimeSeriesStatisticalTests { * We translate the C implementation used by R's tseries:kpss.test, found at * https://github.com/cran/tseries/blob/master/src/ppsum.c * However, we add our own comments for clarity. + * See pg 87 of [[http://faculty.washington.edu/ezivot/econ584/notes/timeSeriesConcepts.pdf]] for + * more information */ private def neweyWestVarianceEstimator(errors: Array[Double], lag: Int): Double = { val n = errors.length From 5b3a8f08fb23594770bf077fcd8f4593f67cf61a Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Tue, 18 Aug 2015 18:22:49 -0700 Subject: [PATCH 42/42] fixed spacing for critical values in kpss and style fix for capitalization in comment --- .../cloudera/sparkts/TimeSeriesStatisticalTests.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala index 278649b6..0b03faa0 100644 --- a/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala +++ b/src/main/scala/com/cloudera/sparkts/TimeSeriesStatisticalTests.scala @@ -323,8 +323,8 @@ object TimeSeriesStatisticalTests { * Testing the null hypothesis of stationarity against the alternative of a unit root. * Journal of Econometrics, 159-178. */ - private val kpssConstantCriticalValues = ListMap(0.10 -> 0.347, 0.05 -> 0.463, 0.025 -> 0.574, - 0.01 -> 0.739 + private val kpssConstantCriticalValues = ListMap( + 0.10 -> 0.347, 0.05 -> 0.463, 0.025 -> 0.574, 0.01 -> 0.739 ) /** @@ -334,8 +334,8 @@ object TimeSeriesStatisticalTests { * Testing the null hypothesis of stationarity against the alternative of a unit root. * Journal of Econometrics, 159-178. */ - private val kpssConstantAndTrendCriticalValues = ListMap(0.10 -> 0.119, 0.05 -> 0.146, - 0.025 -> 0.176, 0.01 -> 0.216 + private val kpssConstantAndTrendCriticalValues = ListMap( + 0.10 -> 0.119, 0.05 -> 0.146, 0.025 -> 0.176, 0.01 -> 0.216 ) /** @@ -412,7 +412,7 @@ object TimeSeriesStatisticalTests { } // we multiply by 2 as we calculated the sum of top row in matrix (lag 0, lag 1), (lag 0, lag 2) // etc but we need to also calculate first column (lag 1, lag 0), (lag 2, lag 0) etc - // We divide by n and obtain a partial estimate of the variance, just missing no lag variance + // we divide by n and obtain a partial estimate of the variance, just missing no lag variance val partialEstVar = (sumOfTerms * 2) / n // add no-lag variance partialEstVar + (errors.map(math.pow(_, 2)).sum / n)
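
A brief usage sketch of the APIs introduced across these patches: downsample/upsample and fillSpline (patches 16 and 25), lbtest (patch 38), and kpsstest (patch 40). This is illustrative only, not part of the patch series; the object name and the synthetic data are made up, and the sketch assumes the final post-series signatures shown in the diffs above.

    import breeze.linalg.DenseVector
    import com.cloudera.sparkts.TimeSeriesStatisticalTests._
    import com.cloudera.sparkts.UnivariateTimeSeries._

    // hypothetical driver object, not part of the library
    object PatchSeriesSketch {
      def main(args: Array[String]): Unit = {
        // densely sampled sine wave: thin it out, then reconstruct with the cubic spline
        val signal = new DenseVector((1 to 1000).map(i => math.sin(i / 100.0)).toArray)
        val sparse = downsample(signal, 10)
        // interior gaps are filled; leading/trailing NaNs remain (no extrapolation)
        val restored = fillSpline(upsample(sparse, 10))

        // Ljung-Box test (patch 38): white noise should show no serial correlation
        val rand = new scala.util.Random(42L)
        val noise = new DenseVector(Array.fill(500)(rand.nextGaussian()))
        val (lbStat, lbPValue) = lbtest(noise, maxLag = 5)

        // KPSS level-stationarity test (patch 40): compare the statistic against the
        // returned critical values rather than a p-value
        val (kpssStat, criticalValues) = kpsstest(noise, "c")
        val rejectLevelStationarity = kpssStat > criticalValues(0.05)

        println(s"spline-restored value at t=500: ${restored(500)}")
        println(s"Ljung-Box: stat=$lbStat, p-value=$lbPValue")
        println(s"KPSS: stat=$kpssStat, reject at 5%? $rejectLevelStationarity")
      }
    }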