diff --git a/datacontract/cli.py b/datacontract/cli.py index d856dfba..66e60f7a 100644 --- a/datacontract/cli.py +++ b/datacontract/cli.py @@ -100,6 +100,8 @@ def test( help="Run the schema and quality tests on the example data within the data contract.")] = None, publish: Annotated[str, typer.Option( help="The url to publish the results after the test")] = None, + publish_to_opentelemetry: Annotated[bool, typer.Option( + help="Publish the results to opentelemetry. Use environment variables to configure the OTLP endpoint, headers, etc.")] = False, logs: Annotated[bool, typer.Option( help="Print logs")] = False, ): @@ -109,8 +111,13 @@ def test( print(f"Testing {location}") if server == "all": server = None - run = DataContract(data_contract_file=location, schema_location=schema, publish_url=publish, server=server, - examples=examples).test() + run = DataContract(data_contract_file=location, + schema_location=schema, + publish_url=publish, + publish_to_opentelemetry=publish_to_opentelemetry, + server=server, + examples=examples, + ).test() if logs: _print_logs(run) _handle_result(run) diff --git a/datacontract/data_contract.py b/datacontract/data_contract.py index 8f10a3c8..a8f1cfec 100644 --- a/datacontract/data_contract.py +++ b/datacontract/data_contract.py @@ -26,6 +26,7 @@ from datacontract.imports.sql_importer import import_sql from datacontract.integration.publish_datamesh_manager import \ publish_datamesh_manager +from datacontract.integration.publish_opentelemetry import publish_opentelemetry from datacontract.lint import resolve from datacontract.model.breaking_change import BreakingChanges, BreakingChange, Severity @@ -71,6 +72,7 @@ def __init__( server: str = None, examples: bool = False, publish_url: str = None, + publish_to_opentelemetry: bool = False, spark: str = None, inline_definitions: bool = False, ): @@ -81,6 +83,7 @@ def __init__( self._server = server self._examples = examples self._publish_url = publish_url + self._publish_to_opentelemetry = publish_to_opentelemetry self._spark = spark self._inline_definitions = inline_definitions self.all_linters = { @@ -233,7 +236,15 @@ def test(self) -> Run: run.finish() if self._publish_url is not None: - publish_datamesh_manager(run, self._publish_url) + try: + publish_datamesh_manager(run, self._publish_url) + except: + logging.error("Failed to publish to datamesh manager") + if self._publish_to_opentelemetry: + try: + publish_opentelemetry(run) + except: + logging.error("Failed to publish to opentelemetry") return run diff --git a/datacontract/integration/publish_opentelemetry.py b/datacontract/integration/publish_opentelemetry.py new file mode 100644 index 00000000..2789506a --- /dev/null +++ b/datacontract/integration/publish_opentelemetry.py @@ -0,0 +1,93 @@ +import logging +import os +from importlib import metadata +from uuid import uuid4 +import math + +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.metrics import Observation + +from datacontract.model.run import \ + Run +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader + +# Publishes metrics of a test run. +# Metric contains the values: +# 0 == test run passed, +# 1 == test run has warnings +# 2 == test run failed +# 3 == test run not possible due to an error +# 4 == test status unknown +# +# Tested with these environment variables: +# +# OTEL_SERVICE_NAME=datacontract-cli +# OTEL_EXPORTER_OTLP_ENDPOINT=https://YOUR_ID.apm.westeurope.azure.elastic-cloud.com:443 +# OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer%20secret (Optional, when using SaaS Products) +# OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf (Optional, because it is the default value) +# +# Current limitations: +# - no gRPC support +# - currently, only ConsoleExporter and OTLP Exporter +# - Metrics only, no logs yet (but loosely planned) + +def publish_opentelemetry(run: Run): + try: + if run.dataContractId is None: + raise Exception("Cannot publish run results, as data contract ID is unknown") + + endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + logging.info(f"Publishing test results to opentelemetry at {endpoint}") + + telemetry = Telemetry() + provider = metrics.get_meter_provider() + meter = provider.get_meter("com.datacontract.cli", metadata.version("datacontract-cli")) + meter.create_observable_gauge( + name="datacontract.cli.test", + callbacks=[lambda x: _to_observation_callback(run)], + unit="result", + description="The overall result of the data contract test run") + + telemetry.publish() + except Exception as e: + logging.error(f"Failed publishing test results. Error: {str(e)}") + + +def _to_observation_callback(run): + yield _to_observation(run) + + +def _to_observation(run): + attributes = { + "datacontract.id": run.dataContractId, + "datacontract.version": run.dataContractVersion, + } + + if run.result == "passed": + result_value = 0 # think of exit codes + elif run.result == "warning": + result_value = 1 + elif run.result == "failed": + result_value = 2 + elif run.result == "error": + result_value = 3 + else: + result_value = 4 + return Observation(value=result_value, attributes=attributes) + + +class Telemetry: + def __init__(self): + self.exporter = ConsoleMetricExporter() + self.remote_exporter = OTLPMetricExporter() + # using math.inf so it does not collect periodically. we do this in collect ourselves, one-time. + self.reader = PeriodicExportingMetricReader(self.exporter, export_interval_millis=math.inf) + self.remote_reader = PeriodicExportingMetricReader(self.remote_exporter, export_interval_millis=math.inf) + provider = MeterProvider(metric_readers=[self.reader, self.remote_reader]) + metrics.set_meter_provider(provider) + + def publish(self): + self.reader.collect() + self.remote_reader.collect() diff --git a/datacontract/model/run.py b/datacontract/model/run.py index 6087317d..4a83cf5d 100644 --- a/datacontract/model/run.py +++ b/datacontract/model/run.py @@ -32,7 +32,7 @@ class Run(BaseModel): server: Optional[str] = None timestampStart: datetime timestampEnd: datetime - result: str = "unknown" + result: str = "unknown" # passed, warning, failed, error, unknown checks: List[Check] logs: List[Log] diff --git a/tests/test_integration_opentelemetry.py b/tests/test_integration_opentelemetry.py new file mode 100644 index 00000000..5f0729c1 --- /dev/null +++ b/tests/test_integration_opentelemetry.py @@ -0,0 +1,31 @@ +import logging +from uuid import uuid4 + +from opentelemetry.metrics import Observation +from typer.testing import CliRunner + +from datacontract.integration.publish_opentelemetry import _to_observation +from datacontract.model.run import Run + +logging.basicConfig(level=logging.DEBUG, force=True) + + +def test_convert_to_observation(): + run = Run( + runId=uuid4(), + dataContractId="datacontract-id-1234", + dataContractVersion="1.0.0", + result="passed", + timestampStart="2021-01-01T00:00:00Z", + timestampEnd="2021-01-01T00:00:00Z", + checks=[], + logs=[], + ) + expected = Observation(value=0, attributes={ + "datacontract.id": "datacontract-id-1234", + "datacontract.version": "1.0.0", + }) + + actual = _to_observation(run) + + assert expected == actual