Skip to content

Commit

Permalink
namespaced events integration
Browse files Browse the repository at this point in the history
Signed-off-by: Tullio Sebastiani <[email protected]>
  • Loading branch information
tsebastiani committed Sep 6, 2024
1 parent 6186555 commit 1fb22a9
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 33 deletions.
9 changes: 8 additions & 1 deletion kraken/application_outage/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception

from kraken import utils


# Reads the scenario config, applies and deletes a network policy to
# block the traffic for the specified duration
Expand All @@ -20,7 +22,7 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = app_outage_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
if len(app_outage_config) > 1:
try:
with open(app_outage_config, "r") as f:
Expand Down Expand Up @@ -79,6 +81,11 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

12 changes: 9 additions & 3 deletions kraken/arcaflow_plugin/arcaflow_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@
import logging
from pathlib import Path
from typing import List

from .context_auth import ContextAuth
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry

from .. import utils


def run(scenarios_list: List[str], kubeconfig_path: str, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries: list[ScenarioTelemetry] = []
failed_post_scenarios = []
for scenario in scenarios_list:
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry,scenario)
start_time = time.time()
scenario_telemetry.start_timestamp = start_time
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
engine_args = build_args(scenario)
status_code = run_workflow(engine_args, kubeconfig_path)
scenario_telemetry.end_timestamp = time.time()
end_time = time.time()
scenario_telemetry.end_timestamp = end_time
scenario_telemetry.exit_status = status_code
utils.populate_cluster_events(scenario_telemetry, parsed_scenario_config, telemetry.kubecli, int(start_time), int(end_time))
scenario_telemetries.append(scenario_telemetry)
if status_code != 0:
failed_post_scenarios.append(scenario)
Expand Down
10 changes: 9 additions & 1 deletion kraken/network_chaos/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception

from kraken import utils


# krkn_lib
# Reads the scenario config and introduces traffic variations in Node's host network interface.
Expand All @@ -24,7 +26,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = net_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, net_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, net_config)
try:
with open(net_config, "r") as file:
param_lst = ["latency", "loss", "bandwidth"]
Expand Down Expand Up @@ -119,6 +121,12 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
log_exception(net_config)
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

Expand Down
9 changes: 8 additions & 1 deletion kraken/node_actions/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging
import sys
import time

from kraken import utils
from kraken.node_actions.aws_node_scenarios import aws_node_scenarios
from kraken.node_actions.general_cloud_node_scenarios import general_node_scenarios
from kraken.node_actions.az_node_scenarios import azure_node_scenarios
Expand Down Expand Up @@ -62,7 +64,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = node_scenario_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
with open(node_scenario_config, "r") as f:
node_scenario_config = yaml.full_load(f)
for node_scenario in node_scenario_config["node_scenarios"]:
Expand All @@ -85,6 +87,11 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry.exit_status = 0

scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)

return failed_scenarios, scenario_telemetries
Expand Down
10 changes: 8 additions & 2 deletions kraken/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import kraken.plugins.node_scenarios.vmware_plugin as vmware_plugin
import kraken.plugins.node_scenarios.ibmcloud_plugin as ibmcloud_plugin
from kraken import utils
from kraken.plugins.run_python_plugin import run_python_file
from kraken.plugins.network.ingress_shaping import network_chaos
from kraken.plugins.pod_network_outage.pod_network_outage_plugin import pod_outage
Expand Down Expand Up @@ -263,7 +264,7 @@ def run(scenarios: List[str],
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
logging.info('scenario ' + str(scenario))
pool = PodsMonitorPool(kubecli)
kill_scenarios = [kill_scenario for kill_scenario in PLUGINS.unserialize_scenario(scenario) if kill_scenario["id"] == "kill-pods"]
Expand All @@ -286,8 +287,13 @@ def run(scenarios: List[str],
scenario_telemetry.exit_status = 0
logging.info("Waiting for the specified duration: %s" % (wait_duration))
time.sleep(wait_duration)
scenario_telemetries.append(scenario_telemetry)
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)

return failed_post_scenarios, scenario_telemetries

Expand Down
9 changes: 8 additions & 1 deletion kraken/pod_scenarios/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from arcaflow_plugin_sdk import serialization
from krkn_lib.utils.functions import get_yaml_item_value, log_exception

from kraken import utils


# Run pod based scenarios
def run(kubeconfig_path, scenarios_list, config, failed_post_scenarios, wait_duration):
Expand Down Expand Up @@ -89,7 +91,7 @@ def container_run(kubeconfig_path,
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = container_scenario_config[0]
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, container_scenario_config[0])
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, container_scenario_config[0])
if len(container_scenario_config) > 1:
pre_action_output = post_actions.run(kubeconfig_path, container_scenario_config[1])
else:
Expand Down Expand Up @@ -125,6 +127,11 @@ def container_run(kubeconfig_path,
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)

return failed_scenarios, scenario_telemetries
Expand Down
11 changes: 10 additions & 1 deletion kraken/pvc/pvc_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import re
import time
import yaml

from .. import utils
from ..cerberus import setup as cerberus
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
Expand All @@ -22,7 +24,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = app_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, app_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, app_config)
try:
if len(app_config) > 1:
with open(app_config, "r") as f:
Expand Down Expand Up @@ -321,6 +323,13 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
log_exception(app_config)
else:
scenario_telemetry.exit_status = 0

scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)

return failed_scenarios, scenario_telemetries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception

from kraken import utils


def delete_objects(kubecli, namespace):

Expand Down Expand Up @@ -166,7 +168,7 @@ def run(
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario_config[0]
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario_config[0])
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario_config[0])
try:
if len(scenario_config) > 1:
pre_action_output = post_actions.run(kubeconfig_path, scenario_config[1])
Expand Down Expand Up @@ -255,6 +257,11 @@ def run(
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

Expand Down
33 changes: 19 additions & 14 deletions kraken/service_hijacking/service_hijacking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.utils import log_exception

from kraken import utils


def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
Expand All @@ -14,7 +17,7 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
with open(scenario) as stream:
scenario_config = yaml.safe_load(stream)

Expand All @@ -28,7 +31,7 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,
logging.info(f"checking service {service_name} in namespace: {service_namespace}")
if not krkn_lib.service_exists(service_name, service_namespace):
logging.error(f"service: {service_name} not found in namespace: {service_namespace}, failed to run scenario.")
fail(scenario_telemetry, scenario_telemetries)
fail_scenario_telemetry(scenario_telemetry)
failed_post_scenarios.append(scenario)
break
try:
Expand All @@ -48,7 +51,7 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,
original_service = krkn_lib.replace_service_selector([webservice.selector], service_name, service_namespace)
if original_service is None:
logging.error(f"failed to patch service: {service_name}, namespace: {service_namespace} with selector {webservice.selector}")
fail(scenario_telemetry, scenario_telemetries)
fail_scenario_telemetry(scenario_telemetry)
failed_post_scenarios.append(scenario)
break

Expand All @@ -61,7 +64,7 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,
original_service = krkn_lib.replace_service_selector(selectors, service_name, service_namespace)
if original_service is None:
logging.error(f"failed to restore original service: {service_name}, namespace: {service_namespace} with selectors: {selectors}")
fail(scenario_telemetry, scenario_telemetries)
fail_scenario_telemetry(scenario_telemetry)
failed_post_scenarios.append(scenario)
break
logging.info("selectors successfully restored")
Expand All @@ -70,21 +73,23 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,

logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
time.sleep(wait_duration)

scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
scenario_telemetries.append(scenario_telemetry)
logging.info("success")
except Exception as e:
logging.error(f"scenario {scenario} failed with exception: {e}")
fail(scenario_telemetry, scenario_telemetries)
failed_post_scenarios.append(scenario)
fail_scenario_telemetry(scenario_telemetry)
log_exception(scenario)

return failed_post_scenarios, scenario_telemetries
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)

return failed_post_scenarios, scenario_telemetries

def fail_scenario_telemetry(scenario_telemetry: ScenarioTelemetry):
    """Mark a scenario telemetry record as failed.

    Sets the exit status to 1 (failure) and stamps the end time, so the
    caller only has to append the record to its own telemetry list.
    Appending is deliberately left to the caller — the old ``fail``
    helper also appended to a shared list, which forced every call site
    to pass that list around.

    :param scenario_telemetry: the telemetry record to mark as failed
        (mutated in place).
    """
    scenario_telemetry.exit_status = 1
    # Single timestamp assignment: the duplicated end_timestamp write in
    # the previous revision was redundant and has been removed.
    scenario_telemetry.end_timestamp = time.time()
9 changes: 8 additions & 1 deletion kraken/shut_down/common_shut_down_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import time
from multiprocessing.pool import ThreadPool

from .. import utils
from ..cerberus import setup as cerberus
from ..post_actions import actions as post_actions
from ..node_actions.aws_node_scenarios import AWS
Expand Down Expand Up @@ -153,7 +155,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = config_path
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, config_path)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, config_path)

with open(config_path, "r") as f:
shut_down_config_yaml = yaml.full_load(f)
Expand Down Expand Up @@ -185,6 +187,11 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry.exit_status = 0

scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)

return failed_scenarios, scenario_telemetries
Expand Down
9 changes: 8 additions & 1 deletion kraken/syn_flood/syn_flood.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes

from kraken import utils


def run(scenarios_list: list[str], krkn_kubernetes: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries: list[ScenarioTelemetry] = []
Expand All @@ -17,7 +19,7 @@ def run(scenarios_list: list[str], krkn_kubernetes: KrknKubernetes, telemetry: K
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)

try:
pod_names = []
Expand Down Expand Up @@ -62,6 +64,11 @@ def run(scenarios_list: list[str], krkn_kubernetes: KrknKubernetes, telemetry: K
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_post_scenarios, scenario_telemetries

Expand Down
Loading

0 comments on commit 1fb22a9

Please sign in to comment.