From 883e0738c7c6e8a0e918796798dc4b856e0fa591 Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 21 Nov 2024 18:31:58 -0500 Subject: [PATCH 1/4] Remove postrun spans logic to account for chain tasks scenarios. --- ddtrace/contrib/internal/celery/app.py | 11 ----------- ddtrace/contrib/internal/celery/signals.py | 2 -- 2 files changed, 13 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index b61585097a..42eed2cb46 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs): if task_span: task_span.set_exc_info(*sys.exc_info()) - prerun_span = core.get_item("prerun_span") - if prerun_span: - prerun_span.set_exc_info(*sys.exc_info()) - raise finally: task_span = core.get_item("task_span") @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs): ) task_span.finish() - prerun_span = core.get_item("prerun_span") - if prerun_span: - log.debug( - "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint() - ) - prerun_span.finish() - return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index e235d6efb3..ac34483de5 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -52,8 +52,6 @@ def trace_prerun(*args, **kwargs): service = config.celery["worker_service_name"] span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER) - # Store an item called "prerun span" in case task_postrun doesn't get called - core.set_item("prerun_span", span) # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) From e860319cbed23d949e9a8088244ebd0c9a91ceca Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 22 Nov 2024 19:40:23 -0500 Subject: [PATCH 2/4] Fix spacing. --- ddtrace/contrib/internal/celery/signals.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index ac34483de5..d90c47b54b 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -52,7 +52,6 @@ def trace_prerun(*args, **kwargs): service = config.celery["worker_service_name"] span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER) - # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) From 4228d2db7bcfa6736bb59f081abdeb3e60c57266 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 22 Nov 2024 20:14:41 -0500 Subject: [PATCH 3/4] Add test with celery chains with the pipe operator. --- tests/contrib/celery/test_integration.py | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 03276a7929..586ccf2d59 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -5,6 +5,7 @@ from time import sleep import celery +from celery import chain from celery.exceptions import Retry import mock import pytest @@ -477,6 +478,32 @@ def fn_a(self, user, force_logout=False): trace_id = traces[0][0].trace_id assert all(trace_id == span.trace_id for trace in traces for span in trace) + # Ensure that when we call Celery chains, the root span has celery specific span tags + def test_task_chain_task_call_task(self): + @self.app.task + def fn_a(): + return 1 + + @self.app.task + def fn_b(): + return 2 + + traces = None + try: + # See https://github.com/DataDog/dd-trace-py/issues/11479 + (fn_a.si() | fn_b.si()).delay() + import time + + time.sleep(10) + except Exception: + pass + + traces = self.pop_traces() + assert len(traces) == 3 + # Assert that each celery related span has a celery specific tag like celery.action + # TODO figure out why this test passes on the main branch in pytest but not a real app + assert all("celery.action" in span._meta.keys() for trace in traces for span in trace) + @mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError)) def test_fn_task_apply_async_soft_exception(self): # If the underlying library runs into an exception that doesn't crash the app From bc68ed36d9cfbfd07d89cbeb2304d5465c260720 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 22 Nov 2024 20:15:41 -0500 Subject: [PATCH 4/4] Remove chain import due to using pipe operator. --- tests/contrib/celery/test_integration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 586ccf2d59..2842a2b9f1 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -5,7 +5,6 @@ from time import sleep import celery -from celery import chain from celery.exceptions import Retry import mock import pytest