diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index c3633212cd278..ab61abb77ca8e 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -72,6 +72,7 @@ # Delimiter is placed between the universal metric prefix and the unique metric name. DEFAULT_METRIC_NAME_DELIMITER = "." +_otel_logger_instance = None def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str: """Assembles the prefix, delimiter, and name and returns it as a string.""" @@ -390,41 +391,45 @@ def poke_gauge(self, name: str, attributes: Attributes = None) -> GaugeValues: def get_otel_logger(cls) -> SafeOtelLogger: - host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector" - port = conf.getint("metrics", "otel_port") # ex: 4318 - prefix = conf.get("metrics", "otel_prefix") # ex: "airflow" - ssl_active = conf.getboolean("metrics", "otel_ssl_active") - # PeriodicExportingMetricReader will default to an interval of 60000 millis. - interval = conf.getint("metrics", "otel_interval_milliseconds", fallback=None) # ex: 30000 - debug = conf.getboolean("metrics", "otel_debugging_on") - service_name = conf.get("metrics", "otel_service") - - resource = Resource.create(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: service_name}) - - protocol = "https" if ssl_active else "http" - endpoint = f"{protocol}://{host}:{port}/v1/metrics" - - log.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint) - readers = [ - PeriodicExportingMetricReader( - OTLPMetricExporter( - endpoint=endpoint, - headers={"Content-Type": "application/json"}, - ), - export_interval_millis=interval, - ) - ] + global _otel_logger_instance + if _otel_logger_instance is None: + host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector" + port = conf.getint("metrics", "otel_port") # ex: 4318 + prefix = conf.get("metrics", "otel_prefix") # ex: "airflow" + ssl_active = conf.getboolean("metrics", "otel_ssl_active") + # PeriodicExportingMetricReader will default to an interval of 60000 millis. + interval = conf.getint("metrics", "otel_interval_milliseconds", fallback=None) # ex: 30000 + debug = conf.getboolean("metrics", "otel_debugging_on") + service_name = conf.get("metrics", "otel_service") + + resource = Resource.create(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: service_name}) + + protocol = "https" if ssl_active else "http" + endpoint = f"{protocol}://{host}:{port}/v1/metrics" + + log.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint) + readers = [ + PeriodicExportingMetricReader( + OTLPMetricExporter( + endpoint=endpoint, + headers={"Content-Type": "application/json"}, + ), + export_interval_millis=interval, + ) + ] - if debug: - export_to_console = PeriodicExportingMetricReader(ConsoleMetricExporter()) - readers.append(export_to_console) + if debug: + export_to_console = PeriodicExportingMetricReader(ConsoleMetricExporter()) + readers.append(export_to_console) - metrics.set_meter_provider( - MeterProvider( - resource=resource, - metric_readers=readers, - shutdown_on_exit=False, - ), - ) + metrics.set_meter_provider( + MeterProvider( + resource=resource, + metric_readers=readers, + shutdown_on_exit=False, + ), + ) - return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator()) + _otel_logger_instance = SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator()) + + return _otel_logger_instance \ No newline at end of file