Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test scenario for DJM with java lib injection #3500

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
16 changes: 15 additions & 1 deletion .github/workflows/run-lib-injection.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
matrix: ${{ steps.compute-matrix.outputs.matrix }}
matrix_supported_langs: ${{ steps.compute-matrix.outputs.matrix_supported_langs }}
matrix_profiling_supported: ${{ steps.compute-matrix.outputs.matrix_profiling_supported }}
matrix_skip_basic: ${{ steps.compute-matrix.outputs.matrix_skip_basic }}
init_image: ${{ steps.compute-matrix.outputs.init_image }}
steps:
- name: Compute matrix
Expand All @@ -41,7 +42,9 @@ jobs:
"cpp": [],
"dotnet": [{"name":"dd-lib-dotnet-init-test-app","supported":"true"}],
"golang": [],
"java": [{"name":"dd-lib-java-init-test-app","supported":"true"},{"name":"jdk7-app","supported":"false"}],
"java": [{"name":"dd-lib-java-init-test-app","supported":"true"},
{"name":"jdk7-app","supported":"false"},
{"name":"dd-djm-spark-test-app", "supported":"true", "skip-profiling":"true", "skip-basic":"true"}],
"nodejs": [{"name":"sample-app","supported":"true"},{"name":"sample-app-node13","supported":"false"}],
"php": [],
"python": [{"name":"dd-lib-python-init-test-django","supported":"true"},
Expand Down Expand Up @@ -80,11 +83,14 @@ jobs:
#Only supported weblog variants
results_supported_langs = []
results_profiling_supported = []
results_skip_basic = []
for weblog in weblogs["${{ inputs.library }}"]:
if weblog["supported"] == "true":
results_supported_langs.append(weblog["name"])
if "skip-profiling" not in weblog or weblog["skip-profiling"] != "true":
results_profiling_supported.append(weblog["name"])
if "skip-basic" in weblog and weblog["skip-basic"] == "true":
results_skip_basic.append(weblog["name"])

#Use the latest init image for prod version, latest_snapshot init image for dev version
if "${{ inputs.version }}" == 'prod':
Expand All @@ -97,11 +103,13 @@ jobs:
print(f'init_image={json.dumps(result_init_image)}', file=fh)
print(f'matrix_supported_langs={json.dumps(results_supported_langs)}', file=fh)
print(f'matrix_profiling_supported={json.dumps(results_profiling_supported)}', file=fh)
print(f'matrix_skip_basic={json.dumps(results_skip_basic)}', file=fh)

print(json.dumps(result, indent=2))
print(json.dumps(result_init_image, indent=2))
print(json.dumps(results_supported_langs, indent=2))
print(json.dumps(results_profiling_supported, indent=2))
print(json.dumps(results_skip_basic, indent=2))

lib-injection-init-image-validator:
if: inputs.library == 'dotnet' || inputs.library == 'java' || inputs.library == 'python' || inputs.library == 'ruby' || inputs.library == 'nodejs'
Expand Down Expand Up @@ -231,13 +239,19 @@ jobs:

- name: Kubernetes lib-injection tests
id: k8s-lib-injection-tests
if: ${{ !contains(fromJson(needs.compute-matrix.outputs.matrix_skip_basic), matrix.weblog) }}
run: ./run.sh K8S_LIBRARY_INJECTION_BASIC

- name: Kubernetes lib-injection profiling tests
id: k8s-lib-injection-tests-profiling
if: ${{ contains(fromJson(needs.compute-matrix.outputs.matrix_profiling_supported), matrix.weblog) }}
run: ./run.sh K8S_LIBRARY_INJECTION_PROFILING

- name: Kubernetes lib-injection DJM tests
id: k8s-lib-injection-tests-djm
if: ${{ matrix.weblog == 'dd-djm-spark-test-app' }}
run: ./run.sh K8S_LIBRARY_INJECTION_DJM

- name: Compress logs
id: compress_logs
if: always() && steps.build.outcome == 'success'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Weblog image for the DJM (Data Jobs Monitoring) lib-injection test.
# Based on the official Apache Spark image so spark-submit is available.
FROM apache/spark:3.4.4

WORKDIR /opt/spark/work-dir

# Become root only long enough to install the launch script with the right
# owner and mode, then drop back to the unprivileged spark user.
USER root
COPY launch.sh /opt/spark/work-dir/launch.sh
RUN chown spark:spark /opt/spark/work-dir/launch.sh
RUN chmod +x /opt/spark/work-dir/launch.sh
USER spark

# The launch script submits the example Spark job and then keeps the
# container alive.
CMD ["/opt/spark/work-dir/launch.sh"]
22 changes: 22 additions & 0 deletions lib-injection/build/docker/java/dd-djm-spark-test-app/launch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash

# Submit an example Spark job (SparkPi) in cluster deploy mode with DJM enabled.
# The admission.datadoghq.com labels/annotations on the driver pod ask the
# Datadog admission controller to inject the java tracer and apm-inject
# (version: latest), and -Ddd.data.jobs.enabled=true enables Data Jobs
# Monitoring in the injected tracer.
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT \
--conf spark.kubernetes.container.image=apache/spark:3.4.4 \
--deploy-mode cluster \
--conf spark.kubernetes.namespace=default \
--conf spark.kubernetes.executor.deleteOnTermination=false \
--conf spark.kubernetes.driver.label.admission.datadoghq.com/enabled=true \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.driver.annotation.admission.datadoghq.com/java-lib.version=latest \
--conf spark.kubernetes.driverEnv.DD_APM_INSTRUMENTATION_DEBUG=true \
--conf spark.kubernetes.driver.annotation.admission.datadoghq.com/apm-inject.version=latest \
--conf spark.driver.extraJavaOptions="-Ddd.integrations.enabled=false -Ddd.data.jobs.enabled=true -Ddd.service=spark-pi-example -Ddd.env=test -Ddd.version=0.1.0 -Ddd.tags=team:djm" \
--conf spark.kubernetes.driverEnv.HADOOP_HOME=/opt/hadoop/ \
local:///opt/spark/examples/jars/spark-examples.jar 20

# Keep a long-running server up after the submission so the web-log pod stays
# ready and reachable by the test harness (port defaults to 18080).
python3 -m http.server ${SERVER_PORT:-18080}
5 changes: 3 additions & 2 deletions tests/k8s_lib_injection/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ def deploy_test_agent(self):
def deploy_agent(self):
    """Deploy the agent by delegating to the test_agent helper."""
    self.test_agent.deploy_agent()

def deploy_weblog_as_pod(self, with_admission_controller=True, use_uds=False, env=None, service_account=None):
    """Deploy the weblog application as a single pod.

    with_admission_controller: if True, rely on the admission controller for
        library injection; otherwise install with manual injection.
    use_uds: use a Unix domain socket (manual-injection path only).
    env: optional extra environment variables for the weblog container.
    service_account: optional k8s service account to run the pod under
        (honored only on the admission-controller path; needed e.g. by the
        DJM Spark weblog so the driver may create executor pods).
    """
    if with_admission_controller:
        self.test_weblog.install_weblog_pod_with_admission_controller(env=env, service_account=service_account)
    else:
        self.test_weblog.install_weblog_pod_without_admission_controller(use_uds, env=env)

def export_debug_info(self):
    """Dump debug information from both the agent and the weblog helpers."""
    for component in (self.test_agent, self.test_weblog):
        component.export_debug_info()

62 changes: 62 additions & 0 deletions tests/k8s_lib_injection/test_k8s_djm_with_ssi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import time

import requests
import json

from utils import scenarios, features, context, irrelevant
from utils.tools import logger
from utils import scenarios, features
from utils.k8s_lib_injection.k8s_command_utils import execute_command_sync

@features.k8s_admission_controller
@scenarios.k8s_library_injection_djm
@irrelevant(condition=(context.library != "java"), reason="Data Jobs Monitoring requires Java library only.")
@irrelevant(
    condition=(context.weblog_variant != "dd-djm-spark-test-app"),
    reason="Data Jobs Monitoring tests are only applicable when using dd-djm-spark-test-app web-log variant.",
)
class TestK8sDJMWithSSI:
    """Validate java lib injection for Data Jobs Monitoring on k8s.

    The tracer is injected using the admission controller via annotations on
    the submitted Spark application. The dev test agent is then polled to
    check that the Spark application was instrumented (i.e. at least one
    trace contains a ``spark.application`` span of type ``spark``).
    """

    def _get_dev_agent_traces(self, k8s_kind_cluster, retry=10):
        """Poll the dev test agent's /test/traces endpoint.

        Returns the first non-empty traces payload, or [] after `retry`
        attempts spaced 2 seconds apart.
        """
        for attempt in range(retry):
            logger.info("[Check traces] Checking traces:")
            # Explicit timeout so a hung agent fails this attempt instead of
            # blocking the whole CI job forever.
            response = requests.get(
                f"http://{k8s_kind_cluster.cluster_host_name}:{k8s_kind_cluster.get_agent_port()}/test/traces",
                timeout=30,
            )
            traces_json = response.json()
            if traces_json:
                logger.debug(f"Test traces response: {traces_json}")
                return traces_json
            if attempt < retry - 1:  # no point sleeping after the last attempt
                time.sleep(2)
        return []

    def _get_spark_application_traces(self, test_k8s_instance):
        """Return only the traces that contain a `spark.application` span of type `spark`."""
        traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster)
        logger.debug(f"Traces received: {traces_json}")
        return [
            trace
            for trace in traces_json
            if any(span.get("name") == "spark.application" and span.get("type") == "spark" for span in trace)
        ]

    def test_spark_instrumented_with_ssi(self, test_k8s_instance):
        """Deploy the Spark weblog with SSI enabled and assert DJM traces are produced."""
        logger.info(
            f"Launching test test_spark_instrumented_with_ssi: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
        )

        # Create a service account (plus RBAC binding) so the Spark driver pod
        # is allowed to create executor pods inside the cluster.
        execute_command_sync("kubectl create serviceaccount spark", test_k8s_instance.k8s_kind_cluster)
        execute_command_sync(
            "kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default",
            test_k8s_instance.k8s_kind_cluster,
        )

        test_k8s_instance.deploy_test_agent()
        test_k8s_instance.deploy_datadog_cluster_agent()
        test_k8s_instance.deploy_weblog_as_pod(service_account="spark")

        spark_traces = self._get_spark_application_traces(test_k8s_instance)

        logger.info(f"Spark application traces received: {spark_traces}")
        # Persist the filtered traces so failed CI runs can be debugged post-mortem.
        with open(f"{test_k8s_instance.output_folder}/spark_traces.json", "w") as f:
            f.write(json.dumps(spark_traces, indent=4))
        assert spark_traces, "No Data Jobs Monitoring Spark application traces found"

        logger.info("Test test_spark_instrumented_with_ssi finished")
7 changes: 7 additions & 0 deletions utils/_context/_scenarios/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,13 @@ def all_endtoend_scenarios(test_object):
github_workflow="libinjection",
scenario_groups=[ScenarioGroup.ALL, ScenarioGroup.LIB_INJECTION],
)

# Scenario for the Data Jobs Monitoring (DJM) k8s lib-injection tests; runs
# under the "libinjection" GitHub workflow as K8S_LIBRARY_INJECTION_DJM.
k8s_library_injection_djm = KubernetesScenario(
    "K8S_LIBRARY_INJECTION_DJM",
    doc="Kubernetes Instrumentation with Data Jobs Monitoring",
    github_workflow="libinjection",
    scenario_groups=[ScenarioGroup.ALL, ScenarioGroup.LIB_INJECTION],
)

k8s_library_injection_profiling = KubernetesScenario(
"K8S_LIBRARY_INJECTION_PROFILING",
Expand Down
10 changes: 6 additions & 4 deletions utils/k8s_lib_injection/k8s_weblog.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def configure(self, k8s_kind_cluster, k8s_wrapper):
self.k8s_wrapper = k8s_wrapper
self.logger = k8s_logger(self.output_folder, self.test_name, "k8s_logger")

def _get_base_weblog_pod(self, env=None):
def _get_base_weblog_pod(self, env=None, service_account=None):
""" Installs a target app for manual library injection testing.
It returns when the app pod is ready."""

Expand Down Expand Up @@ -106,15 +106,17 @@ def _get_base_weblog_pod(self, env=None):

containers.append(container1)

pod_spec = client.V1PodSpec(containers=containers)
pod_spec = client.V1PodSpec(
containers=containers,
service_account=service_account)

pod_body = client.V1Pod(api_version="v1", kind="Pod", metadata=pod_metadata, spec=pod_spec)
self.logger.info("[Deploy weblog] Weblog pod configuration done.")
return pod_body

def install_weblog_pod_with_admission_controller(self, env=None, service_account=None):
    """Install the weblog pod, relying on the admission controller for injection.

    env: optional extra environment variables for the weblog container.
    service_account: optional k8s service account to run the pod under.
    Blocks until the pod labeled app=my-app reports ready (200s timeout).
    """
    self.logger.info("[Deploy weblog] Installing weblog pod using admission controller")
    pod_body = self._get_base_weblog_pod(env=env, service_account=service_account)
    self.k8s_wrapper.create_namespaced_pod(body=pod_body)
    self.logger.info("[Deploy weblog] Weblog pod using admission controller created. Waiting for it to be ready!")
    self.wait_for_weblog_ready_by_label_app("my-app", timeout=200)
Expand Down
Loading