Skip to content

Commit

Permalink
exposing anomaly thresholds, refactored anomaly strategy to output al…
Browse files Browse the repository at this point in the history
…l data points with anomaly detection details, passed that through the constraint into the constraint result as anomaly detection metadata field
  • Loading branch information
Hubert committed Dec 7, 2023
1 parent bff1d91 commit 847d522
Show file tree
Hide file tree
Showing 25 changed files with 1,019 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.Calendar

import com.amazon.deequ.analyzers.{Analyzer, State}
import com.amazon.deequ.checks.Check
import com.amazon.deequ.constraints.{AnalysisBasedConstraint, Constraint, ConstraintDecorator}
import com.amazon.deequ.constraints.{AnalysisBasedConstraint, AnomalyBasedConstraint, Constraint, ConstraintDecorator}
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Expand Down Expand Up @@ -187,9 +187,13 @@ private[deequ] class Applicability(session: SparkSession) {
case (name, nc: ConstraintDecorator) => name -> nc.inner
case (name, c: Constraint) => name -> c
}
.collect { case (name, constraint: AnalysisBasedConstraint[_, _, _]) =>
val metric = constraint.analyzer.calculate(data).value
name -> metric
.collect {
case (name, constraint: AnalysisBasedConstraint[_, _, _]) =>
val metric = constraint.analyzer.calculate(data).value
name -> metric
case (name, constraint: AnomalyBasedConstraint[_, _, _]) =>
val metric = constraint.analyzer.calculate(data).value
name -> metric
}

val constraintApplicabilities = check.constraints.zip(namedMetrics).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ trait AnomalyDetectionStrategy {
*/
def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, Anomaly)]
searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, AnomalyDetectionDataPoint)]
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
def isNewPointAnomalous(
historicalDataPoints: Seq[DataPoint[Double]],
newPoint: DataPoint[Double])
: DetectionResult = {
: AnomalyDetectionResult = {

require(historicalDataPoints.nonEmpty, "historicalDataPoints must not be empty!")

Expand All @@ -57,11 +57,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
val allDataPoints = sortedDataPoints :+ newPoint

// Run anomaly
val anomalies = detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue))
.anomalies

// Create a Detection result with all anomalies
DetectionResult(anomalies)
detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue))
}

/**
Expand All @@ -74,7 +70,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
def detectAnomaliesInHistory(
dataSeries: Seq[DataPoint[Double]],
searchInterval: (Long, Long) = (Long.MinValue, Long.MaxValue))
: DetectionResult = {
: AnomalyDetectionResult = {

def findIndexForBound(sortedTimestamps: Seq[Long], boundValue: Long): Int = {
sortedTimestamps.search(boundValue).insertionPoint
Expand All @@ -97,6 +93,6 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
val anomalies = strategy.detect(
sortedSeries.flatMap { _.metricValue }.toVector, (lowerBoundIndex, upperBoundIndex))

DetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) })
AnomalyDetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ trait BaseChangeStrategy
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int))
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {
val (start, end) = searchInterval

require(start <= end,
Expand All @@ -89,15 +89,24 @@ trait BaseChangeStrategy
val startPoint = Seq(start - order, 0).max
val data = diff(DenseVector(dataSeries.slice(startPoint, end): _*), order).data

data.zipWithIndex.filter { case (value, _) =>
(value < maxRateDecrease.getOrElse(Double.MinValue)
|| value > maxRateIncrease.getOrElse(Double.MaxValue))
}
.map { case (change, index) =>
(index + startPoint + order, Anomaly(Option(dataSeries(index + startPoint + order)), 1.0,
Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" +
s"${maxRateDecrease.getOrElse(Double.MinValue)}, " +
s"${maxRateIncrease.getOrElse(Double.MaxValue)}]. Order=$order")))
val lowerBound = maxRateDecrease.getOrElse(Double.MinValue)
val upperBound = maxRateIncrease.getOrElse(Double.MaxValue)


data.zipWithIndex.map {
case (change, index) =>
val outputSequenceIndex = index + startPoint + order
val value = dataSeries(outputSequenceIndex)
val (detail, isAnomaly) = if (change < lowerBound || change > upperBound) {
(Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" +
s"$lowerBound, " +
s"$upperBound]. Order=$order"), true)
}
else {
(None, false)
}
(outputSequenceIndex, AnomalyDetectionDataPoint(value, change,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ case class BatchNormalStrategy(
*/
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int)): Seq[(Int, Anomaly)] = {
searchInterval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = {

val (searchStart, searchEnd) = searchInterval

Expand Down Expand Up @@ -83,13 +83,15 @@ case class BatchNormalStrategy(

dataSeries.zipWithIndex
.slice(searchStart, searchEnd)
.filter { case (value, _) => value > upperBound || value < lowerBound }
.map { case (value, index) =>

val detail = Some(s"[BatchNormalStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound].")

(index, Anomaly(Option(value), 1.0, detail))
val (detail, isAnomaly) = if (value > upperBound || value < lowerBound) {
(Some(s"[BatchNormalStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound]."), true)
} else {
(None, false)
}
(index, AnomalyDetectionDataPoint(value, value,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ case class OnlineNormalStrategy(
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int))
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {

val (searchStart, searchEnd) = searchInterval

Expand All @@ -139,7 +139,6 @@ case class OnlineNormalStrategy(
computeStatsAndAnomalies(dataSeries, searchInterval)
.zipWithIndex
.slice(searchStart, searchEnd)
.filter { case (result, _) => result.isAnomaly }
.map { case (calcRes, index) =>
val lowerBound =
calcRes.mean - lowerDeviationFactor.getOrElse(Double.MaxValue) * calcRes.stdDev
Expand All @@ -149,7 +148,11 @@ case class OnlineNormalStrategy(
val detail = Some(s"[OnlineNormalStrategy]: Value ${dataSeries(index)} is not in " +
s"bounds [$lowerBound, $upperBound].")

(index, Anomaly(Option(dataSeries(index)), 1.0, detail))
val value = dataSeries(index)

(index, AnomalyDetectionDataPoint(value, value,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)),
calcRes.isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,25 @@ case class SimpleThresholdStrategy(
*/
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int)): Seq[(Int, Anomaly)] = {
searchInterval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = {

val (searchStart, searchEnd) = searchInterval

require (searchStart <= searchEnd, "The start of the interval can't be larger than the end.")

dataSeries.zipWithIndex
.slice(searchStart, searchEnd)
.filter { case (value, _) => value < lowerBound || value > upperBound }
.map { case (value, index) =>

val detail = Some(s"[SimpleThresholdStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound]")
val (detail, isAnomaly) = if ( value < lowerBound || value > upperBound ) {
(Some(s"[SimpleThresholdStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound]"), true)
} else {
(None, false)
}

(index, Anomaly(Option(value), 1.0, detail))
(index, AnomalyDetectionDataPoint(value, value,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.amazon.deequ.anomalydetection.seasonal

import breeze.linalg.DenseVector
import breeze.optimize.{ApproximateGradientFunction, DiffFunction, LBFGSB}
import com.amazon.deequ.anomalydetection.{Anomaly, AnomalyDetectionStrategy}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionDataPoint, AnomalyDetectionStrategy, AnomalyThreshold, Bound}

import collection.mutable.ListBuffer

Expand Down Expand Up @@ -178,17 +178,27 @@ class HoltWinters(
forecasts: Seq[Double],
startIndex: Int,
residualSD: Double)
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {

testSeries.zip(forecasts).zipWithIndex
.collect { case ((inputValue, forecastedValue), detectionIndex)
if math.abs(inputValue - forecastedValue) > 1.96 * residualSD =>

detectionIndex + startIndex -> Anomaly(
value = Some(inputValue),
confidence = 1.0,
detail = Some(s"Forecasted $forecastedValue for observed value $inputValue")
.collect { case ((inputValue, forecastedValue), detectionIndex) =>
val anomalyMetricValue = math.abs(inputValue - forecastedValue)
val upperBound = 1.96 * residualSD

val (detail, isAnomaly) = if (anomalyMetricValue > upperBound) {
(Some(s"Forecasted $forecastedValue for observed value $inputValue"), true)
} else {
(None, false)
}
detectionIndex + startIndex -> AnomalyDetectionDataPoint(
dataMetricValue = inputValue,
anomalyMetricValue = anomalyMetricValue,
anomalyThreshold = AnomalyThreshold(upperBound = Bound(upperBound)),
isAnomaly = isAnomaly,
confidence = 1.0,
detail = detail
)

}
}

Expand All @@ -202,7 +212,7 @@ class HoltWinters(
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int) = (0, Int.MaxValue))
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {

require(dataSeries.nonEmpty, "Provided data series is empty")

Expand Down
36 changes: 30 additions & 6 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ package com.amazon.deequ.checks

import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionAssertionResult, AnomalyDetectionDataPoint, AnomalyDetectionMetadata, AnomalyDetectionResult, AnomalyDetectionStrategy, AnomalyDetector, AnomalyThreshold, Bound, DataPoint, HistoryUtils}
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.constraints.Constraint._
import com.amazon.deequ.constraints._
import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
import com.amazon.deequ.repository.MetricsRepository
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}

import scala.util.matching.Regex
Expand Down Expand Up @@ -1092,7 +1091,10 @@ case class Check(
case nc: ConstraintDecorator => nc.inner
case c: Constraint => c
}
.collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer }
.collect {
case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
case constraint: AnomalyBasedConstraint[_, _, _] => constraint.analyzer
}
.map { _.asInstanceOf[Analyzer[_, Metric[_]]] }
.toSet
}
Expand Down Expand Up @@ -1134,7 +1136,7 @@ object Check {
afterDate: Option[Long],
beforeDate: Option[Long])(
currentMetricValue: Double)
: Boolean = {
: AnomalyDetectionAssertionResult = {

// Get history keys
var repositoryLoader = metricsRepository.load()
Expand Down Expand Up @@ -1178,10 +1180,32 @@ object Check {

// Run given anomaly detection strategy and return false if the newest value is an Anomaly
val anomalyDetector = AnomalyDetector(anomalyDetectionStrategy)
val detectedAnomalies = anomalyDetector.isNewPointAnomalous(
val anomalyDetectionResult: AnomalyDetectionResult = anomalyDetector.isNewPointAnomalous(
HistoryUtils.extractMetricValues[Double](historicalMetrics),
DataPoint(testDateTime, Some(currentMetricValue)))

detectedAnomalies.anomalies.isEmpty

// this function checks if the newest point is anomalous and returns a boolean for assertion,
// along with that newest point with anomaly check details
getNewestPointAnomalyResults(anomalyDetectionResult)
}

private[deequ] def getNewestPointAnomalyResults(anomalyDetectionResult: AnomalyDetectionResult):
AnomalyDetectionAssertionResult = {
val (hasNoAnomaly, anomalyDetectionMetaData): (Boolean, AnomalyDetectionMetadata) = {

// Based on upstream code, this anomaly detection data point sequence should never be empty
require(anomalyDetectionResult.anomalyDetectionDataPointSequence != Nil,
"AnomalyDetectionDataPointSequence cannot be empty")

// get the last anomaly detection data point of sequence (there should only be one element for now)
// and check the isAnomaly boolean, also return the last anomaly detection data point
// wrapped in the anomaly detection metadata class
anomalyDetectionResult.anomalyDetectionDataPointSequence match {
case _ :+ lastAnomalyDataPoint =>
(!lastAnomalyDataPoint._2.isAnomaly, AnomalyDetectionMetadata(lastAnomalyDataPoint._2))
}
}
AnomalyDetectionAssertionResult(hasNoAnomaly = hasNoAnomaly, anomalyDetectionMetadata = anomalyDetectionMetaData)
}
}
Loading

0 comments on commit 847d522

Please sign in to comment.