Namespaced cluster events and logs integration (#690)
* namespaced events integration

Signed-off-by: Tullio Sebastiani <[email protected]>

* namespaced logs implementation

Signed-off-by: Tullio Sebastiani <[email protected]>

namespaced logs plugin scenario

Signed-off-by: Tullio Sebastiani <[email protected]>

namespaced logs integration

Signed-off-by: Tullio Sebastiani <[email protected]>

* logs collection fix

Signed-off-by: Tullio Sebastiani <[email protected]>

* krkn-lib 3.1.0 update

Signed-off-by: Tullio Sebastiani <[email protected]>

---------

Signed-off-by: Tullio Sebastiani <[email protected]>
tsebastiani authored Sep 12, 2024
1 parent 5e7938b commit 736c90e
Showing 17 changed files with 452 additions and 138 deletions.
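Note: most of the hunks below end with the same two calls into kraken/utils, a module that is among the 17 changed files but not shown in this view. The following is a minimal sketch of those helpers, with signatures inferred from the call sites; the krkn-lib calls inside the bodies (collect_cluster_events, the cluster_events field, put_ocp_logs, get_telemetry_config, and the "namespace" key in the parsed scenario config) are assumptions about the krkn-lib 3.1.0 API, not confirmed against it.

# Hypothetical sketch of the kraken/utils helpers referenced below.
# Signatures come from the call sites in this diff; every krkn-lib call
# in the bodies is an assumed API and may differ from the real module.
from typing import Optional

from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils.functions import get_yaml_item_value


def _scenario_namespace(parsed_scenario_config: dict) -> Optional[str]:
    # Assumption: the parsed scenario YAML exposes the target namespace
    # under a well-known key; the real helper may resolve it per scenario type.
    if not parsed_scenario_config:
        return None
    return get_yaml_item_value(parsed_scenario_config, "namespace", None)


def populate_cluster_events(scenario_telemetry: ScenarioTelemetry,
                            parsed_scenario_config: dict,
                            kubecli: KrknKubernetes,
                            start_timestamp: int,
                            end_timestamp: int):
    namespace = _scenario_namespace(parsed_scenario_config)
    # Assumed call: fetch cluster events inside the scenario window,
    # scoped to the affected namespace when one is declared.
    events = kubecli.collect_cluster_events(start_timestamp,
                                            end_timestamp,
                                            namespace=namespace)
    scenario_telemetry.cluster_events = events  # assumed field


def collect_and_put_ocp_logs(telemetry: KrknTelemetryOpenshift,
                             parsed_scenario_config: dict,
                             telemetry_request_id: str,
                             start_timestamp: int,
                             end_timestamp: int):
    namespace = _scenario_namespace(parsed_scenario_config)
    # Assumed call: archive the namespaced logs for the window and upload
    # them under the run-wide request id.
    telemetry.put_ocp_logs(telemetry_request_id,
                           telemetry.get_telemetry_config(),  # assumed accessor
                           start_timestamp,
                           end_timestamp,
                           namespace)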
30 changes: 23 additions & 7 deletions kraken/application_outage/actions.py
@@ -1,26 +1,32 @@
 import yaml
 import logging
 import time
 
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
 import kraken.cerberus.setup as cerberus
 from jinja2 import Template
 import kraken.invoke.command as runcommand
 from krkn_lib.k8s import KrknKubernetes
 from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception
 
+from kraken import utils
 
 
 # Reads the scenario config, applies and deletes a network policy to
 # block the traffic for the specified duration
-def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     failed_post_scenarios = ""
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_scenarios = []
     for app_outage_config in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = app_outage_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
         if len(app_outage_config) > 1:
             try:
                 with open(app_outage_config, "r") as f:
@@ -57,15 +63,15 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
             # Block the traffic by creating network policy
             logging.info("Creating the network policy")
 
-            kubecli.create_net_policy(yaml_spec, namespace)
+            telemetry.kubecli.create_net_policy(yaml_spec, namespace)
 
             # wait for the specified duration
             logging.info("Waiting for the specified duration in the config: %s" % (duration))
             time.sleep(duration)
 
             # unblock the traffic by deleting the network policy
             logging.info("Deleting the network policy")
-            kubecli.delete_net_policy("kraken-deny", namespace)
+            telemetry.kubecli.delete_net_policy("kraken-deny", namespace)
 
             logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
             time.sleep(wait_duration)
@@ -79,6 +85,16 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetr
         else:
             scenario_telemetry.exit_status = 0
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
     return failed_scenarios, scenario_telemetries
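Every runner changed in this commit converges on the same contract: instead of receiving kubecli and a Kubernetes-level telemetry object separately, it takes one KrknTelemetryOpenshift (reaching the client through telemetry.kubecli) plus a run-wide telemetry_request_id. A minimal sketch of the calling side, assuming a telemetry object is already built; the wrapper function and the uuid-minted request id are illustrative, not lifted from run_kraken.py:

# Illustrative caller for the new runner contract (assumptions: the wrapper
# function and the uuid-based request id; the run() signature matches this diff).
import uuid

import kraken.application_outage.actions as app_outage


def run_app_outage(scenarios_list, config, wait_duration, telemetry):
    # One request id groups every per-scenario log archive uploaded
    # during this chaos run.
    telemetry_request_id = str(uuid.uuid4())
    return app_outage.run(scenarios_list,
                          config,
                          wait_duration,
                          telemetry,  # KrknTelemetryOpenshift instance
                          telemetry_request_id)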

36 changes: 30 additions & 6 deletions kraken/arcaflow_plugin/arcaflow_plugin.py
@@ -5,23 +5,47 @@
 import logging
 from pathlib import Path
 from typing import List
 
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
 from .context_auth import ContextAuth
 from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 
+from .. import utils
 
 
-def run(scenarios_list: List[str], kubeconfig_path: str, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list: List[str],
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str
+        ) -> (list[str], list[ScenarioTelemetry]):
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_post_scenarios = []
     for scenario in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = scenario
-        scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry,scenario)
+        start_time = time.time()
+        scenario_telemetry.start_timestamp = start_time
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
         engine_args = build_args(scenario)
-        status_code = run_workflow(engine_args, kubeconfig_path)
-        scenario_telemetry.end_timestamp = time.time()
+        status_code = run_workflow(engine_args, telemetry.kubecli.get_kubeconfig_path())
+        end_time = time.time()
+        scenario_telemetry.end_timestamp = end_time
         scenario_telemetry.exit_status = status_code
 
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(start_time),
+                                      int(end_time))
+
+        # this is the design proposal for the namespaced logs collection;
+        # check the latest krkn-lib commit to follow the related changes as well
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(start_time),
+                                       int(end_time))
+
         scenario_telemetries.append(scenario_telemetry)
         if status_code != 0:
             failed_post_scenarios.append(scenario)
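The "design proposal" comment in the hunk above sketches the intent: collect logs only from the namespaces a scenario actually touched, bounded by the scenario's start/end epoch seconds, and upload them under the shared request id. A rough outline of that flow, purely illustrative; list_pods and read_pod_log are used here as placeholder names, not confirmed krkn-lib methods:

# Rough, illustrative outline of namespaced log collection.
# list_pods and read_pod_log are placeholder names, not confirmed krkn-lib API.
def collect_namespaced_logs(kubecli, namespaces, start_ts: int, end_ts: int) -> dict[str, str]:
    logs: dict[str, str] = {}
    for namespace in namespaces:
        for pod in kubecli.list_pods(namespace):
            # Bounding by the scenario window keeps the uploaded archive
            # small and relevant to the chaos that was just injected.
            logs[f"{namespace}/{pod}"] = kubecli.read_pod_log(
                pod, namespace, since=start_ts, until=end_ts
            )
    return logs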
38 changes: 28 additions & 10 deletions kraken/network_chaos/actions.py
@@ -3,19 +3,26 @@
 import time
 import os
 import random
 
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
 import kraken.cerberus.setup as cerberus
 import kraken.node_actions.common_node_functions as common_node_functions
 from jinja2 import Environment, FileSystemLoader
 from krkn_lib.k8s import KrknKubernetes
 from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
 from krkn_lib.models.telemetry import ScenarioTelemetry
 from krkn_lib.utils.functions import get_yaml_item_value, log_exception
 
+from kraken import utils
 
 
 # krkn_lib
 # Reads the scenario config and introduces traffic variations in Node's host network interface.
-def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
-    failed_post_scenarios = ""
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     logging.info("Runing the Network Chaos tests")
+    failed_post_scenarios = ""
     scenario_telemetries: list[ScenarioTelemetry] = []
@@ -24,7 +31,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = net_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, net_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, net_config)
         try:
             with open(net_config, "r") as file:
                 param_lst = ["latency", "loss", "bandwidth"]
@@ -56,11 +63,11 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
                 node_name_list = [test_node]
             nodelst = []
             for single_node_name in node_name_list:
-                nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, kubecli))
+                nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, telemetry.kubecli))
             file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
             env = Environment(loader=file_loader, autoescape=True)
             pod_template = env.get_template("pod.j2")
-            test_interface = verify_interface(test_interface, nodelst, pod_template, kubecli)
+            test_interface = verify_interface(test_interface, nodelst, pod_template, telemetry.kubecli)
             joblst = []
             egress_lst = [i for i in param_lst if i in test_egress]
             chaos_config = {
@@ -86,13 +93,13 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
                         job_template.render(jobname=i + str(hash(node))[:5], nodename=node, cmd=exec_cmd)
                     )
                     joblst.append(job_body["metadata"]["name"])
-                    api_response = kubecli.create_job(job_body)
+                    api_response = telemetry.kubecli.create_job(job_body)
                     if api_response is None:
                         raise Exception("Error creating job")
                 if test_execution == "serial":
                     logging.info("Waiting for serial job to finish")
                     start_time = int(time.time())
-                    wait_for_job(joblst[:], kubecli, test_duration + 300)
+                    wait_for_job(joblst[:], telemetry.kubecli, test_duration + 300)
                     logging.info("Waiting for wait_duration %s" % wait_duration)
                     time.sleep(wait_duration)
                     end_time = int(time.time())
@@ -102,7 +109,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
if test_execution == "parallel":
logging.info("Waiting for parallel job to finish")
start_time = int(time.time())
wait_for_job(joblst[:], kubecli, test_duration + 300)
wait_for_job(joblst[:], telemetry.kubecli, test_duration + 300)
logging.info("Waiting for wait_duration %s" % wait_duration)
time.sleep(wait_duration)
end_time = int(time.time())
@@ -112,13 +119,24 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
                     raise RuntimeError()
             finally:
                 logging.info("Deleting jobs")
-                delete_job(joblst[:], kubecli)
+                delete_job(joblst[:], telemetry.kubecli)
         except (RuntimeError, Exception):
             scenario_telemetry.exit_status = 1
             failed_scenarios.append(net_config)
             log_exception(net_config)
         else:
             scenario_telemetry.exit_status = 0
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
     return failed_scenarios, scenario_telemetries

26 changes: 22 additions & 4 deletions kraken/node_actions/run.py
@@ -2,6 +2,10 @@
 import logging
 import sys
 import time
+
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
+from kraken import utils
 from kraken.node_actions.aws_node_scenarios import aws_node_scenarios
 from kraken.node_actions.general_cloud_node_scenarios import general_node_scenarios
 from kraken.node_actions.az_node_scenarios import azure_node_scenarios
@@ -55,23 +59,27 @@ def get_node_scenario_object(node_scenario, kubecli: KrknKubernetes):

 # Run defined scenarios
 # krkn_lib
-def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
+def run(scenarios_list,
+        config,
+        wait_duration,
+        telemetry: KrknTelemetryOpenshift,
+        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
     scenario_telemetries: list[ScenarioTelemetry] = []
     failed_scenarios = []
     for node_scenario_config in scenarios_list:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = node_scenario_config
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
         with open(node_scenario_config, "r") as f:
             node_scenario_config = yaml.full_load(f)
             for node_scenario in node_scenario_config["node_scenarios"]:
-                node_scenario_object = get_node_scenario_object(node_scenario, kubecli)
+                node_scenario_object = get_node_scenario_object(node_scenario, telemetry.kubecli)
                 if node_scenario["actions"]:
                     for action in node_scenario["actions"]:
                         start_time = int(time.time())
                         try:
-                            inject_node_scenario(action, node_scenario, node_scenario_object, kubecli)
+                            inject_node_scenario(action, node_scenario, node_scenario_object, telemetry.kubecli)
                             logging.info("Waiting for the specified duration: %s" % (wait_duration))
                             time.sleep(wait_duration)
                             end_time = int(time.time())
@@ -85,6 +93,16 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
             scenario_telemetry.exit_status = 0
 
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
         scenario_telemetries.append(scenario_telemetry)
 
     return failed_scenarios, scenario_telemetries
28 changes: 20 additions & 8 deletions kraken/plugins/__init__.py
@@ -9,9 +9,11 @@
 from arcaflow_plugin_kill_pod import kill_pods, wait_for_pods
 from krkn_lib.k8s import KrknKubernetes
 from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
 
 import kraken.plugins.node_scenarios.vmware_plugin as vmware_plugin
 import kraken.plugins.node_scenarios.ibmcloud_plugin as ibmcloud_plugin
+from kraken import utils
 from kraken.plugins.run_python_plugin import run_python_file
 from kraken.plugins.network.ingress_shaping import network_chaos
 from kraken.plugins.pod_network_outage.pod_network_outage_plugin import pod_outage
@@ -249,28 +251,27 @@ def json_schema(self):


 def run(scenarios: List[str],
-        kubeconfig_path: str,
         kraken_config: str,
         failed_post_scenarios: List[str],
         wait_duration: int,
-        telemetry: KrknTelemetryKubernetes,
-        kubecli: KrknKubernetes,
-        run_uuid: str
+        telemetry: KrknTelemetryOpenshift,
+        run_uuid: str,
+        telemetry_request_id: str,
         ) -> (List[str], list[ScenarioTelemetry]):
 
     scenario_telemetries: list[ScenarioTelemetry] = []
     for scenario in scenarios:
         scenario_telemetry = ScenarioTelemetry()
         scenario_telemetry.scenario = scenario
         scenario_telemetry.start_timestamp = time.time()
-        telemetry.set_parameters_base64(scenario_telemetry, scenario)
+        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
         logging.info('scenario ' + str(scenario))
-        pool = PodsMonitorPool(kubecli)
+        pool = PodsMonitorPool(telemetry.kubecli)
         kill_scenarios = [kill_scenario for kill_scenario in PLUGINS.unserialize_scenario(scenario) if kill_scenario["id"] == "kill-pods"]
 
         try:
             start_monitoring(pool, kill_scenarios)
-            PLUGINS.run(scenario, kubeconfig_path, kraken_config, run_uuid)
+            PLUGINS.run(scenario, telemetry.kubecli.get_kubeconfig_path(), kraken_config, run_uuid)
             result = pool.join()
             scenario_telemetry.affected_pods = result
             if result.error:
@@ -286,8 +287,19 @@ def run(scenarios: List[str],
             scenario_telemetry.exit_status = 0
         logging.info("Waiting for the specified duration: %s" % (wait_duration))
         time.sleep(wait_duration)
-        scenario_telemetries.append(scenario_telemetry)
         scenario_telemetry.end_timestamp = time.time()
+        utils.collect_and_put_ocp_logs(telemetry,
+                                       parsed_scenario_config,
+                                       telemetry_request_id,
+                                       int(scenario_telemetry.start_timestamp),
+                                       int(scenario_telemetry.end_timestamp))
+        utils.populate_cluster_events(scenario_telemetry,
+                                      parsed_scenario_config,
+                                      telemetry.kubecli,
+                                      int(scenario_telemetry.start_timestamp),
+                                      int(scenario_telemetry.end_timestamp))
+
+        scenario_telemetries.append(scenario_telemetry)
 
     return failed_post_scenarios, scenario_telemetries
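Taken together, every runner in this commit now follows the same per-scenario lifecycle, with the append to scenario_telemetries deliberately last so the stored record already carries its end timestamp, events, and log references. A distilled paraphrase of that loop body; run_one_scenario is a stand-in for the runner-specific chaos injection, and telemetry is assumed to be a KrknTelemetryOpenshift instance:

# Distilled per-scenario lifecycle shared by the runners in this commit.
# run_one_scenario is a stand-in for the runner-specific chaos injection.
import time

from krkn_lib.models.telemetry import ScenarioTelemetry

from kraken import utils


def record_scenario(scenario: str, telemetry, telemetry_request_id: str,
                    run_one_scenario) -> ScenarioTelemetry:
    scenario_telemetry = ScenarioTelemetry()
    scenario_telemetry.scenario = scenario
    scenario_telemetry.start_timestamp = time.time()
    parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)

    run_one_scenario()  # network policy, node action, plugin run, ...
    scenario_telemetry.end_timestamp = time.time()

    window = (int(scenario_telemetry.start_timestamp),
              int(scenario_telemetry.end_timestamp))
    utils.collect_and_put_ocp_logs(telemetry, parsed_scenario_config,
                                   telemetry_request_id, *window)
    utils.populate_cluster_events(scenario_telemetry, parsed_scenario_config,
                                  telemetry.kubecli, *window)
    return scenario_telemetry  # appended by the caller as the final step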

