From ec857695ffcbcc08f2ca65e2765e6a966df16e3a Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 8 Nov 2024 12:12:40 -0600 Subject: [PATCH 01/19] WIP:instrument task runs --- requirements-client.txt | 1 + src/prefect/task_engine.py | 153 +++++++++++++++++++++++++++++++++---- 2 files changed, 141 insertions(+), 13 deletions(-) diff --git a/requirements-client.txt b/requirements-client.txt index 0e0b0ca443a5..5efe0769d439 100644 --- a/requirements-client.txt +++ b/requirements-client.txt @@ -14,6 +14,7 @@ httpx[http2] >= 0.23, != 0.23.2 importlib_metadata >= 4.4; python_version < '3.10' jsonpatch >= 1.32, < 2.0 jsonschema >= 4.0.0, < 5.0.0 +opentelemetry-api orjson >= 3.7, < 4.0 packaging >= 21.3, < 24.3 pathspec >= 0.8.0 diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index a4c948fb5bc7..c0bda0cb6b92 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -21,6 +21,7 @@ Optional, Sequence, Set, + Tuple, Type, TypeVar, Union, @@ -29,8 +30,18 @@ import anyio import pendulum +from opentelemetry import trace +from opentelemetry.trace import ( + Link, + SpanContext, + Status, + StatusCode, + Tracer, + get_tracer, +) from typing_extensions import ParamSpec +import prefect from prefect import Task from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client from prefect.client.schemas import TaskRun @@ -98,6 +109,28 @@ BACKOFF_MAX = 10 +def digest_task_inputs(inputs, parameters) -> Tuple[Dict[str, str], list[Link]]: + parameter_attributes = {} + links = [] + for key, value in inputs.items(): + for input in value: + if isinstance(input, TaskRunInput): + parameter_attributes[f"prefect.run.parameter.{key}"] = type( + parameters[key] + ).__name__ + links.append( + Link( + SpanContext(trace_id=int(input.id), span_id=0, is_remote=True), + attributes={"prefect.run.id": str(input.id)}, + ) + ) + else: + parameter_attributes[f"prefect.run.parameter.{key}"] = type( + input + ).__name__ + return parameter_attributes, links + + class TaskRunTimeoutError(TimeoutError): """Raised when a task run exceeds its timeout.""" @@ -119,6 +152,9 @@ class BaseTaskRunEngine(Generic[P, R]): _is_started: bool = False _task_name_set: bool = False _last_event: Optional[PrefectEvent] = None + _tracer: Tracer = field( + default_factory=lambda: get_tracer("prefect", prefect.__version__) + ) def __post_init__(self): if self.parameters is None: @@ -453,7 +489,15 @@ def set_state(self, state: State, force: bool = False) -> State: validated_state=self.task_run.state, follows=self._last_event, ) - + self._span.add_event( + new_state.name, + { + "prefect.state.message": new_state.message or "", + "prefect.state.type": new_state.type, + "prefect.state.name": new_state.name or new_state.type, + "prefect.state.id": str(new_state.id), + }, + ) return new_state def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -507,6 +551,11 @@ def handle_success(self, result: R, transaction: Transaction) -> R: self.record_terminal_state_timing(terminal_state) self.set_state(terminal_state) self._return_value = result + + self._span.set_status(Status(StatusCode.OK), terminal_state.message) + self._span.end(time.time_ns()) + self._span = None + return result def handle_retry(self, exc: Exception) -> bool: @@ -555,6 +604,7 @@ def handle_retry(self, exc: Exception) -> bool: def handle_exception(self, exc: Exception) -> None: # If the task fails, and we have retries left, set the task to retrying. 
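# A minimal standalone sketch of the link-building logic in digest_task_inputs
# above: an upstream task run's UUID becomes the 128-bit trace_id of a
# SpanContext, and the Link carries the run id as an attribute. This harness is
# illustrative only and is not part of the patch.
from uuid import uuid4
from opentelemetry.trace import Link, SpanContext

upstream_run_id = uuid4()  # stands in for TaskRunInput.id
link = Link(
    # uuid.UUID defines __int__, so int() yields the full 128-bit value --
    # the same width OpenTelemetry uses for trace ids.
    SpanContext(trace_id=int(upstream_run_id), span_id=0, is_remote=True),
    attributes={"prefect.run.id": str(upstream_run_id)},
)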
+ self._span.record_exception(exc) if not self.handle_retry(exc): # If the task has no retries left, or the retry condition is not met, set the task to failed. state = run_coro_as_sync( @@ -569,6 +619,10 @@ def handle_exception(self, exc: Exception) -> None: self.set_state(state) self._raised = exc + self._span.set_status(Status(StatusCode.ERROR, state.message)) + self._span.end(time.time_ns()) + self._span = None + def handle_timeout(self, exc: TimeoutError) -> None: if not self.handle_retry(exc): if isinstance(exc, TaskRunTimeoutError): @@ -592,6 +646,11 @@ def handle_crash(self, exc: BaseException) -> None: self.set_state(state, force=True) self._raised = exc + self._span.record_exception(exc) + self._span.set_status(Status(StatusCode.ERROR, state.message)) + self._span.end(time.time_ns()) + self._span = None + @contextmanager def setup_run_context(self, client: Optional[SyncPrefectClient] = None): from prefect.utilities.engine import ( @@ -642,14 +701,18 @@ def initialize_run( with SyncClientContext.get_or_create() as client_ctx: self._client = client_ctx.client self._is_started = True + flow_run_context = FlowRunContext.get() + parent_task_run_context = TaskRunContext.get() + self.logger.info(f"parameters {self.parameters}") + try: if not self.task_run: self.task_run = run_coro_as_sync( self.task.create_local_run( id=task_run_id, parameters=self.parameters, - flow_run_context=FlowRunContext.get(), - parent_task_run_context=TaskRunContext.get(), + flow_run_context=flow_run_context, + parent_task_run_context=parent_task_run_context, wait_for=self.wait_for, extra_task_inputs=dependencies, ) @@ -666,6 +729,31 @@ def initialize_run( self.logger.debug( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) + + context = None + parameter_attributes, links = digest_task_inputs( + self.task_run.task_inputs, self.parameters + ) + + if not parent_task_run_context and not flow_run_context: + context = SpanContext( + trace_id=int(self.task_run.id), + span_id=0, + is_remote=False, + ) + + self._span = self._tracer.start_span( + name=self.task_run.name, + attributes={ + "prefect.run.type": "task", + "prefect.run.id": str(self.task_run.id), + "prefect.tags": self.task_run.tags, + **parameter_attributes, + }, + links=links, + context=context, + ) + yield self except TerminationSignal as exc: @@ -717,11 +805,12 @@ def start( dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None, ) -> Generator[None, None, None]: with self.initialize_run(task_run_id=task_run_id, dependencies=dependencies): - self.begin_run() - try: - yield - finally: - self.call_hooks() + with trace.use_span(self._span): + self.begin_run() + try: + yield + finally: + self.call_hooks() @contextmanager def transaction_context(self) -> Generator[Transaction, None, None]: @@ -964,6 +1053,16 @@ async def set_state(self, state: State, force: bool = False) -> State: follows=self._last_event, ) + self._span.add_event( + new_state.name, + { + "prefect.state.message": new_state.message or "", + "prefect.state.type": new_state.type, + "prefect.state.name": new_state.name or new_state.type, + "prefect.state.id": str(new_state.id), + }, + ) + return new_state async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -1012,6 +1111,11 @@ async def handle_success(self, result: R, transaction: Transaction) -> R: self.record_terminal_state_timing(terminal_state) await self.set_state(terminal_state) self._return_value = result + + self._span.set_status(Status(StatusCode.OK, terminal_state.message)) + 
self._span.end(time.time_ns()) + self._span = None + return result async def handle_retry(self, exc: Exception) -> bool: @@ -1060,6 +1164,7 @@ async def handle_retry(self, exc: Exception) -> bool: async def handle_exception(self, exc: Exception) -> None: # If the task fails, and we have retries left, set the task to retrying. + self._span.record_exception(exc) if not await self.handle_retry(exc): # If the task has no retries left, or the retry condition is not met, set the task to failed. state = await exception_to_failed_state( @@ -1070,8 +1175,12 @@ async def handle_exception(self, exc: Exception) -> None: self.record_terminal_state_timing(state) await self.set_state(state) self._raised = exc + self._span.set_status(Status(StatusCode.ERROR, state.message)) + self._span.end(time.time_ns()) + self._span = None async def handle_timeout(self, exc: TimeoutError) -> None: + self._span.record_exception(exc) if not await self.handle_retry(exc): if isinstance(exc, TaskRunTimeoutError): message = f"Task run exceeded timeout of {self.task.timeout_seconds} second(s)" @@ -1085,6 +1194,9 @@ async def handle_timeout(self, exc: TimeoutError) -> None: ) await self.set_state(state) self._raised = exc + self._span.set_status(Status(StatusCode.ERROR, state.message)) + self._span.end(time.time_ns()) + self._span = None async def handle_crash(self, exc: BaseException) -> None: state = await exception_to_crashed_state(exc) @@ -1094,6 +1206,11 @@ async def handle_crash(self, exc: BaseException) -> None: await self.set_state(state, force=True) self._raised = exc + self._span.record_exception(exc) + self._span.set_status(Status(StatusCode.ERROR, state.message)) + self._span.end(time.time_ns()) + self._span = None + @asynccontextmanager async def setup_run_context(self, client: Optional[PrefectClient] = None): from prefect.utilities.engine import ( @@ -1165,6 +1282,15 @@ async def initialize_run( self.logger.debug( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) + self._span = self._tracer.start_span( + name=self.task_run.name, + attributes={ + "prefect.run.type": "flow", + "prefect.run.id": str(self.task_run.id), + "prefect.tags": self.task_run.tags, + }, + ) + yield self except TerminationSignal as exc: @@ -1218,11 +1344,12 @@ async def start( async with self.initialize_run( task_run_id=task_run_id, dependencies=dependencies ): - await self.begin_run() - try: - yield - finally: - await self.call_hooks() + with trace.use_span(self.span): + await self.begin_run() + try: + yield + finally: + await self.call_hooks() @asynccontextmanager async def transaction_context(self) -> AsyncGenerator[Transaction, None]: From f3ae6b3f2a241ad6558bc396f133bd02150b72fa Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Mon, 11 Nov 2024 10:24:08 -0600 Subject: [PATCH 02/19] async engine fix --- src/prefect/task_engine.py | 2 +- tests/test_instrumentation.py | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 tests/test_instrumentation.py diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index c0bda0cb6b92..68bb2065610e 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -1344,7 +1344,7 @@ async def start( async with self.initialize_run( task_run_id=task_run_id, dependencies=dependencies ): - with trace.use_span(self.span): + with trace.use_span(self._span): await self.begin_run() try: yield diff --git a/tests/test_instrumentation.py b/tests/test_instrumentation.py new file mode 100644 index 000000000000..e69de29bb2d1 From 
943cfe67fa5925e02c3e25b250e175428b2017f0 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Tue, 12 Nov 2024 11:21:21 -0600 Subject: [PATCH 03/19] more unit tests --- src/prefect/client/schemas/objects.py | 5 + src/prefect/task_engine.py | 7 +- tests/telemetry/test_instrumentation.py | 135 +++++++++++++++++++++++- tests/test_instrumentation.py | 0 4 files changed, 145 insertions(+), 2 deletions(-) delete mode 100644 tests/test_instrumentation.py diff --git a/src/prefect/client/schemas/objects.py b/src/prefect/client/schemas/objects.py index b4f2fc31cbee..b26f2e9bff0a 100644 --- a/src/prefect/client/schemas/objects.py +++ b/src/prefect/client/schemas/objects.py @@ -555,6 +555,11 @@ class FlowRun(ObjectBaseModel): description="A list of tags on the flow run", examples=[["tag-1", "tag-2"]], ) + labels: Dict[str, str] = Field( + default_factory=dict, + description="A dictionary of labels on the flow run", + examples=[{"my-label": "my-value"}], + ) parent_task_run_id: Optional[UUID] = Field( default=None, description=( diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index afbb40a5ea04..c3492cce755f 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -115,6 +115,8 @@ def digest_task_inputs(inputs, parameters) -> Tuple[Dict[str, str], list[Link]]: links = [] for key, value in inputs.items(): for input in value: + if input in ("__parents__", "wait_for"): + continue if isinstance(input, TaskRunInput): parameter_attributes[f"prefect.run.parameter.{key}"] = type( parameters[key] @@ -716,7 +718,6 @@ def initialize_run( self._is_started = True flow_run_context = FlowRunContext.get() parent_task_run_context = TaskRunContext.get() - self.logger.info(f"parameters {self.parameters}") try: if not self.task_run: @@ -754,6 +755,9 @@ def initialize_run( span_id=0, is_remote=False, ) + labels = {} + if flow_run_context: + labels = flow_run_context.flow_run.labels self._span = self._tracer.start_span( name=self.task_run.name, @@ -762,6 +766,7 @@ def initialize_run( "prefect.run.id": str(self.task_run.id), "prefect.tags": self.task_run.tags, **parameter_attributes, + **labels, }, links=links, context=context, diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index a86e4f725b9e..a2ed0df97870 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -1,5 +1,6 @@ import os -from uuid import UUID +from unittest.mock import Mock, patch +from uuid import UUID, uuid4 import pytest from opentelemetry import metrics, trace @@ -12,6 +13,10 @@ from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.trace import TracerProvider +from prefect import flow, task +from prefect.client.schemas import TaskRun +from prefect.states import Completed, Running +from prefect.task_engine import AsyncTaskRunEngine, digest_task_inputs from prefect.telemetry.bootstrap import setup_telemetry from prefect.telemetry.instrumentation import extract_account_and_workspace_id from prefect.telemetry.logging import get_log_handler @@ -160,3 +165,131 @@ def test_logger_provider( log_handler = get_log_handler() assert isinstance(log_handler, LoggingHandler) assert log_handler._logger_provider == logger_provider + + +class TestTaskRunInstrumentation: + @pytest.fixture + def mock_tracer(): + trace_provider, _, _ = setup_telemetry() + return trace_provider.get_tracer("prefect.test") + + @pytest.fixture + async def task_run_engine(mock_tracer): + @task + async def test_task(x: int, y: 
int): + return x + y + + task_run = TaskRun( + id=uuid4(), + task_key="test_task", + flow_run_id=uuid4(), + state=Running(), + dynamic_key="test_task-1", + ) + + engine = AsyncTaskRunEngine( + task=test_task, + task_run=task_run, + parameters={"x": 1, "y": 2}, + _tracer=mock_tracer, + ) + return engine + + def test_digest_task_inputs(): + inputs = {"x": 1, "y": 2} + parameters = {"x": int, "y": int} + otel_params, otel_inputs = digest_task_inputs(inputs, parameters) + assert otel_params == { + "prefect.run.parameter.x": "int", + "prefect.run.parameter.y": "int", + } + assert otel_inputs == [] + + @pytest.mark.asyncio + async def test_span_creation(task_run_engine, mock_tracer): + async with task_run_engine.start(): + assert task_run_engine._span is not None + assert task_run_engine._span.name == task_run_engine.task_run.name + assert task_run_engine._span.attributes["prefect.run.type"] == "task" + assert task_run_engine._span.attributes["prefect.run.id"] == str( + task_run_engine.task_run.id + ) + + @pytest.mark.asyncio + async def test_span_attributes(task_run_engine): + async with task_run_engine.start(): + assert "prefect.run.parameter.x" in task_run_engine._span.attributes + assert "prefect.run.parameter.y" in task_run_engine._span.attributes + assert task_run_engine._span.attributes["prefect.run.parameter.x"] == "int" + assert task_run_engine._span.attributes["prefect.run.parameter.y"] == "int" + + @pytest.mark.asyncio + async def test_span_events(task_run_engine): + async with task_run_engine.start(): + await task_run_engine.set_state(Running()) + await task_run_engine.set_state(Completed()) + + events = task_run_engine._span.events + assert len(events) == 2 + assert events[0].name == "Running" + assert events[1].name == "Completed" + + @pytest.mark.asyncio + async def test_span_status_on_success(task_run_engine): + async with task_run_engine.start(): + async with task_run_engine.run_context(): + await task_run_engine.handle_success(3, Mock()) + + assert task_run_engine._span.status.status_code == trace.StatusCode.OK + + @pytest.mark.asyncio + async def test_span_status_on_failure(task_run_engine): + async with task_run_engine.start(): + async with task_run_engine.run_context(): + await task_run_engine.handle_exception(ValueError("Test error")) + + assert task_run_engine._span.status.status_code == trace.StatusCode.ERROR + assert "Test error" in task_run_engine._span.status.description + + @pytest.mark.asyncio + async def test_span_exception_recording(task_run_engine): + test_exception = ValueError("Test error") + async with task_run_engine.start(): + async with task_run_engine.run_context(): + await task_run_engine.handle_exception(test_exception) + + events = task_run_engine._span.events + assert any(event.name == "exception" for event in events) + exception_event = next(event for event in events if event.name == "exception") + assert exception_event.attributes["exception.type"] == "ValueError" + assert exception_event.attributes["exception.message"] == "Test error" + + @pytest.mark.asyncio + async def test_span_links(task_run_engine): + # Simulate a parent task run + parent_task_run_id = uuid4() + task_run_engine.task_run.task_inputs = { + "x": [{"id": parent_task_run_id}], + "y": [2], + } + + async with task_run_engine.start(): + pass + + assert len(task_run_engine._span.links) == 1 + link = task_run_engine._span.links[0] + assert link.context.trace_id == int(parent_task_run_id) + assert link.attributes["prefect.run.id"] == str(parent_task_run_id) + + @pytest.mark.asyncio + async 
def test_flow_run_labels(task_run_engine): + @flow + async def test_flow(): + return await task_run_engine.task() + + with patch("prefect.context.FlowRunContext.get") as mock_flow_run_context: + mock_flow_run_context.return_value.flow_run.labels = {"env": "test"} + async with task_run_engine.start(): + pass + + assert task_run_engine._span.attributes["env"] == "test" diff --git a/tests/test_instrumentation.py b/tests/test_instrumentation.py deleted file mode 100644 index e69de29bb2d1..000000000000 From f8f8d0976c871e72f369091f8888dd92ec3a4603 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Tue, 12 Nov 2024 11:45:12 -0600 Subject: [PATCH 04/19] in memory exporter --- tests/telemetry/test_instrumentation.py | 230 +++++++++++++----------- 1 file changed, 128 insertions(+), 102 deletions(-) diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index a2ed0df97870..6f996ef84434 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -12,6 +12,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import InMemorySpanExporter from prefect import flow, task from prefect.client.schemas import TaskRun @@ -168,34 +169,15 @@ def test_logger_provider( class TestTaskRunInstrumentation: - @pytest.fixture - def mock_tracer(): - trace_provider, _, _ = setup_telemetry() - return trace_provider.get_tracer("prefect.test") - - @pytest.fixture - async def task_run_engine(mock_tracer): - @task - async def test_task(x: int, y: int): - return x + y - - task_run = TaskRun( - id=uuid4(), - task_key="test_task", - flow_run_id=uuid4(), - state=Running(), - dynamic_key="test_task-1", - ) + @task + def test_task(x: int, y: int): + return x + y - engine = AsyncTaskRunEngine( - task=test_task, - task_run=task_run, - parameters={"x": 1, "y": 2}, - _tracer=mock_tracer, - ) - return engine + @task + def returns_4(): + return 4 - def test_digest_task_inputs(): + def test_digest_task_inputs(self): inputs = {"x": 1, "y": 2} parameters = {"x": int, "y": int} otel_params, otel_inputs = digest_task_inputs(inputs, parameters) @@ -205,91 +187,135 @@ def test_digest_task_inputs(): } assert otel_inputs == [] - @pytest.mark.asyncio - async def test_span_creation(task_run_engine, mock_tracer): - async with task_run_engine.start(): - assert task_run_engine._span is not None - assert task_run_engine._span.name == task_run_engine.task_run.name - assert task_run_engine._span.attributes["prefect.run.type"] == "task" - assert task_run_engine._span.attributes["prefect.run.id"] == str( - task_run_engine.task_run.id - ) - - @pytest.mark.asyncio - async def test_span_attributes(task_run_engine): - async with task_run_engine.start(): - assert "prefect.run.parameter.x" in task_run_engine._span.attributes - assert "prefect.run.parameter.y" in task_run_engine._span.attributes - assert task_run_engine._span.attributes["prefect.run.parameter.x"] == "int" - assert task_run_engine._span.attributes["prefect.run.parameter.y"] == "int" + def test_linked_parameters(self): + trace_provider, _, _ = setup_telemetry() + tracer = trace_provider.get_tracer(__name__) + with tracer.start_as_current_span("test_task"): + self.test_task(x=self.returns_4(), y=2) - @pytest.mark.asyncio - async def test_span_events(task_run_engine): - async with task_run_engine.start(): - await task_run_engine.set_state(Running()) - await 
task_run_engine.set_state(Completed()) - events = task_run_engine._span.events - assert len(events) == 2 - assert events[0].name == "Running" - assert events[1].name == "Completed" +@pytest.fixture +def mock_tracer(): + trace_provider, _, _ = setup_telemetry() + span_exporter = InMemorySpanExporter() + span_processor = InFlightSpanProcessor(span_exporter) + trace_provider.add_span_processor(span_processor) + trace.set_tracer_provider(trace_provider) + return trace.get_tracer("prefect.test") + + +@pytest.fixture +async def task_run_engine(mock_tracer): + @task + async def test_task(x: int, y: int): + return x + y + + task_run = TaskRun( + id=uuid4(), + task_key="test_task", + flow_run_id=uuid4(), + state=Running(), + dynamic_key="test_task-1", + ) - @pytest.mark.asyncio - async def test_span_status_on_success(task_run_engine): - async with task_run_engine.start(): - async with task_run_engine.run_context(): - await task_run_engine.handle_success(3, Mock()) + engine = AsyncTaskRunEngine( + task=test_task, + task_run=task_run, + parameters={"x": 1, "y": 2}, + _tracer=mock_tracer, + ) + return engine - assert task_run_engine._span.status.status_code == trace.StatusCode.OK - @pytest.mark.asyncio - async def test_span_status_on_failure(task_run_engine): - async with task_run_engine.start(): - async with task_run_engine.run_context(): - await task_run_engine.handle_exception(ValueError("Test error")) +@pytest.mark.asyncio +async def test_span_creation(task_run_engine, mock_tracer): + async with task_run_engine.start(): + assert task_run_engine._span is not None + assert task_run_engine._span.name == task_run_engine.task_run.name + assert task_run_engine._span.attributes["prefect.run.type"] == "task" + assert task_run_engine._span.attributes["prefect.run.id"] == str( + task_run_engine.task_run.id + ) - assert task_run_engine._span.status.status_code == trace.StatusCode.ERROR - assert "Test error" in task_run_engine._span.status.description - @pytest.mark.asyncio - async def test_span_exception_recording(task_run_engine): - test_exception = ValueError("Test error") - async with task_run_engine.start(): - async with task_run_engine.run_context(): - await task_run_engine.handle_exception(test_exception) - - events = task_run_engine._span.events - assert any(event.name == "exception" for event in events) - exception_event = next(event for event in events if event.name == "exception") - assert exception_event.attributes["exception.type"] == "ValueError" - assert exception_event.attributes["exception.message"] == "Test error" - - @pytest.mark.asyncio - async def test_span_links(task_run_engine): - # Simulate a parent task run - parent_task_run_id = uuid4() - task_run_engine.task_run.task_inputs = { - "x": [{"id": parent_task_run_id}], - "y": [2], - } +@pytest.mark.asyncio +async def test_span_attributes(task_run_engine): + async with task_run_engine.start(): + assert "prefect.run.parameter.x" in task_run_engine._span.attributes + assert "prefect.run.parameter.y" in task_run_engine._span.attributes + assert task_run_engine._span.attributes["prefect.run.parameter.x"] == "int" + assert task_run_engine._span.attributes["prefect.run.parameter.y"] == "int" - async with task_run_engine.start(): - pass - assert len(task_run_engine._span.links) == 1 - link = task_run_engine._span.links[0] - assert link.context.trace_id == int(parent_task_run_id) - assert link.attributes["prefect.run.id"] == str(parent_task_run_id) +@pytest.mark.asyncio +async def test_span_events(task_run_engine): + async with 
task_run_engine.start(): + await task_run_engine.set_state(Running()) + await task_run_engine.set_state(Completed()) - @pytest.mark.asyncio - async def test_flow_run_labels(task_run_engine): - @flow - async def test_flow(): - return await task_run_engine.task() + events = task_run_engine._span.events + assert len(events) == 2 + assert events[0].name == "Running" + assert events[1].name == "Completed" - with patch("prefect.context.FlowRunContext.get") as mock_flow_run_context: - mock_flow_run_context.return_value.flow_run.labels = {"env": "test"} - async with task_run_engine.start(): - pass - assert task_run_engine._span.attributes["env"] == "test" +@pytest.mark.asyncio +async def test_span_status_on_success(task_run_engine): + async with task_run_engine.start(): + async with task_run_engine.run_context(): + await task_run_engine.handle_success(3, Mock()) + + assert task_run_engine._span.status.status_code == trace.StatusCode.OK + + +@pytest.mark.asyncio +async def test_span_status_on_failure(task_run_engine): + async with task_run_engine.start(): + async with task_run_engine.run_context(): + await task_run_engine.handle_exception(ValueError("Test error")) + + assert task_run_engine._span.status.status_code == trace.StatusCode.ERROR + assert "Test error" in task_run_engine._span.status.description + + +@pytest.mark.asyncio +async def test_span_exception_recording(task_run_engine): + test_exception = ValueError("Test error") + async with task_run_engine.start(): + async with task_run_engine.run_context(): + await task_run_engine.handle_exception(test_exception) + + events = task_run_engine._span.events + assert any(event.name == "exception" for event in events) + exception_event = next(event for event in events if event.name == "exception") + assert exception_event.attributes["exception.type"] == "ValueError" + assert exception_event.attributes["exception.message"] == "Test error" + + +@pytest.mark.asyncio +async def test_span_links(task_run_engine): + # Simulate a parent task run + parent_task_run_id = uuid4() + task_run_engine.task_run.task_inputs = {"x": [{"id": parent_task_run_id}], "y": [2]} + + async with task_run_engine.start(): + pass + + assert len(task_run_engine._span.links) == 1 + link = task_run_engine._span.links[0] + assert link.context.trace_id == int(parent_task_run_id) + assert link.attributes["prefect.run.id"] == str(parent_task_run_id) + + +@pytest.mark.asyncio +async def test_flow_run_labels(task_run_engine): + @flow + async def test_flow(): + return await task_run_engine.task() + + with patch("prefect.context.FlowRunContext.get") as mock_flow_run_context: + mock_flow_run_context.return_value.flow_run.labels = {"env": "test"} + async with task_run_engine.start(): + pass + + assert task_run_engine._span.attributes["env"] == "test" From 4f4d683a6fbc2d80eb70ab3ebb07daefe371bfab Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Tue, 12 Nov 2024 14:22:51 -0600 Subject: [PATCH 05/19] typo fix --- src/prefect/task_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index c3492cce755f..1500e4862db7 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -115,7 +115,7 @@ def digest_task_inputs(inputs, parameters) -> Tuple[Dict[str, str], list[Link]]: links = [] for key, value in inputs.items(): for input in value: - if input in ("__parents__", "wait_for"): + if key in ("__parents__", "wait_for"): continue if isinstance(input, TaskRunInput): 
parameter_attributes[f"prefect.run.parameter.{key}"] = type( From 8b62c09fbafa956539c364c1721a1afbbab9f6f3 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Wed, 13 Nov 2024 09:30:29 -0600 Subject: [PATCH 06/19] using instrumentation fixture --- src/prefect/telemetry/test_util.py | 81 +++++++++++++++++++++++++ tests/conftest.py | 1 + tests/fixtures/telemetry.py | 10 +++ tests/telemetry/test_instrumentation.py | 29 ++++----- 4 files changed, 103 insertions(+), 18 deletions(-) create mode 100644 src/prefect/telemetry/test_util.py create mode 100644 tests/fixtures/telemetry.py diff --git a/src/prefect/telemetry/test_util.py b/src/prefect/telemetry/test_util.py new file mode 100644 index 000000000000..f0461a193338 --- /dev/null +++ b/src/prefect/telemetry/test_util.py @@ -0,0 +1,81 @@ +from typing import Tuple + +from opentelemetry import metrics as metrics_api +from opentelemetry import trace as trace_api +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.test.globals_test import ( + reset_metrics_globals, + reset_trace_globals, +) + +from prefect.telemetry.processors import InFlightSpanProcessor + + +def create_tracer_provider(**kwargs) -> Tuple[TracerProvider, InMemorySpanExporter]: + """Helper to create a configured tracer provider. + + Creates and configures a `TracerProvider` with a + `SimpleSpanProcessor` and a `InMemorySpanExporter`. + All the parameters passed are forwarded to the TracerProvider + constructor. + + Returns: + A list with the tracer provider in the first element and the + in-memory span exporter in the second. + """ + tracer_provider = TracerProvider(**kwargs) + memory_exporter = InMemorySpanExporter() + # span_processor = export.SimpleSpanProcessor(memory_exporter) + span_processor = InFlightSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + + return tracer_provider, memory_exporter + + +def create_meter_provider(**kwargs) -> Tuple[MeterProvider, InMemoryMetricReader]: + """Helper to create a configured meter provider + Creates a `MeterProvider` and an `InMemoryMetricReader`. + Returns: + A tuple with the meter provider in the first element and the + in-memory metrics exporter in the second + """ + memory_reader = InMemoryMetricReader() + metric_readers = kwargs.get("metric_readers", []) + metric_readers.append(memory_reader) + kwargs["metric_readers"] = metric_readers + meter_provider = MeterProvider(**kwargs) + return meter_provider, memory_reader + + +class InstrumentationTester: + tracer_provider: TracerProvider + memory_exporter: InMemorySpanExporter + meter_provider: MeterProvider + memory_metrics_reader: InMemoryMetricReader + + def __init__(self): + self.tracer_provider, self.memory_exporter = create_tracer_provider() + # This is done because set_tracer_provider cannot override the + # current tracer provider. + reset_trace_globals() + trace_api.set_tracer_provider(self.tracer_provider) + + self.memory_exporter.clear() + # This is done because set_meter_provider cannot override the + # current meter provider. 
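# The guard the comment above refers to: set_tracer_provider and
# set_meter_provider only take effect once per process and reject later calls
# with a warning, which is why this harness clears the hidden globals first.
# A self-contained sketch of the pattern, using the same opentelemetry-test-utils
# helper this file imports (illustrative, not part of the patch):
from opentelemetry import trace as trace_api
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.test.globals_test import reset_trace_globals

reset_trace_globals()  # clear the once-only global set by an earlier test
trace_api.set_tracer_provider(TracerProvider())  # now this call takes effect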
+ reset_metrics_globals() + + self.meter_provider, self.memory_metrics_reader = create_meter_provider() + metrics_api.set_meter_provider(self.meter_provider) + + def reset(self): + reset_trace_globals() + reset_metrics_globals() + + def get_finished_spans(self): + return self.memory_exporter.get_finished_spans() diff --git a/tests/conftest.py b/tests/conftest.py index 1d1209df32b8..b342d26649f3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -83,6 +83,7 @@ from .fixtures.events import * from .fixtures.logging import * from .fixtures.storage import * +from .fixtures.telemetry import * from .fixtures.time import * diff --git a/tests/fixtures/telemetry.py b/tests/fixtures/telemetry.py new file mode 100644 index 000000000000..840fee700326 --- /dev/null +++ b/tests/fixtures/telemetry.py @@ -0,0 +1,10 @@ +import pytest + +from prefect.telemetry.test_util import InstrumentationTester + + +@pytest.fixture +def instrumentation(): + instrumentation_tester = InstrumentationTester() + yield instrumentation_tester + instrumentation_tester.reset() diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index 6f996ef84434..168278b9c502 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -12,7 +12,6 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import InMemorySpanExporter from prefect import flow, task from prefect.client.schemas import TaskRun @@ -187,25 +186,19 @@ def test_digest_task_inputs(self): } assert otel_inputs == [] - def test_linked_parameters(self): - trace_provider, _, _ = setup_telemetry() - tracer = trace_provider.get_tracer(__name__) - with tracer.start_as_current_span("test_task"): - self.test_task(x=self.returns_4(), y=2) - -@pytest.fixture -def mock_tracer(): - trace_provider, _, _ = setup_telemetry() - span_exporter = InMemorySpanExporter() - span_processor = InFlightSpanProcessor(span_exporter) - trace_provider.add_span_processor(span_processor) - trace.set_tracer_provider(trace_provider) - return trace.get_tracer("prefect.test") +# @pytest.fixture +# def mock_tracer(): +# trace_provider, _, _ = setup_telemetry() +# span_exporter = InMemorySpanExporter() +# span_processor = InFlightSpanProcessor(span_exporter) +# trace_provider.add_span_processor(span_processor) +# trace.set_tracer_provider(trace_provider) +# return trace.get_tracer("prefect.test") @pytest.fixture -async def task_run_engine(mock_tracer): +async def task_run_engine(instrumentation): @task async def test_task(x: int, y: int): return x + y @@ -222,13 +215,13 @@ async def test_task(x: int, y: int): task=test_task, task_run=task_run, parameters={"x": 1, "y": 2}, - _tracer=mock_tracer, + _tracer=instrumentation.tracer_provider.get_tracer("prefect.test"), ) return engine @pytest.mark.asyncio -async def test_span_creation(task_run_engine, mock_tracer): +async def test_span_creation(task_run_engine, instrumentation): async with task_run_engine.start(): assert task_run_engine._span is not None assert task_run_engine._span.name == task_run_engine.task_run.name From a2e19df803eb3d0ec77f438d5a802c949045cadb Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Thu, 14 Nov 2024 10:11:56 -0600 Subject: [PATCH 07/19] all but labels test --- src/prefect/task_engine.py | 17 +- src/prefect/telemetry/test_util.py | 7 +- tests/telemetry/test_instrumentation.py | 306 
+++++++++++++----------- 3 files changed, 188 insertions(+), 142 deletions(-) diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 1500e4862db7..859f35b27f3a 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -1284,12 +1284,14 @@ async def initialize_run( async with AsyncClientContext.get_or_create(): self._client = get_client() self._is_started = True + flow_run_context = FlowRunContext.get() + try: if not self.task_run: self.task_run = await self.task.create_local_run( id=task_run_id, parameters=self.parameters, - flow_run_context=FlowRunContext.get(), + flow_run_context=flow_run_context, parent_task_run_context=TaskRunContext.get(), wait_for=self.wait_for, extra_task_inputs=dependencies, @@ -1306,12 +1308,23 @@ async def initialize_run( self.logger.debug( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) + + labels = {} + if flow_run_context: + labels = flow_run_context.flow_run.labels + + parameter_attributes = { + f"prefect.run.parameter.{k}": type(v).__name__ + for k, v in self.parameters.items() + } self._span = self._tracer.start_span( name=self.task_run.name, attributes={ - "prefect.run.type": "flow", + "prefect.run.type": "task", "prefect.run.id": str(self.task_run.id), "prefect.tags": self.task_run.tags, + **parameter_attributes, + **labels, }, ) diff --git a/src/prefect/telemetry/test_util.py b/src/prefect/telemetry/test_util.py index f0461a193338..f62a839af176 100644 --- a/src/prefect/telemetry/test_util.py +++ b/src/prefect/telemetry/test_util.py @@ -5,6 +5,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( InMemorySpanExporter, ) @@ -13,8 +14,6 @@ reset_trace_globals, ) -from prefect.telemetry.processors import InFlightSpanProcessor - def create_tracer_provider(**kwargs) -> Tuple[TracerProvider, InMemorySpanExporter]: """Helper to create a configured tracer provider. 
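# The hunk below swaps the harness from the custom InFlightSpanProcessor to
# OpenTelemetry's stock SimpleSpanProcessor, which exports synchronously as each
# span ends, so get_finished_spans() is deterministic with no flushing needed.
# A self-contained sketch of that wiring (illustrative, not part of the patch):
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
    InMemorySpanExporter,
)

provider = TracerProvider()
exporter = InMemorySpanExporter()
provider.add_span_processor(SimpleSpanProcessor(exporter))  # export at span.end()

with provider.get_tracer(__name__).start_as_current_span("demo"):
    pass
assert len(exporter.get_finished_spans()) == 1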
@@ -30,8 +29,8 @@ def create_tracer_provider(**kwargs) -> Tuple[TracerProvider, InMemorySpanExport """ tracer_provider = TracerProvider(**kwargs) memory_exporter = InMemorySpanExporter() - # span_processor = export.SimpleSpanProcessor(memory_exporter) - span_processor = InFlightSpanProcessor(memory_exporter) + span_processor = SimpleSpanProcessor(memory_exporter) + # span_processor = InFlightSpanProcessor(memory_exporter) tracer_provider.add_span_processor(span_processor) return tracer_provider, memory_exporter diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index 168278b9c502..d9e481872ae6 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -15,8 +15,9 @@ from prefect import flow, task from prefect.client.schemas import TaskRun -from prefect.states import Completed, Running -from prefect.task_engine import AsyncTaskRunEngine, digest_task_inputs +from prefect.context import FlowRunContext +from prefect.states import Running +from prefect.task_engine import AsyncTaskRunEngine, run_task_async from prefect.telemetry.bootstrap import setup_telemetry from prefect.telemetry.instrumentation import extract_account_and_workspace_id from prefect.telemetry.logging import get_log_handler @@ -168,147 +169,180 @@ def test_logger_provider( class TestTaskRunInstrumentation: - @task - def test_task(x: int, y: int): - return x + y - - @task - def returns_4(): - return 4 - - def test_digest_task_inputs(self): - inputs = {"x": 1, "y": 2} - parameters = {"x": int, "y": int} - otel_params, otel_inputs = digest_task_inputs(inputs, parameters) - assert otel_params == { - "prefect.run.parameter.x": "int", - "prefect.run.parameter.y": "int", - } - assert otel_inputs == [] - - -# @pytest.fixture -# def mock_tracer(): -# trace_provider, _, _ = setup_telemetry() -# span_exporter = InMemorySpanExporter() -# span_processor = InFlightSpanProcessor(span_exporter) -# trace_provider.add_span_processor(span_processor) -# trace.set_tracer_provider(trace_provider) -# return trace.get_tracer("prefect.test") - - -@pytest.fixture -async def task_run_engine(instrumentation): - @task - async def test_task(x: int, y: int): - return x + y - - task_run = TaskRun( - id=uuid4(), - task_key="test_task", - flow_run_id=uuid4(), - state=Running(), - dynamic_key="test_task-1", - ) - - engine = AsyncTaskRunEngine( - task=test_task, - task_run=task_run, - parameters={"x": 1, "y": 2}, - _tracer=instrumentation.tracer_provider.get_tracer("prefect.test"), - ) - return engine - - -@pytest.mark.asyncio -async def test_span_creation(task_run_engine, instrumentation): - async with task_run_engine.start(): - assert task_run_engine._span is not None - assert task_run_engine._span.name == task_run_engine.task_run.name - assert task_run_engine._span.attributes["prefect.run.type"] == "task" - assert task_run_engine._span.attributes["prefect.run.id"] == str( - task_run_engine.task_run.id + @pytest.fixture + async def task_run_engine(instrumentation): + @task + async def test_task(x: int, y: int): + return x + y + + task_run = TaskRun( + id=uuid4(), + task_key="test_task", + flow_run_id=uuid4(), + state=Running(), + dynamic_key="test_task-1", ) + engine = AsyncTaskRunEngine( + task=test_task, + task_run=task_run, + parameters={"x": 1, "y": 2}, + _tracer=instrumentation.tracer_provider.get_tracer("prefect.test"), + ) + return engine -@pytest.mark.asyncio -async def test_span_attributes(task_run_engine): - async with task_run_engine.start(): - assert 
"prefect.run.parameter.x" in task_run_engine._span.attributes - assert "prefect.run.parameter.y" in task_run_engine._span.attributes - assert task_run_engine._span.attributes["prefect.run.parameter.x"] == "int" - assert task_run_engine._span.attributes["prefect.run.parameter.y"] == "int" - - -@pytest.mark.asyncio -async def test_span_events(task_run_engine): - async with task_run_engine.start(): - await task_run_engine.set_state(Running()) - await task_run_engine.set_state(Completed()) - - events = task_run_engine._span.events - assert len(events) == 2 - assert events[0].name == "Running" - assert events[1].name == "Completed" - - -@pytest.mark.asyncio -async def test_span_status_on_success(task_run_engine): - async with task_run_engine.start(): - async with task_run_engine.run_context(): - await task_run_engine.handle_success(3, Mock()) - - assert task_run_engine._span.status.status_code == trace.StatusCode.OK - - -@pytest.mark.asyncio -async def test_span_status_on_failure(task_run_engine): - async with task_run_engine.start(): - async with task_run_engine.run_context(): - await task_run_engine.handle_exception(ValueError("Test error")) - - assert task_run_engine._span.status.status_code == trace.StatusCode.ERROR - assert "Test error" in task_run_engine._span.status.description - - -@pytest.mark.asyncio -async def test_span_exception_recording(task_run_engine): - test_exception = ValueError("Test error") - async with task_run_engine.start(): - async with task_run_engine.run_context(): - await task_run_engine.handle_exception(test_exception) - - events = task_run_engine._span.events - assert any(event.name == "exception" for event in events) - exception_event = next(event for event in events if event.name == "exception") - assert exception_event.attributes["exception.type"] == "ValueError" - assert exception_event.attributes["exception.message"] == "Test error" - + @pytest.mark.asyncio + async def test_span_creation(task_run_engine, instrumentation): + @task + async def test_task(x: int, y: int): + return x + y -@pytest.mark.asyncio -async def test_span_links(task_run_engine): - # Simulate a parent task run - parent_task_run_id = uuid4() - task_run_engine.task_run.task_inputs = {"x": [{"id": parent_task_run_id}], "y": [2]} + task_run_id = uuid4() + await run_task_async( + test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + ) - async with task_run_engine.start(): - pass + spans = instrumentation.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test_task" + assert spans[0].attributes["prefect.run.id"] == str(task_run_id) - assert len(task_run_engine._span.links) == 1 - link = task_run_engine._span.links[0] - assert link.context.trace_id == int(parent_task_run_id) - assert link.attributes["prefect.run.id"] == str(parent_task_run_id) + @pytest.mark.asyncio + async def test_span_attributes(task_run_engine, instrumentation): + @task + async def test_task(x: int, y: int): + return x + y + task_run_id = uuid4() + await run_task_async( + test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + ) -@pytest.mark.asyncio -async def test_flow_run_labels(task_run_engine): - @flow - async def test_flow(): - return await task_run_engine.task() + spans = instrumentation.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test_task" + assert spans[0].attributes["prefect.run.id"] == str(task_run_id) + assert "prefect.run.parameter.x" in spans[0].attributes + assert "prefect.run.parameter.y" in spans[0].attributes + assert 
spans[0].attributes["prefect.run.parameter.x"] == "int" + assert spans[0].attributes["prefect.run.parameter.y"] == "int" + + @pytest.mark.asyncio + async def test_span_events(task_run_engine, instrumentation): + @task + async def test_task(x: int, y: int): + return x + y + + task_run_id = uuid4() + await run_task_async( + test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + ) - with patch("prefect.context.FlowRunContext.get") as mock_flow_run_context: - mock_flow_run_context.return_value.flow_run.labels = {"env": "test"} - async with task_run_engine.start(): - pass + spans = instrumentation.get_finished_spans() + events = spans[0].events + assert len(events) == 2 + assert events[0].name == "Running" + assert events[1].name == "Completed" + + @pytest.mark.asyncio + async def test_span_status_on_success(task_run_engine, instrumentation): + @task + async def test_task(x: int, y: int): + return x + y + + task_run_id = uuid4() + await run_task_async( + test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + ) - assert task_run_engine._span.attributes["env"] == "test" + spans = instrumentation.get_finished_spans() + assert len(spans) == 1 + + assert spans[0].status.status_code == trace.StatusCode.OK + + @pytest.mark.asyncio + async def test_span_status_on_failure(task_run_engine, instrumentation): + @task + async def test_task(x: int, y: int): + raise ValueError("Test error") + + task_run_id = uuid4() + with pytest.raises(ValueError, match="Test error"): + await run_task_async( + test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + ) + + spans = instrumentation.get_finished_spans() + assert len(spans) == 1 + + assert spans[0].status.status_code == trace.StatusCode.ERROR + assert "Test error" in spans[0].status.description + + @pytest.mark.asyncio + async def test_span_exception_recording(task_run_engine, instrumentation): + @task + async def test_task(x: int, y: int): + raise ValueError("Test error") + + task_run_id = uuid4() + + with pytest.raises(ValueError, match="Test error"): + await run_task_async( + test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + ) + + spans = instrumentation.get_finished_spans() + assert len(spans) == 1 + + events = spans[0].events + + assert any(event.name == "exception" for event in events) + exception_event = next(event for event in events if event.name == "exception") + assert exception_event.attributes["exception.type"] == "ValueError" + assert exception_event.attributes["exception.message"] == "Test error" + + @pytest.mark.asyncio + async def test_flow_run_labels(instrumentation, prefect_client): + """Test that flow run labels are propagated to task spans""" + + @task + async def child_task(): + return 1 + + @flow + async def parent_flow(): + return await child_task() + + # Create a patch that only modifies the labels of the flow run + with patch("prefect.context.FlowRunContext.get") as mock_get: + flow_run = await prefect_client.create_flow_run( + parent_flow, name="test-flow-run" + ) + + # Get the actual flow run context + real_context = FlowRunContext.get() + # Create a new mock that copies the real context but adds labels + mock_context = Mock( + wraps=real_context, + ) + # Add labels to the flow run + mock_context.flow_run.labels = {"env": "test", "team": "engineering"} + mock_context.flow_run.id = flow_run.id + mock_get.return_value = mock_context + + # Run the flow which will execute our task + await parent_flow() + + # Get the spans from our mock tracer + spans = instrumentation.get_finished_spans() + + 
# Find the task span - there should be exactly one task span + task_spans = [ + span for span in spans if span.attributes.get("prefect.run.type") == "task" + ] + assert len(task_spans) == 1 + task_span = task_spans[0] + + # Verify the flow labels were propagated to the task span + assert task_span.attributes["env"] == "test" + assert task_span.attributes["team"] == "engineering" From 781d0f4e32ef6efb3f9805fd713587b777b1d5b1 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Thu, 14 Nov 2024 10:53:42 -0600 Subject: [PATCH 08/19] labels function --- src/prefect/task_engine.py | 55 +++++-------------------- tests/telemetry/test_instrumentation.py | 22 ++-------- 2 files changed, 14 insertions(+), 63 deletions(-) diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 859f35b27f3a..9b75b125ebee 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -21,7 +21,6 @@ Optional, Sequence, Set, - Tuple, Type, TypeVar, Union, @@ -32,8 +31,6 @@ import pendulum from opentelemetry import trace from opentelemetry.trace import ( - Link, - SpanContext, Status, StatusCode, Tracer, @@ -110,28 +107,10 @@ BACKOFF_MAX = 10 -def digest_task_inputs(inputs, parameters) -> Tuple[Dict[str, str], list[Link]]: - parameter_attributes = {} - links = [] - for key, value in inputs.items(): - for input in value: - if key in ("__parents__", "wait_for"): - continue - if isinstance(input, TaskRunInput): - parameter_attributes[f"prefect.run.parameter.{key}"] = type( - parameters[key] - ).__name__ - links.append( - Link( - SpanContext(trace_id=int(input.id), span_id=0, is_remote=True), - attributes={"prefect.run.id": str(input.id)}, - ) - ) - else: - parameter_attributes[f"prefect.run.parameter.{key}"] = type( - input - ).__name__ - return parameter_attributes, links +def get_labels_from_context(context: Optional[FlowRunContext]) -> Dict[str, Any]: + if context is None: + return {} + return context.flow_run.labels class TaskRunTimeoutError(TimeoutError): @@ -744,21 +723,11 @@ def initialize_run( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) - context = None - parameter_attributes, links = digest_task_inputs( - self.task_run.task_inputs, self.parameters - ) - - if not parent_task_run_context and not flow_run_context: - context = SpanContext( - trace_id=int(self.task_run.id), - span_id=0, - is_remote=False, - ) - labels = {} - if flow_run_context: - labels = flow_run_context.flow_run.labels - + labels = get_labels_from_context(flow_run_context) + parameter_attributes = { + f"prefect.run.parameter.{k}": type(v).__name__ + for k, v in self.parameters.items() + } self._span = self._tracer.start_span( name=self.task_run.name, attributes={ @@ -768,8 +737,6 @@ def initialize_run( **parameter_attributes, **labels, }, - links=links, - context=context, ) yield self @@ -1309,9 +1276,7 @@ async def initialize_run( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) - labels = {} - if flow_run_context: - labels = flow_run_context.flow_run.labels + labels = get_labels_from_context(flow_run_context) parameter_attributes = { f"prefect.run.parameter.{k}": type(v).__name__ diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index d9e481872ae6..ff11097e1f5e 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -1,5 +1,5 @@ import os -from unittest.mock import Mock, patch +from unittest.mock import patch from uuid import UUID, uuid4 import pytest @@ -15,7 +15,6 @@ from 
prefect import flow, task from prefect.client.schemas import TaskRun -from prefect.context import FlowRunContext from prefect.states import Running from prefect.task_engine import AsyncTaskRunEngine, run_task_async from prefect.telemetry.bootstrap import setup_telemetry @@ -302,7 +301,7 @@ async def test_task(x: int, y: int): assert exception_event.attributes["exception.message"] == "Test error" @pytest.mark.asyncio - async def test_flow_run_labels(instrumentation, prefect_client): + async def test_flow_run_labels(task_run_engine, instrumentation): """Test that flow run labels are propagated to task spans""" @task @@ -314,21 +313,8 @@ async def parent_flow(): return await child_task() # Create a patch that only modifies the labels of the flow run - with patch("prefect.context.FlowRunContext.get") as mock_get: - flow_run = await prefect_client.create_flow_run( - parent_flow, name="test-flow-run" - ) - - # Get the actual flow run context - real_context = FlowRunContext.get() - # Create a new mock that copies the real context but adds labels - mock_context = Mock( - wraps=real_context, - ) - # Add labels to the flow run - mock_context.flow_run.labels = {"env": "test", "team": "engineering"} - mock_context.flow_run.id = flow_run.id - mock_get.return_value = mock_context + with patch("prefect.task_engine.get_labels_from_context") as mock_get: + mock_get.return_value = {"env": "test", "team": "engineering"} # Run the flow which will execute our task await parent_flow() From f95d35a8496bb0f76f88199ff823b7b7c27cfc7c Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Thu, 14 Nov 2024 11:33:00 -0600 Subject: [PATCH 09/19] keyvalue labels class --- src/prefect/client/schemas/objects.py | 6 ++++++ tests/telemetry/test_instrumentation.py | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/prefect/client/schemas/objects.py b/src/prefect/client/schemas/objects.py index b26f2e9bff0a..62d53306ddda 100644 --- a/src/prefect/client/schemas/objects.py +++ b/src/prefect/client/schemas/objects.py @@ -23,6 +23,9 @@ HttpUrl, IPvAnyNetwork, SerializationInfo, + StrictBool, + StrictFloat, + StrictInt, Tag, field_validator, model_serializer, @@ -501,6 +504,9 @@ def populate_deprecated_fields(cls, values: Any): return values +KeyValueLabels = Dict[str, Union[StrictBool, StrictInt, StrictFloat, str]] + + class FlowRun(ObjectBaseModel): name: str = Field( default_factory=lambda: generate_slug(2), diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index ff11097e1f5e..30e78d1afce2 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -281,11 +281,11 @@ async def test_task(x: int, y: int): async def test_span_exception_recording(task_run_engine, instrumentation): @task async def test_task(x: int, y: int): - raise ValueError("Test error") + raise Exception("Test error") task_run_id = uuid4() - with pytest.raises(ValueError, match="Test error"): + with pytest.raises(Exception, match="Test error"): await run_task_async( test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} ) @@ -297,7 +297,7 @@ async def test_task(x: int, y: int): assert any(event.name == "exception" for event in events) exception_event = next(event for event in events if event.name == "exception") - assert exception_event.attributes["exception.type"] == "ValueError" + assert exception_event.attributes["exception.type"] == "Exception" assert exception_event.attributes["exception.message"] == "Test error" @pytest.mark.asyncio From 
6c83dfcb4b4ed5dedf27e001a7ae54c0a8c66f41 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Thu, 14 Nov 2024 14:52:21 -0600 Subject: [PATCH 10/19] test refactor --- src/prefect/telemetry/test_util.py | 31 +++++++++++++- tests/telemetry/test_instrumentation.py | 55 ++++++++++++++----------- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/src/prefect/telemetry/test_util.py b/src/prefect/telemetry/test_util.py index f62a839af176..931f2083d15f 100644 --- a/src/prefect/telemetry/test_util.py +++ b/src/prefect/telemetry/test_util.py @@ -1,10 +1,10 @@ -from typing import Tuple +from typing import Any, Dict, Protocol, Tuple, Union from opentelemetry import metrics as metrics_api from opentelemetry import trace as trace_api from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace import ReadableSpan, Span, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( InMemorySpanExporter, ) @@ -13,6 +13,7 @@ reset_metrics_globals, reset_trace_globals, ) +from opentelemetry.util.types import Attributes def create_tracer_provider(**kwargs) -> Tuple[TracerProvider, InMemorySpanExporter]: @@ -51,6 +52,19 @@ def create_meter_provider(**kwargs) -> Tuple[MeterProvider, InMemoryMetricReader return meter_provider, memory_reader +class HasAttributesViaProperty(Protocol): + @property + def attributes(self) -> Attributes: + ... + + +class HasAttributesViaAttr(Protocol): + attributes: Attributes + + +HasAttributes = Union[HasAttributesViaProperty, HasAttributesViaAttr] + + class InstrumentationTester: tracer_provider: TracerProvider memory_exporter: InMemorySpanExporter meter_provider: MeterProvider memory_metrics_reader: InMemoryMetricReader @@ -78,3 +92,16 @@ def reset(self): def get_finished_spans(self): return self.memory_exporter.get_finished_spans() + + @staticmethod + def assert_has_attributes(obj: HasAttributes, attributes: Dict[str, Any]): + assert obj.attributes is not None + for key, val in attributes.items(): + assert key in obj.attributes + assert obj.attributes[key] == val + + @staticmethod + def assert_span_instrumented_for(span: Union[Span, ReadableSpan], module): + assert span.instrumentation_scope is not None + assert span.instrumentation_scope.name == module.__name__ + assert span.instrumentation_scope.version == module.__version__ diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index 30e78d1afce2..d544b47212f5 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -168,8 +168,15 @@ def test_logger_provider( class TestTaskRunInstrumentation: + def assert_has_attributes(self, span, **expected_attributes): + """Helper method to assert span has expected attributes""" + for key, value in expected_attributes.items(): + assert ( + span.attributes.get(key) == value + ), f"Expected {key}={value}, got {span.attributes.get(key)}" + @pytest.fixture - async def task_run_engine(instrumentation): + async def task_run_engine(self, instrumentation): @task async def test_task(x: int, y: int): return x + y @@ -191,7 +198,7 @@ async def test_task(x: int, y: int): return engine @pytest.mark.asyncio - async def test_span_creation(task_run_engine, instrumentation): + async def test_span_creation(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): return x + y @@ -203,11 +210,13 @@ async def test_task(x: int, y: int): spans =
instrumentation.get_finished_spans() assert len(spans) == 1 + self.assert_has_attributes( + spans[0], **{"prefect.run.id": str(task_run_id), "prefect.run.type": "task"} + ) assert spans[0].name == "test_task" - assert spans[0].attributes["prefect.run.id"] == str(task_run_id) @pytest.mark.asyncio - async def test_span_attributes(task_run_engine, instrumentation): + async def test_span_attributes(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): return x + y @@ -219,15 +228,19 @@ async def test_task(x: int, y: int): spans = instrumentation.get_finished_spans() assert len(spans) == 1 + self.assert_has_attributes( + spans[0], + **{ + "prefect.run.id": str(task_run_id), + "prefect.run.type": "task", + "prefect.run.parameter.x": "int", + "prefect.run.parameter.y": "int", + }, + ) assert spans[0].name == "test_task" - assert spans[0].attributes["prefect.run.id"] == str(task_run_id) - assert "prefect.run.parameter.x" in spans[0].attributes - assert "prefect.run.parameter.y" in spans[0].attributes - assert spans[0].attributes["prefect.run.parameter.x"] == "int" - assert spans[0].attributes["prefect.run.parameter.y"] == "int" @pytest.mark.asyncio - async def test_span_events(task_run_engine, instrumentation): + async def test_span_events(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): return x + y @@ -244,7 +257,7 @@ async def test_task(x: int, y: int): assert events[1].name == "Completed" @pytest.mark.asyncio - async def test_span_status_on_success(task_run_engine, instrumentation): + async def test_span_status_on_success(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): return x + y @@ -260,7 +273,7 @@ async def test_task(x: int, y: int): assert spans[0].status.status_code == trace.StatusCode.OK @pytest.mark.asyncio - async def test_span_status_on_failure(task_run_engine, instrumentation): + async def test_span_status_on_failure(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): raise ValueError("Test error") @@ -278,7 +291,7 @@ async def test_task(x: int, y: int): assert "Test error" in spans[0].status.description @pytest.mark.asyncio - async def test_span_exception_recording(task_run_engine, instrumentation): + async def test_span_exception_recording(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): raise Exception("Test error") @@ -301,7 +314,7 @@ async def test_task(x: int, y: int): assert exception_event.attributes["exception.message"] == "Test error" @pytest.mark.asyncio - async def test_flow_run_labels(task_run_engine, instrumentation): + async def test_flow_run_labels(self, task_run_engine, instrumentation): """Test that flow run labels are propagated to task spans""" @task @@ -312,23 +325,17 @@ async def child_task(): async def parent_flow(): return await child_task() - # Create a patch that only modifies the labels of the flow run with patch("prefect.task_engine.get_labels_from_context") as mock_get: mock_get.return_value = {"env": "test", "team": "engineering"} - - # Run the flow which will execute our task await parent_flow() - # Get the spans from our mock tracer spans = instrumentation.get_finished_spans() - - # Find the task span - there should be exactly one task span task_spans = [ span for span in spans if span.attributes.get("prefect.run.type") == "task" ] assert len(task_spans) == 1 - task_span = task_spans[0] - # Verify the flow labels were propagated to the task span - assert task_span.attributes["env"] == 
"test" - assert task_span.attributes["team"] == "engineering" + self.assert_has_attributes( + task_spans[0], + **{"prefect.run.type": "task", "env": "test", "team": "engineering"}, + ) From c63a0aa73662190f57987010676a7fb43ac46e83 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Tue, 19 Nov 2024 15:22:38 -0600 Subject: [PATCH 11/19] Refactor telemetry to its own class --- src/prefect/task_engine.py | 119 ++++++----------------- src/prefect/telemetry/instrumentation.py | 71 +++++++++++++- tests/telemetry/test_instrumentation.py | 16 ++- 3 files changed, 106 insertions(+), 100 deletions(-) diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 9b75b125ebee..8302cce5a6b7 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -30,15 +30,8 @@ import anyio import pendulum from opentelemetry import trace -from opentelemetry.trace import ( - Status, - StatusCode, - Tracer, - get_tracer, -) from typing_extensions import ParamSpec -import prefect from prefect import Task from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client from prefect.client.schemas import TaskRun @@ -87,6 +80,7 @@ exception_to_failed_state, return_value_to_state, ) +from prefect.telemetry.instrumentation import RunTelemetry from prefect.transactions import IsolationLevel, Transaction, transaction from prefect.utilities.annotations import NotSet from prefect.utilities.asyncutils import run_coro_as_sync @@ -134,9 +128,7 @@ class BaseTaskRunEngine(Generic[P, R]): _is_started: bool = False _task_name_set: bool = False _last_event: Optional[PrefectEvent] = None - _tracer: Tracer = field( - default_factory=lambda: get_tracer("prefect", prefect.__version__) - ) + _telemetry: RunTelemetry = field(default_factory=RunTelemetry) def __post_init__(self): if self.parameters is None: @@ -477,15 +469,7 @@ def set_state(self, state: State, force: bool = False) -> State: validated_state=self.task_run.state, follows=self._last_event, ) - self._span.add_event( - new_state.name, - { - "prefect.state.message": new_state.message or "", - "prefect.state.type": new_state.type, - "prefect.state.name": new_state.name or new_state.type, - "prefect.state.id": str(new_state.id), - }, - ) + self._telemetry.start_span(self.task_run.name, self.task_run, self.parameters) return new_state def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -540,10 +524,7 @@ def handle_success(self, result: R, transaction: Transaction) -> R: self.set_state(terminal_state) self._return_value = result - self._span.set_status(Status(StatusCode.OK), terminal_state.message) - self._span.end(time.time_ns()) - self._span = None - + self._telemetry.end_span_on_success(terminal_state.message) return result def handle_retry(self, exc: Exception) -> bool: @@ -592,7 +573,7 @@ def handle_retry(self, exc: Exception) -> bool: def handle_exception(self, exc: Exception) -> None: # If the task fails, and we have retries left, set the task to retrying. - self._span.record_exception(exc) + self._telemetry.record_exception(exc) if not self.handle_retry(exc): # If the task has no retries left, or the retry condition is not met, set the task to failed. 
state = run_coro_as_sync( @@ -607,9 +588,7 @@ def handle_exception(self, exc: Exception) -> None: self.set_state(state) self._raised = exc - self._span.set_status(Status(StatusCode.ERROR, state.message)) - self._span.end(time.time_ns()) - self._span = None + self._telemetry.end_span_on_failure(state.message) def handle_timeout(self, exc: TimeoutError) -> None: if not self.handle_retry(exc): @@ -633,11 +612,8 @@ def handle_crash(self, exc: BaseException) -> None: self.record_terminal_state_timing(state) self.set_state(state, force=True) self._raised = exc - - self._span.record_exception(exc) - self._span.set_status(Status(StatusCode.ERROR, state.message)) - self._span.end(time.time_ns()) - self._span = None + self._telemetry.record_exception(exc) + self._telemetry.end_span_on_failure(state.message) @contextmanager def setup_run_context(self, client: Optional[SyncPrefectClient] = None): @@ -722,21 +698,11 @@ def initialize_run( self.logger.debug( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) - - labels = get_labels_from_context(flow_run_context) - parameter_attributes = { - f"prefect.run.parameter.{k}": type(v).__name__ - for k, v in self.parameters.items() - } - self._span = self._tracer.start_span( - name=self.task_run.name, - attributes={ - "prefect.run.type": "task", - "prefect.run.id": str(self.task_run.id), - "prefect.tags": self.task_run.tags, - **parameter_attributes, - **labels, - }, + labels = {} + if flow_run_context: + labels = flow_run_context.flow_run.labels + self._telemetry.start_span( + self.task_run, self.parameters, labels ) yield self @@ -790,7 +756,7 @@ def start( dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None, ) -> Generator[None, None, None]: with self.initialize_run(task_run_id=task_run_id, dependencies=dependencies): - with trace.use_span(self._span): + with trace.use_span(self._telemetry._span): self.begin_run() try: yield @@ -1038,16 +1004,7 @@ async def set_state(self, state: State, force: bool = False) -> State: follows=self._last_event, ) - self._span.add_event( - new_state.name, - { - "prefect.state.message": new_state.message or "", - "prefect.state.type": new_state.type, - "prefect.state.name": new_state.name or new_state.type, - "prefect.state.id": str(new_state.id), - }, - ) - + self._telemetry.update_state(new_state) return new_state async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -1097,9 +1054,7 @@ async def handle_success(self, result: R, transaction: Transaction) -> R: await self.set_state(terminal_state) self._return_value = result - self._span.set_status(Status(StatusCode.OK, terminal_state.message)) - self._span.end(time.time_ns()) - self._span = None + self._telemetry.end_span_on_success(terminal_state.message) return result @@ -1149,7 +1104,7 @@ async def handle_retry(self, exc: Exception) -> bool: async def handle_exception(self, exc: Exception) -> None: # If the task fails, and we have retries left, set the task to retrying. - self._span.record_exception(exc) + self._telemetry.record_exception(exc) if not await self.handle_retry(exc): # If the task has no retries left, or the retry condition is not met, set the task to failed. 
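        # Async mirror of the sync path above: the Failed state is built via
        # exception_to_failed_state() and recorded before the telemetry span
        # is closed with end_span_on_failure().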
state = await exception_to_failed_state( @@ -1160,12 +1115,11 @@ async def handle_exception(self, exc: Exception) -> None: self.record_terminal_state_timing(state) await self.set_state(state) self._raised = exc - self._span.set_status(Status(StatusCode.ERROR, state.message)) - self._span.end(time.time_ns()) - self._span = None + + self._telemetry.end_span_on_failure(state.message) async def handle_timeout(self, exc: TimeoutError) -> None: - self._span.record_exception(exc) + self._telemetry.record_exception(exc) if not await self.handle_retry(exc): if isinstance(exc, TaskRunTimeoutError): message = f"Task run exceeded timeout of {self.task.timeout_seconds} second(s)" @@ -1179,9 +1133,7 @@ async def handle_timeout(self, exc: TimeoutError) -> None: ) await self.set_state(state) self._raised = exc - self._span.set_status(Status(StatusCode.ERROR, state.message)) - self._span.end(time.time_ns()) - self._span = None + self._telemetry.end_span_on_failure(state.message) async def handle_crash(self, exc: BaseException) -> None: state = await exception_to_crashed_state(exc) @@ -1191,10 +1143,8 @@ async def handle_crash(self, exc: BaseException) -> None: await self.set_state(state, force=True) self._raised = exc - self._span.record_exception(exc) - self._span.set_status(Status(StatusCode.ERROR, state.message)) - self._span.end(time.time_ns()) - self._span = None + self._telemetry.record_exception(exc) + self._telemetry.end_span_on_failure(state.message) @asynccontextmanager async def setup_run_context(self, client: Optional[PrefectClient] = None): @@ -1275,22 +1225,11 @@ async def initialize_run( self.logger.debug( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) - - labels = get_labels_from_context(flow_run_context) - - parameter_attributes = { - f"prefect.run.parameter.{k}": type(v).__name__ - for k, v in self.parameters.items() - } - self._span = self._tracer.start_span( - name=self.task_run.name, - attributes={ - "prefect.run.type": "task", - "prefect.run.id": str(self.task_run.id), - "prefect.tags": self.task_run.tags, - **parameter_attributes, - **labels, - }, + labels = {} + if flow_run_context: + labels = get_labels_from_context(flow_run_context) + self._telemetry.start_span( + self.task_run, self.parameters, labels ) yield self @@ -1346,7 +1285,7 @@ async def start( async with self.initialize_run( task_run_id=task_run_id, dependencies=dependencies ): - with trace.use_span(self._span): + with trace.use_span(self._telemetry._span): await self.begin_run() try: yield diff --git a/src/prefect/telemetry/instrumentation.py b/src/prefect/telemetry/instrumentation.py index bb1ddbfcb425..c907729c5a7d 100644 --- a/src/prefect/telemetry/instrumentation.py +++ b/src/prefect/telemetry/instrumentation.py @@ -1,7 +1,9 @@ import logging import os import re -from typing import TYPE_CHECKING +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Dict from urllib.parse import urljoin from uuid import UUID @@ -16,6 +18,16 @@ from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace import ( + Status, + StatusCode, + Tracer, + get_tracer, +) + +import prefect +from prefect.client.schemas import TaskRun +from prefect.client.schemas.objects import State from .logging import set_log_handler from .processors import InFlightSpanProcessor @@ -123,3 +135,60 @@ def _setup_logger_provider( set_log_handler(log_handler) 
return logger_provider + + +@dataclass +class RunTelemetry: + _tracer: Tracer = field( + default_factory=lambda: get_tracer("prefect", prefect.__version__) + ) + _span = None + + def start_span( + self, + task_run: TaskRun, + parameters: Dict[str, Any] = {}, + labels: Dict[str, Any] = {}, + ): + parameter_attributes = { + f"prefect.run.parameter.{k}": type(v).__name__ + for k, v in parameters.items() + } + self._span = self._tracer.start_span( + name=task_run.name, + attributes={ + "prefect.run.type": "task", + "prefect.run.id": str(task_run.id), + "prefect.tags": task_run.tags, + **parameter_attributes, + **labels, + }, + ) + + def end_span_on_success(self, terminal_message: str): + if self._span: + self._span.set_status(Status(StatusCode.OK), terminal_message) + self._span.end(time.time_ns()) + self._span = None + + def end_span_on_failure(self, terminal_message: str): + if self._span: + self._span.set_status(Status(StatusCode.ERROR, terminal_message)) + self._span.end(time.time_ns()) + self._span = None + + def record_exception(self, exc: Exception): + if self._span: + self._span.record_exception(exc) + + def update_state(self, new_state: State): + if self._span: + self._span.add_event( + new_state.name, + { + "prefect.state.message": new_state.message or "", + "prefect.state.type": new_state.type, + "prefect.state.name": new_state.name or new_state.type, + "prefect.state.id": str(new_state.id), + }, + ) diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index d544b47212f5..dbf972af1985 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -18,7 +18,10 @@ from prefect.states import Running from prefect.task_engine import AsyncTaskRunEngine, run_task_async from prefect.telemetry.bootstrap import setup_telemetry -from prefect.telemetry.instrumentation import extract_account_and_workspace_id +from prefect.telemetry.instrumentation import ( + RunTelemetry, + extract_account_and_workspace_id, +) from prefect.telemetry.logging import get_log_handler from prefect.telemetry.processors import InFlightSpanProcessor @@ -193,11 +196,12 @@ async def test_task(x: int, y: int): task=test_task, task_run=task_run, parameters={"x": 1, "y": 2}, - _tracer=instrumentation.tracer_provider.get_tracer("prefect.test"), + _telemetry=RunTelemetry( + _tracer=instrumentation.tracer_provider.get_tracer("prefect.test") + ), ) return engine - @pytest.mark.asyncio async def test_span_creation(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): @@ -215,7 +219,6 @@ async def test_task(x: int, y: int): ) assert spans[0].name == "test_task" - @pytest.mark.asyncio async def test_span_attributes(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): @@ -239,7 +242,6 @@ async def test_task(x: int, y: int): ) assert spans[0].name == "test_task" - @pytest.mark.asyncio async def test_span_events(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): @@ -256,7 +258,6 @@ async def test_task(x: int, y: int): assert events[0].name == "Running" assert events[1].name == "Completed" - @pytest.mark.asyncio async def test_span_status_on_success(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): @@ -272,7 +273,6 @@ async def test_task(x: int, y: int): assert spans[0].status.status_code == trace.StatusCode.OK - @pytest.mark.asyncio async def test_span_status_on_failure(self, task_run_engine, instrumentation): @task async def 
test_task(x: int, y: int): @@ -290,7 +290,6 @@ async def test_task(x: int, y: int): assert spans[0].status.status_code == trace.StatusCode.ERROR assert "Test error" in spans[0].status.description - @pytest.mark.asyncio async def test_span_exception_recording(self, task_run_engine, instrumentation): @task async def test_task(x: int, y: int): @@ -313,7 +312,6 @@ async def test_task(x: int, y: int): assert exception_event.attributes["exception.type"] == "Exception" assert exception_event.attributes["exception.message"] == "Test error" - @pytest.mark.asyncio async def test_flow_run_labels(self, task_run_engine, instrumentation): """Test that flow run labels are propagated to task spans""" From c2afe1f2304b98fe5643b15df06ee1c808b34dc0 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Tue, 19 Nov 2024 16:10:57 -0600 Subject: [PATCH 12/19] Unit test sync and async engines --- src/prefect/task_engine.py | 4 +- tests/telemetry/test_instrumentation.py | 186 +++++++++++++++--------- 2 files changed, 116 insertions(+), 74 deletions(-) diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 8302cce5a6b7..ba7bf661ef3a 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -469,7 +469,7 @@ def set_state(self, state: State, force: bool = False) -> State: validated_state=self.task_run.state, follows=self._last_event, ) - self._telemetry.start_span(self.task_run.name, self.task_run, self.parameters) + self._telemetry.update_state(new_state) return new_state def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -700,7 +700,7 @@ def initialize_run( ) labels = {} if flow_run_context: - labels = flow_run_context.flow_run.labels + labels = get_labels_from_context(flow_run_context) self._telemetry.start_span( self.task_run, self.parameters, labels ) diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index dbf972af1985..129ad1efc226 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -14,12 +14,12 @@ from opentelemetry.sdk.trace import TracerProvider from prefect import flow, task -from prefect.client.schemas import TaskRun -from prefect.states import Running -from prefect.task_engine import AsyncTaskRunEngine, run_task_async +from prefect.task_engine import ( + run_task_async, + run_task_sync, +) from prefect.telemetry.bootstrap import setup_telemetry from prefect.telemetry.instrumentation import ( - RunTelemetry, extract_account_and_workspace_id, ) from prefect.telemetry.logging import get_log_handler @@ -171,85 +171,93 @@ def test_logger_provider( class TestTaskRunInstrumentation: - def assert_has_attributes(self, span, **expected_attributes): - """Helper method to assert span has expected attributes""" - for key, value in expected_attributes.items(): - assert ( - span.attributes.get(key) == value - ), f"Expected {key}={value}, got {span.attributes.get(key)}" - - @pytest.fixture - async def task_run_engine(self, instrumentation): + @pytest.fixture(params=["async", "sync"]) + async def engine_type(self, request): + return request.param + + async def run_task(self, task, task_run_id, parameters, engine_type): + if engine_type == "async": + return await run_task_async( + task, task_run_id=task_run_id, parameters=parameters + ) + else: + return run_task_sync(task, task_run_id=task_run_id, parameters=parameters) + + async def test_span_creation(self, engine_type, instrumentation): @task - async def test_task(x: int, y: int): + async def async_task(x: int, y: 
int): return x + y - task_run = TaskRun( - id=uuid4(), - task_key="test_task", - flow_run_id=uuid4(), - state=Running(), - dynamic_key="test_task-1", - ) - - engine = AsyncTaskRunEngine( - task=test_task, - task_run=task_run, - parameters={"x": 1, "y": 2}, - _telemetry=RunTelemetry( - _tracer=instrumentation.tracer_provider.get_tracer("prefect.test") - ), - ) - return engine - - async def test_span_creation(self, task_run_engine, instrumentation): @task - async def test_task(x: int, y: int): + def sync_task(x: int, y: int): return x + y + task_fn = async_task if engine_type == "async" else sync_task task_run_id = uuid4() - await run_task_async( - test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + + await self.run_task( + task_fn, + task_run_id=task_run_id, + parameters={"x": 1, "y": 2}, + engine_type=engine_type, ) spans = instrumentation.get_finished_spans() assert len(spans) == 1 - self.assert_has_attributes( - spans[0], **{"prefect.run.id": str(task_run_id), "prefect.run.type": "task"} + instrumentation.assert_has_attributes( + spans[0], {"prefect.run.id": str(task_run_id), "prefect.run.type": "task"} ) - assert spans[0].name == "test_task" + assert spans[0].name == task_fn.__name__ - async def test_span_attributes(self, task_run_engine, instrumentation): + async def test_span_attributes(self, engine_type, instrumentation): @task - async def test_task(x: int, y: int): + async def async_task(x: int, y: int): return x + y + @task + def sync_task(x: int, y: int): + return x + y + + task_fn = async_task if engine_type == "async" else sync_task task_run_id = uuid4() - await run_task_async( - test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + + await self.run_task( + task_fn, + task_run_id=task_run_id, + parameters={"x": 1, "y": 2}, + engine_type=engine_type, ) spans = instrumentation.get_finished_spans() assert len(spans) == 1 - self.assert_has_attributes( + instrumentation.assert_has_attributes( spans[0], - **{ + { "prefect.run.id": str(task_run_id), "prefect.run.type": "task", "prefect.run.parameter.x": "int", "prefect.run.parameter.y": "int", }, ) - assert spans[0].name == "test_task" + assert spans[0].name == task_fn.__name__ - async def test_span_events(self, task_run_engine, instrumentation): + async def test_span_events(self, engine_type, instrumentation): @task - async def test_task(x: int, y: int): + async def async_task(x: int, y: int): return x + y + @task + def sync_task(x: int, y: int): + return x + y + + task_fn = async_task if engine_type == "async" else sync_task task_run_id = uuid4() - await run_task_async( - test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + + await self.run_task( + task_fn, + task_run_id=task_run_id, + parameters={"x": 1, "y": 2}, + engine_type=engine_type, ) spans = instrumentation.get_finished_spans() @@ -258,74 +266,108 @@ async def test_task(x: int, y: int): assert events[0].name == "Running" assert events[1].name == "Completed" - async def test_span_status_on_success(self, task_run_engine, instrumentation): + async def test_span_status_on_success(self, engine_type, instrumentation): @task - async def test_task(x: int, y: int): + async def async_task(x: int, y: int): return x + y + @task + def sync_task(x: int, y: int): + return x + y + + task_fn = async_task if engine_type == "async" else sync_task task_run_id = uuid4() - await run_task_async( - test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + + await self.run_task( + task_fn, + task_run_id=task_run_id, + parameters={"x": 1, "y": 2}, + 
engine_type=engine_type, ) spans = instrumentation.get_finished_spans() assert len(spans) == 1 - assert spans[0].status.status_code == trace.StatusCode.OK - async def test_span_status_on_failure(self, task_run_engine, instrumentation): + async def test_span_status_on_failure(self, engine_type, instrumentation): @task - async def test_task(x: int, y: int): + async def async_task(x: int, y: int): raise ValueError("Test error") + @task + def sync_task(x: int, y: int): + raise ValueError("Test error") + + task_fn = async_task if engine_type == "async" else sync_task task_run_id = uuid4() + with pytest.raises(ValueError, match="Test error"): - await run_task_async( - test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + await self.run_task( + task_fn, + task_run_id=task_run_id, + parameters={"x": 1, "y": 2}, + engine_type=engine_type, ) spans = instrumentation.get_finished_spans() assert len(spans) == 1 - assert spans[0].status.status_code == trace.StatusCode.ERROR assert "Test error" in spans[0].status.description - async def test_span_exception_recording(self, task_run_engine, instrumentation): + async def test_span_exception_recording(self, engine_type, instrumentation): @task - async def test_task(x: int, y: int): + async def async_task(x: int, y: int): raise Exception("Test error") + @task + def sync_task(x: int, y: int): + raise Exception("Test error") + + task_fn = async_task if engine_type == "async" else sync_task task_run_id = uuid4() with pytest.raises(Exception, match="Test error"): - await run_task_async( - test_task, task_run_id=task_run_id, parameters={"x": 1, "y": 2} + await self.run_task( + task_fn, + task_run_id=task_run_id, + parameters={"x": 1, "y": 2}, + engine_type=engine_type, ) spans = instrumentation.get_finished_spans() assert len(spans) == 1 events = spans[0].events - assert any(event.name == "exception" for event in events) exception_event = next(event for event in events if event.name == "exception") assert exception_event.attributes["exception.type"] == "Exception" assert exception_event.attributes["exception.message"] == "Test error" - async def test_flow_run_labels(self, task_run_engine, instrumentation): + async def test_flow_run_labels(self, engine_type, instrumentation): """Test that flow run labels are propagated to task spans""" @task - async def child_task(): + async def async_child_task(): return 1 + @task + def sync_child_task(): + return 1 + + @flow + async def async_parent_flow(): + return await async_child_task() + @flow - async def parent_flow(): - return await child_task() + def sync_parent_flow(): + return sync_child_task() with patch("prefect.task_engine.get_labels_from_context") as mock_get: mock_get.return_value = {"env": "test", "team": "engineering"} - await parent_flow() + if engine_type == "async": + await async_parent_flow() + else: + sync_parent_flow() spans = instrumentation.get_finished_spans() task_spans = [ @@ -333,7 +375,7 @@ async def parent_flow(): ] assert len(task_spans) == 1 - self.assert_has_attributes( + instrumentation.assert_has_attributes( task_spans[0], - **{"prefect.run.type": "task", "env": "test", "team": "engineering"}, + {"prefect.run.type": "task", "env": "test", "team": "engineering"}, ) From ed90df86d6edc58b4d1898aa3b859ee87c198c21 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Nov 2024 13:16:08 -0600 Subject: [PATCH 13/19] labels no longer mocked --- requirements.txt | 1 + src/prefect/task_engine.py | 13 ++++++----- tests/telemetry/test_instrumentation.py | 31 ++++++++++++++----------- 3 files 
changed, 26 insertions(+), 19 deletions(-) diff --git a/requirements.txt b/requirements.txt index f4cb6f0574f5..159e8d828f2c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ pytz >= 2021.1, < 2025 readchar >= 4.0.0, < 5.0.0 sqlalchemy[asyncio] >= 2.0, < 3.0.0 typer >= 0.12.0, != 0.12.2, < 0.14.0 + diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index ba7bf661ef3a..d923e3bbd81c 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -698,9 +698,9 @@ def initialize_run( self.logger.debug( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) - labels = {} - if flow_run_context: - labels = get_labels_from_context(flow_run_context) + labels = ( + flow_run_context.flow_run.labels if flow_run_context else {} + ) self._telemetry.start_span( self.task_run, self.parameters, labels ) @@ -1225,9 +1225,10 @@ async def initialize_run( self.logger.debug( f"Created task run {self.task_run.name!r} for task {self.task.name!r}" ) - labels = {} - if flow_run_context: - labels = get_labels_from_context(flow_run_context) + + labels = ( + flow_run_context.flow_run.labels if flow_run_context else {} + ) self._telemetry.start_span( self.task_run, self.parameters, labels ) diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py index 129ad1efc226..1ea74ff55872 100644 --- a/tests/telemetry/test_instrumentation.py +++ b/tests/telemetry/test_instrumentation.py @@ -1,5 +1,4 @@ import os -from unittest.mock import patch from uuid import UUID, uuid4 import pytest @@ -12,6 +11,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.trace import TracerProvider +from tests.telemetry.instrumentation_tester import InstrumentationTester from prefect import flow, task from prefect.task_engine import ( @@ -183,7 +183,9 @@ async def run_task(self, task, task_run_id, parameters, engine_type): else: return run_task_sync(task, task_run_id=task_run_id, parameters=parameters) - async def test_span_creation(self, engine_type, instrumentation): + async def test_span_creation( + self, engine_type, instrumentation: InstrumentationTester + ): @task async def async_task(x: int, y: int): return x + y @@ -204,8 +206,10 @@ def sync_task(x: int, y: int): spans = instrumentation.get_finished_spans() assert len(spans) == 1 + span = spans[0] + instrumentation.assert_has_attributes( - spans[0], {"prefect.run.id": str(task_run_id), "prefect.run.type": "task"} + span, {"prefect.run.id": str(task_run_id), "prefect.run.type": "task"} ) assert spans[0].name == task_fn.__name__ @@ -343,8 +347,8 @@ def sync_task(x: int, y: int): assert exception_event.attributes["exception.type"] == "Exception" assert exception_event.attributes["exception.message"] == "Test error" - async def test_flow_run_labels(self, engine_type, instrumentation): - """Test that flow run labels are propagated to task spans""" + async def test_flow_labels(self, engine_type, instrumentation, sync_prefect_client): + """Test that the parent flow run's labels are propagated to task spans""" @task async def async_child_task(): @@ -362,12 +366,10 @@ async def async_parent_flow(): def sync_parent_flow(): return sync_child_task() - with patch("prefect.task_engine.get_labels_from_context") as mock_get: - mock_get.return_value = {"env": "test", "team": "engineering"} - if engine_type == "async": - await async_parent_flow() - else: - sync_parent_flow() + if engine_type == "async": + state = 
await async_parent_flow(return_state=True) + else: + state = sync_parent_flow(return_state=True) spans = instrumentation.get_finished_spans() task_spans = [ @@ -375,7 +377,10 @@ def sync_parent_flow(): ] assert len(task_spans) == 1 + assert state.state_details.flow_run_id is not None + flow_run = sync_prefect_client.read_flow_run(state.state_details.flow_run_id) + + # Verify the task span carries the parent flow run's labels instrumentation.assert_has_attributes( - task_spans[0], - {"prefect.run.type": "task", "env": "test", "team": "engineering"}, + task_spans[0], {**flow_run.labels, "prefect.run.type": "task"} ) From ae585e456fa8dc674899953654ce54e8d7078b24 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Nov 2024 13:48:21 -0600 Subject: [PATCH 14/19] run telemetry moved to its own file --- src/prefect/task_engine.py | 2 +- src/prefect/telemetry/instrumentation.py | 71 +----------------------- src/prefect/telemetry/run_telemetry.py | 71 ++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 71 deletions(-) create mode 100644 src/prefect/telemetry/run_telemetry.py diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index d923e3bbd81c..0b32bdb7ef32 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -80,7 +80,7 @@ exception_to_failed_state, return_value_to_state, ) -from prefect.telemetry.instrumentation import RunTelemetry +from prefect.telemetry.run_telemetry import RunTelemetry from prefect.transactions import IsolationLevel, Transaction, transaction from prefect.utilities.annotations import NotSet from prefect.utilities.asyncutils import run_coro_as_sync diff --git a/src/prefect/telemetry/instrumentation.py b/src/prefect/telemetry/instrumentation.py index c907729c5a7d..bb1ddbfcb425 100644 --- a/src/prefect/telemetry/instrumentation.py +++ b/src/prefect/telemetry/instrumentation.py @@ -1,9 +1,7 @@ import logging import os import re -import time -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Dict +from typing import TYPE_CHECKING from urllib.parse import urljoin from uuid import UUID @@ -18,16 +16,6 @@ from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.trace import ( - Status, - StatusCode, - Tracer, - get_tracer, -) - -import prefect -from prefect.client.schemas import TaskRun -from prefect.client.schemas.objects import State from .logging import set_log_handler from .processors import InFlightSpanProcessor @@ -135,60 +123,3 @@ def _setup_logger_provider( set_log_handler(log_handler) return logger_provider - - -@dataclass -class RunTelemetry: - _tracer: Tracer = field( - default_factory=lambda: get_tracer("prefect", prefect.__version__) - ) - _span = None - - def start_span( - self, - task_run: TaskRun, - parameters: Dict[str, Any] = {}, - labels: Dict[str, Any] = {}, - ): - parameter_attributes = { - f"prefect.run.parameter.{k}": type(v).__name__ - for k, v in parameters.items() - } - self._span = self._tracer.start_span( - name=task_run.name, - attributes={ - "prefect.run.type": "task", - "prefect.run.id": str(task_run.id), - "prefect.tags": task_run.tags, - **parameter_attributes, - **labels, - }, - ) - - def end_span_on_success(self, terminal_message: str): - if self._span: - self._span.set_status(Status(StatusCode.OK), terminal_message) - self._span.end(time.time_ns()) - self._span = None - - def end_span_on_failure(self, terminal_message: str): - if self._span: 
self._span.set_status(Status(StatusCode.ERROR, terminal_message)) - self._span.end(time.time_ns()) - self._span = None - - def record_exception(self, exc: Exception): - if self._span: - self._span.record_exception(exc) - - def update_state(self, new_state: State): - if self._span: - self._span.add_event( - new_state.name, - { - "prefect.state.message": new_state.message or "", - "prefect.state.type": new_state.type, - "prefect.state.name": new_state.name or new_state.type, - "prefect.state.id": str(new_state.id), - }, - ) diff --git a/src/prefect/telemetry/run_telemetry.py b/src/prefect/telemetry/run_telemetry.py new file mode 100644 index 000000000000..3390577e43f9 --- /dev/null +++ b/src/prefect/telemetry/run_telemetry.py @@ -0,0 +1,71 @@ +import time +from dataclasses import dataclass, field +from typing import Any, Dict + +from opentelemetry.sdk.trace import Tracer +from opentelemetry.trace import ( + Status, + StatusCode, + get_tracer, +) + +import prefect +from prefect.client.schemas import TaskRun +from prefect.client.schemas.objects import State + + +@dataclass +class RunTelemetry: + _tracer: Tracer = field( + default_factory=lambda: get_tracer("prefect", prefect.__version__) + ) + _span = None + + def start_span( + self, + task_run: TaskRun, + parameters: Dict[str, Any] = {}, + labels: Dict[str, Any] = {}, + ): + parameter_attributes = { + f"prefect.run.parameter.{k}": type(v).__name__ + for k, v in parameters.items() + } + self._span = self._tracer.start_span( + name=task_run.name, + attributes={ + "prefect.run.type": "task", + "prefect.run.id": str(task_run.id), + "prefect.tags": task_run.tags, + **parameter_attributes, + **labels, + }, + ) + + def end_span_on_success(self, terminal_message: str): + if self._span: + self._span.set_status(Status(StatusCode.OK), terminal_message) + self._span.end(time.time_ns()) + self._span = None + + def end_span_on_failure(self, terminal_message: str): + if self._span: + self._span.set_status(Status(StatusCode.ERROR, terminal_message)) + self._span.end(time.time_ns()) + self._span = None + + def record_exception(self, exc: Exception): + if self._span: + self._span.record_exception(exc) + + def update_state(self, new_state: State): + if self._span: + self._span.add_event( + new_state.name, + { + "prefect.state.message": new_state.message or "", + "prefect.state.type": new_state.type, + "prefect.state.name": new_state.name or new_state.type, + "prefect.state.id": str(new_state.id), + }, + ) From 30ed2bcdad8ab5d22ec1c0339205df98e2ed01cd Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Nov 2024 13:57:52 -0600 Subject: [PATCH 15/19] type checking guard --- src/prefect/telemetry/run_telemetry.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/telemetry/run_telemetry.py b/src/prefect/telemetry/run_telemetry.py index 3390577e43f9..ff0acea3caba 100644 --- a/src/prefect/telemetry/run_telemetry.py +++ b/src/prefect/telemetry/run_telemetry.py @@ -1,8 +1,7 @@ import time from dataclasses import dataclass, field -from typing import Any, Dict +from typing import TYPE_CHECKING, Any, Dict -from opentelemetry.sdk.trace import Tracer from opentelemetry.trace import ( Status, StatusCode, @@ -13,6 +12,9 @@ from prefect.client.schemas import TaskRun from prefect.client.schemas.objects import State +if TYPE_CHECKING: + from opentelemetry.sdk.trace import Tracer + @dataclass class RunTelemetry: From c752485b9000b787684f74ec7515cbfa5c7fabfc Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Nov 2024 14:05:04 -0600 
Subject: [PATCH 16/19] type fix --- src/prefect/telemetry/run_telemetry.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/prefect/telemetry/run_telemetry.py b/src/prefect/telemetry/run_telemetry.py index ff0acea3caba..498b7b60da69 100644 --- a/src/prefect/telemetry/run_telemetry.py +++ b/src/prefect/telemetry/run_telemetry.py @@ -2,6 +2,7 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Dict +from opentelemetry.sdk.trace import Tracer from opentelemetry.trace import ( Status, StatusCode, @@ -18,7 +19,7 @@ @dataclass class RunTelemetry: - _tracer: Tracer = field( + _tracer: "Tracer" = field( default_factory=lambda: get_tracer("prefect", prefect.__version__) ) _span = None From 03317e28892fb63a08e9db2a3f1714409670b5d9 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Nov 2024 14:07:26 -0600 Subject: [PATCH 17/19] import removed --- src/prefect/telemetry/run_telemetry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/prefect/telemetry/run_telemetry.py b/src/prefect/telemetry/run_telemetry.py index 498b7b60da69..43e39a705602 100644 --- a/src/prefect/telemetry/run_telemetry.py +++ b/src/prefect/telemetry/run_telemetry.py @@ -2,7 +2,6 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Dict -from opentelemetry.sdk.trace import Tracer from opentelemetry.trace import ( Status, StatusCode, From 8f87b5535bbce6de29e545f75385b20a8202dacc Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Nov 2024 14:29:57 -0600 Subject: [PATCH 18/19] scope fix --- src/prefect/task_engine.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 0b32bdb7ef32..343313dd8b59 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -587,8 +587,7 @@ def handle_exception(self, exc: Exception) -> None: self.record_terminal_state_timing(state) self.set_state(state) self._raised = exc - - self._telemetry.end_span_on_failure(state.message) + self._telemetry.end_span_on_failure(state.message) From 6f1cbfb724443335308b8ffdcdae5db0b996d954 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Fri, 22 Nov 2024 14:54:50 -0600 Subject: [PATCH 19/19] labels function removed --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 159e8d828f2c..f4cb6f0574f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,4 +16,3 @@ pytz >= 2021.1, < 2025 readchar >= 4.0.0, < 5.0.0 sqlalchemy[asyncio] >= 2.0, < 3.0.0 typer >= 0.12.0, != 0.12.2, < 0.14.0 -
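Taken together, the series leaves RunTelemetry (src/prefect/telemetry/run_telemetry.py) as the task engines' single integration point with OpenTelemetry. The sketch below is a minimal illustration of the resulting span lifecycle, not part of the patches: it assumes the in-memory wiring that create_tracer_provider() sets up in the test utilities, and the TaskRun field values are hypothetical.

    from uuid import uuid4

    from prefect.client.schemas import TaskRun
    from prefect.states import Running
    from prefect.telemetry.run_telemetry import RunTelemetry
    from prefect.telemetry.test_util import create_tracer_provider

    # In-memory provider/exporter pair, mirroring the test fixtures above.
    tracer_provider, exporter = create_tracer_provider()
    telemetry = RunTelemetry(_tracer=tracer_provider.get_tracer("prefect.test"))

    # A hypothetical task run; the field values are illustrative only.
    task_run = TaskRun(
        id=uuid4(),
        name="demo-task",
        task_key="demo",
        flow_run_id=uuid4(),
        state=Running(),
        dynamic_key="demo-1",
    )

    # start_span() stamps prefect.run.* attributes plus one
    # prefect.run.parameter.<name> attribute per parameter (the type name).
    telemetry.start_span(task_run, parameters={"x": 1}, labels={"env": "test"})
    telemetry.update_state(task_run.state)  # recorded as a "Running" span event
    telemetry.end_span_on_success("Task completed")  # StatusCode.OK, span ended

    (span,) = exporter.get_finished_spans()
    assert span.attributes["prefect.run.id"] == str(task_run.id)
    assert span.attributes["prefect.run.parameter.x"] == "int"

On failure the engines instead call record_exception() followed by end_span_on_failure(), which sets StatusCode.ERROR with the terminal state's message before ending the span.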