Merge pull request #151 from amosproj/51-analysis-module-first-time-series-analysis-implemented

51 analysis module first time series analysis implemented
heskil authored Dec 10, 2024
2 parents 083159f + 7a98621 commit 0e24a58
Showing 9 changed files with 3,226 additions and 130 deletions.
21 changes: 16 additions & 5 deletions apps/analyzer/metadata_analyzer/analyzer.py
@@ -1,11 +1,17 @@
import datetime
class Analyzer:

def init(database, backend, simple_analyzer, simple_rule_based_analyzer):
def init(
database,
backend,
simple_analyzer,
simple_rule_based_analyzer,
time_series_analyzer,
):
Analyzer.database = database
Analyzer.backend = backend
Analyzer.simple_analyzer = simple_analyzer
Analyzer.simple_rule_based_analyzer = simple_rule_based_analyzer
Analyzer.time_series_analyzer = time_series_analyzer

def analyze():
data = list(Analyzer.database.get_results())
@@ -61,7 +67,7 @@ def _send_Backups():

for result in results:
# Only send real backups
if result.is_backup <= 0:
if (result.is_backup is not None) and (result.is_backup <= 0):
continue

# Only send backups where the relevant data is not null
@@ -130,6 +136,12 @@ def simple_rule_based_analysis_inc(alert_limit):
result = Analyzer.simple_rule_based_analyzer.analyze_inc(data, alert_limit, start_date)
return result

def simple_time_series_analysis(
variable, task_id, frequency, backup_type, window_size
):
result = Analyzer.time_series_analyzer.k_means_analyze(
variable, task_id, frequency, backup_type, window_size)
return result

def simple_rule_based_analysis_creation_dates(alert_limit):
data = list(Analyzer.database.get_results())
start_date = Analyzer._get_start_date(data, "CREATION_DATE_ALERT", None)
@@ -139,6 +151,5 @@ def simple_rule_based_analysis_creation_dates(alert_limit):
def simple_rule_based_analysis_storage_capacity(alert_limit):
data = list(Analyzer.database.get_data_stores())
result = Analyzer.simple_rule_based_analyzer.analyze_storage_capacity(
data, alert_limit
)
data, alert_limit)
return result
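
For orientation, a rough sketch of how the new hook is invoked once Analyzer.init has been called with a Time_series_analyzer instance (see main.py below). The argument values are the illustrative ones from the Swagger docs, not something this commit executes:

# Sketch only: assumes Analyzer.init(...) has already run as in main() below.
anomaly_timestamps = Analyzer.simple_time_series_analysis(
    "data_size",                             # variable to analyze
    "67de754c-b953-4098-83cd-6d34ca2960c3",  # task_id (example value from the Swagger docs)
    86401,                                   # frequency in seconds
    "F",                                     # backup_type: full backups only
    2,                                       # window_size passed to the k-means analysis
)
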
193 changes: 192 additions & 1 deletion apps/analyzer/metadata_analyzer/main.py
@@ -4,6 +4,7 @@
from metadata_analyzer.simple_analyzer import SimpleAnalyzer
from metadata_analyzer.simple_rule_based_analyzer import SimpleRuleBasedAnalyzer
from metadata_analyzer.analyzer import Analyzer
from metadata_analyzer.time_series_analyzer import Time_series_analyzer
from metadata_analyzer.backend import Backend
from flasgger import Swagger
import requests
@@ -195,6 +196,7 @@ def simple_rule_based_analysis():
return "Invalid value for alert limit", 400



@app.route("/simpleRuleBasedAnalysisDiff", methods=["POST"])
def simple_rule_based_analysis_diff():
"""Fulfills a simple rule based analysis on diff backups.
@@ -244,6 +246,7 @@ def simple_rule_based_analysis_diff():
return "Invalid value for alert limit", 400



@app.route("/simpleRuleBasedAnalysisInc", methods=["POST"])
def simple_rule_based_analysis_inc():
"""Fulfills a simple rule based analysis on inc backups.
@@ -346,12 +349,200 @@ def simple_rule_based_analysis_storage_capacity():
return "Invalid value for alert limit", 400



# TODO yaml for swagger
@app.route("/kMeansAnomalies", methods=["POST"])
def runTimeSeriesTests():
"""Runs k-means anomaly detection on the specified dataset.
---
parameters:
- name: input
in: body
type: object
required: true
properties:
variable:
type: string
example: 'data_size'
task_id:
type: string
example: '67de754c-b953-4098-83cd-6d34ca2960c3'
backup_type:
type: string
example: 'F'
frequency:
type: integer
example: 86401
window_size:
type: integer
example: 2
definitions:
MeansBody:
type: object
properties:
variable:
type: string
example: 'data_size'
task_id:
type: string
example: '67de754c-b953-4098-83cd-6d34ca2960c3'
backup_type:
type: string
example: 'F'
frequency:
type: integer
example: 86401
window_size:
type: integer
example: 2
Timestamps:
type: array
items:
type: string
example:
- 'Tue, 10 Sep 2024 21:01:22 GMT'
- 'Sat, 21 Sep 2024 21:01:33 GMT'
- 'Sun, 22 Sep 2024 21:01:34 GMT'
- 'Tue, 08 Oct 2024 21:01:50 GMT'
- 'Wed, 09 Oct 2024 21:01:51 GMT'
responses:
200:
description: The timestamps of the anomalies
schema:
$ref: '#/definitions/Timestamps'
"""
json = request.get_json()
field = "None"
try:
field = "variable"
variable = json["variable"]
field = "task_id"
task_id = json["task_id"]
field = "frequency"
frequency = json["frequency"]
field = "backup_type"
backup_type = json["backup_type"]
field = "window_size"
window_size = json["window_size"]
except KeyError:
return "Missing field of type " + field, 400

try:
result = Analyzer.simple_time_series_analysis(
variable, task_id, frequency, backup_type, window_size
)
return jsonify(result)
except ValueError as val:
return "Value error occured: " + str(val), 400


@app.route("/getTaskIds", methods=["GET"])
def return_task_ids():
"""Gets task ids of current dataset, necessary for time series analysis.
---
definitions:
task_ids:
type: object
properties:
1:
type: string
example: 'd6f0d862-ef51-4f01-8d34-5503a58c6421'
2:
type: string
example: '67de754c-b953-4098-83cd-6d34ca2960c3'
3:
type: string
example: '8cc9efbc-d392-430d-8844-af04da35e7d6'
responses:
200:
description: All possible task ids
schema:
$ref: '#/definitions/task_ids'
"""
return jsonify(Time_series_analyzer.get_task_ids())


@app.route("/getFrequenciesForTask", methods=["POST"])
def return_frequencies():
"""Gets frequencies for a specific task, variable and backup type.
---
parameters:
- name: input
in: body
type: object
required: true
properties:
variable:
type: string
example: 'data_size'
task_id:
type: string
example: '67de754c-b953-4098-83cd-6d34ca2960c3'
backup_type:
type: string
example: 'F'
definitions:
Frequencies:
type: object
properties:
count:
type: object
properties:
0:
type: integer
example: 20
1:
type: integer
example: 17
2:
type: integer
example: 5
sbc_start:
type: object
properties:
0:
type: integer
example: 86400
1:
type: integer
example: 86401
2:
type: integer
example: 86399
responses:
200:
description: All backup frequencies found that meet the conditions, ranked by how often they appear
schema:
$ref: '#/definitions/Frequencies'
"""
json = request.get_json()
field = "None"
try:
field = "task_id"
task_id = json["task_id"]
field = "backup_type"
backup_type = json["backup_type"]
field = "variable"
variable = json["variable"]
except KeyError:
return "Missing field of type " + field, 400

return jsonify(Time_series_analyzer.get_frequencies(task_id, backup_type, variable))
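
Taken together with /kMeansAnomalies above, these two lookup routes support a simple discovery workflow. A sketch, again assuming a local instance (the base URL is hypothetical):

import requests

BASE = "http://localhost:5000"  # assumption; derived from FLASK_RUN_HOST/FLASK_RUN_PORT in practice

# 1. List the task ids present in the current dataset
task_ids = requests.get(BASE + "/getTaskIds").json()

# 2. Look up the backup frequencies observed for one task / backup type / variable
freqs = requests.post(
    BASE + "/getFrequenciesForTask",
    json={
        "task_id": "67de754c-b953-4098-83cd-6d34ca2960c3",
        "backup_type": "F",
        "variable": "data_size",
    },
).json()

# 3. Pick a frequency and window size, then call /kMeansAnomalies as shown above
print(task_ids, freqs)
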


def main():
database = Database()
backend = Backend(os.getenv("BACKEND_URL"))
simple_analyzer = SimpleAnalyzer()
time_series_analyzer = Time_series_analyzer(database)
simple_rule_based_analyzer = SimpleRuleBasedAnalyzer(backend, 0.2, 0.2, 0.2, 0.2)
Analyzer.init(database, backend, simple_analyzer, simple_rule_based_analyzer)
Analyzer.init(
database,
backend,
simple_analyzer,
simple_rule_based_analyzer,
time_series_analyzer,
)

print(f"FLASK_RUN_HOST: {os.getenv('FLASK_RUN_HOST')}")
print(f"FLASK_RUN_PORT: {os.getenv('FLASK_RUN_PORT')}")
93 changes: 93 additions & 0 deletions apps/analyzer/metadata_analyzer/timeSeriesTests.py
@@ -0,0 +1,93 @@
# %%
import pandas as pd
from darts import TimeSeries
from darts.ad import (
KMeansScorer,
)
from darts.ad.detectors import QuantileDetector
from sqlalchemy import create_engine
from metadata_analyzer.database import Database
from flask import jsonify


# Create an engine using shared init
database = Database()

# read table into a dataframe
with database.engine.connect() as conn, conn.begin():
    df = pd.read_sql_table("results", conn)

# --------------- General Preprocessing ---------------
# removes null values in sbc_start, is_backup, task_uuid and data_size
df = df[df.sbc_start.notnull()]
df = df[df.is_backup.notnull()]
df = df[df.task_uuid.notnull()]
df = df[df.data_size.notnull()]
# removes non-backups
df = df[df.is_backup != 0]
# removes backups with size of 0.0
df = df[df.data_size != 0.0]
# removes non-full backups (also removes copy backups in dataset for now)
df = df[df.fdi_type == "F"]
# remove entries that have the same sbc_start, necessary for indexing the time axis (could cause problems later)
df = df.drop_duplicates(subset=["sbc_start"])
# sorts dataframe by sbc_start
df = df.sort_values("sbc_start")
# gets all possible values for task_uuid and data_size (debugging utility)
task_ids = df["task_uuid"].unique()
sizes = df["data_size"].unique()

# --------------- Task Specific Preprocessing ---------------#
# removes backups that do not have the chosen task id; the current task has a frequency of 86401s
df = df[df.task_uuid == task_ids[6]]

# removes columns that are not relevant (change for other analyses)
df = df[["sbc_start", "data_size"]]

# init of utility variable for looking at frequencies
freqs = df.sort_values("sbc_start")
# frequencies present in dataset
freqs = df.diff()["sbc_start"]
freqs = freqs.value_counts()
freqs = freqs[freqs.notnull()]

df.index = df["sbc_start"] # sets index to datetime in sbc_start column
df = df.drop("sbc_start", axis=1) # removes column because values are now in index

# interpolates series to emulate backups at regular intervals, different methods possible
df = df.asfreq("86401s", method="ffill")
# df.plot()

# utility variable for sizes after reduced to one task
sizes = df["data_size"].unique()
clusters = len(sizes)

# init actual series
series = TimeSeries.from_series(df, fill_missing_dates=False, freq="86401s")

# interim definition of training data, change to something more useful
series_train = series[:100]

# using basic k-means scorer (moving window comparison)
Kmeans_scorer = KMeansScorer(k=2, window=1, component_wise=False)
Kmeans_scorer.fit(series_train)
# computed anomaly scores
anomaly_score = Kmeans_scorer.score(series)

# detects where anomalies lie, then return binary prediction
threshold = 0.95
detector = QuantileDetector(high_quantile=threshold)
anomaly_pred = detector.fit_detect(series=anomaly_score)

# TODO decide on interface to backend and return useful values in useful format
anomaly_timestamps = []
vals = anomaly_pred.all_values()
# gets timestamps of anomalies
for i in range(len(vals)):
    if vals[i] == 1.0:
        anomaly_timestamps.append(series.get_timestamp_at_point(i))
print("timestamps of detected anomalies:")
print(anomaly_timestamps)


# %%
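
The script above hard-codes one task and one frequency. A parameterized wrapper along the following lines shows how the same pipeline maps onto the (variable, task_id, frequency, backup_type, window_size) interface used by the analyzer; this is only a sketch, since the actual Time_series_analyzer.k_means_analyze added in this pull request is not shown in the diff and may differ (for example in how window_size is used and which slice is used for fitting):

import pandas as pd
from darts import TimeSeries
from darts.ad import KMeansScorer
from darts.ad.detectors import QuantileDetector


def k_means_anomaly_timestamps(df, variable, task_id, frequency, backup_type, window_size):
    """Sketch: the preprocessing and scoring steps of timeSeriesTests.py as one function."""
    # same preprocessing as above: drop nulls, non-backups, zero sizes and other tasks
    df = df[df.sbc_start.notnull() & df.is_backup.notnull() & df.task_uuid.notnull() & df[variable].notnull()]
    df = df[(df.is_backup != 0) & (df[variable] != 0.0)]
    df = df[(df.fdi_type == backup_type) & (df.task_uuid == task_id)]
    df = df.drop_duplicates(subset=["sbc_start"]).sort_values("sbc_start")

    # resample to the requested frequency and build the darts series
    df = df[["sbc_start", variable]].set_index("sbc_start").asfreq(f"{frequency}s", method="ffill")
    series = TimeSeries.from_series(df[variable], fill_missing_dates=False, freq=f"{frequency}s")

    # moving-window k-means scorer; mapping window_size to the scorer's window is an assumption
    scorer = KMeansScorer(k=2, window=window_size, component_wise=False)
    scorer.fit(series)
    anomaly_pred = QuantileDetector(high_quantile=0.95).fit_detect(series=scorer.score(series))

    # collect the timestamps flagged as anomalous
    vals = anomaly_pred.all_values()
    return [series.get_timestamp_at_point(i) for i in range(len(vals)) if vals[i] == 1.0]
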