Skip to content

Commit

Permalink
Add opentelemetry integration (datacontract#93)
Browse files Browse the repository at this point in the history
* Closes datacontract#77

Co-authored-by: Dominik Guhr <[email protected]>
  • Loading branch information
simonharrer and DGuhr authored Mar 15, 2024
1 parent 3b8c1e5 commit db3b28c
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 4 deletions.
11 changes: 9 additions & 2 deletions datacontract/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def test(
help="Run the schema and quality tests on the example data within the data contract.")] = None,
publish: Annotated[str, typer.Option(
help="The url to publish the results after the test")] = None,
publish_to_opentelemetry: Annotated[bool, typer.Option(
help="Publish the results to opentelemetry. Use environment variables to configure the OTLP endpoint, headers, etc.")] = False,
logs: Annotated[bool, typer.Option(
help="Print logs")] = False,
):
Expand All @@ -109,8 +111,13 @@ def test(
print(f"Testing {location}")
if server == "all":
server = None
run = DataContract(data_contract_file=location, schema_location=schema, publish_url=publish, server=server,
examples=examples).test()
run = DataContract(data_contract_file=location,
schema_location=schema,
publish_url=publish,
publish_to_opentelemetry=publish_to_opentelemetry,
server=server,
examples=examples,
).test()
if logs:
_print_logs(run)
_handle_result(run)
Expand Down
13 changes: 12 additions & 1 deletion datacontract/data_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from datacontract.imports.sql_importer import import_sql
from datacontract.integration.publish_datamesh_manager import \
publish_datamesh_manager
from datacontract.integration.publish_opentelemetry import publish_opentelemetry
from datacontract.lint import resolve

from datacontract.model.breaking_change import BreakingChanges, BreakingChange, Severity
Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(
server: str = None,
examples: bool = False,
publish_url: str = None,
publish_to_opentelemetry: bool = False,
spark: str = None,
inline_definitions: bool = False,
):
Expand All @@ -81,6 +83,7 @@ def __init__(
self._server = server
self._examples = examples
self._publish_url = publish_url
self._publish_to_opentelemetry = publish_to_opentelemetry
self._spark = spark
self._inline_definitions = inline_definitions
self.all_linters = {
Expand Down Expand Up @@ -233,7 +236,15 @@ def test(self) -> Run:
run.finish()

if self._publish_url is not None:
publish_datamesh_manager(run, self._publish_url)
try:
publish_datamesh_manager(run, self._publish_url)
except:
logging.error("Failed to publish to datamesh manager")
if self._publish_to_opentelemetry:
try:
publish_opentelemetry(run)
except:
logging.error("Failed to publish to opentelemetry")

return run

Expand Down
93 changes: 93 additions & 0 deletions datacontract/integration/publish_opentelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import logging
import os
from importlib import metadata
from uuid import uuid4
import math

from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.metrics import Observation

from datacontract.model.run import \
Run
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader

# Publishes metrics of a test run.
# Metric contains the values:
# 0 == test run passed,
# 1 == test run has warnings
# 2 == test run failed
# 3 == test run not possible due to an error
# 4 == test status unknown
#
# Tested with these environment variables:
#
# OTEL_SERVICE_NAME=datacontract-cli
# OTEL_EXPORTER_OTLP_ENDPOINT=https://YOUR_ID.apm.westeurope.azure.elastic-cloud.com:443
# OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer%20secret (Optional, when using SaaS Products)
# OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf (Optional, because it is the default value)
#
# Current limitations:
# - no gRPC support
# - currently, only ConsoleExporter and OTLP Exporter
# - Metrics only, no logs yet (but loosely planned)

def publish_opentelemetry(run: Run):
try:
if run.dataContractId is None:
raise Exception("Cannot publish run results, as data contract ID is unknown")

endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
logging.info(f"Publishing test results to opentelemetry at {endpoint}")

telemetry = Telemetry()
provider = metrics.get_meter_provider()
meter = provider.get_meter("com.datacontract.cli", metadata.version("datacontract-cli"))
meter.create_observable_gauge(
name="datacontract.cli.test",
callbacks=[lambda x: _to_observation_callback(run)],
unit="result",
description="The overall result of the data contract test run")

telemetry.publish()
except Exception as e:
logging.error(f"Failed publishing test results. Error: {str(e)}")


def _to_observation_callback(run):
yield _to_observation(run)


def _to_observation(run):
attributes = {
"datacontract.id": run.dataContractId,
"datacontract.version": run.dataContractVersion,
}

if run.result == "passed":
result_value = 0 # think of exit codes
elif run.result == "warning":
result_value = 1
elif run.result == "failed":
result_value = 2
elif run.result == "error":
result_value = 3
else:
result_value = 4
return Observation(value=result_value, attributes=attributes)


class Telemetry:
def __init__(self):
self.exporter = ConsoleMetricExporter()
self.remote_exporter = OTLPMetricExporter()
# using math.inf so it does not collect periodically. we do this in collect ourselves, one-time.
self.reader = PeriodicExportingMetricReader(self.exporter, export_interval_millis=math.inf)
self.remote_reader = PeriodicExportingMetricReader(self.remote_exporter, export_interval_millis=math.inf)
provider = MeterProvider(metric_readers=[self.reader, self.remote_reader])
metrics.set_meter_provider(provider)

def publish(self):
self.reader.collect()
self.remote_reader.collect()
2 changes: 1 addition & 1 deletion datacontract/model/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Run(BaseModel):
server: Optional[str] = None
timestampStart: datetime
timestampEnd: datetime
result: str = "unknown"
result: str = "unknown" # passed, warning, failed, error, unknown
checks: List[Check]
logs: List[Log]

Expand Down
31 changes: 31 additions & 0 deletions tests/test_integration_opentelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging
from uuid import uuid4

from opentelemetry.metrics import Observation
from typer.testing import CliRunner

from datacontract.integration.publish_opentelemetry import _to_observation
from datacontract.model.run import Run

logging.basicConfig(level=logging.DEBUG, force=True)


def test_convert_to_observation():
run = Run(
runId=uuid4(),
dataContractId="datacontract-id-1234",
dataContractVersion="1.0.0",
result="passed",
timestampStart="2021-01-01T00:00:00Z",
timestampEnd="2021-01-01T00:00:00Z",
checks=[],
logs=[],
)
expected = Observation(value=0, attributes={
"datacontract.id": "datacontract-id-1234",
"datacontract.version": "1.0.0",
})

actual = _to_observation(run)

assert expected == actual

0 comments on commit db3b28c

Please sign in to comment.