Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(celery): account for a chains scenario #11498

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
if task_span:
task_span.set_exc_info(*sys.exc_info())

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.set_exc_info(*sys.exc_info())

raise
finally:
task_span = core.get_item("task_span")
Expand All @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
3 changes: 0 additions & 3 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down
26 changes: 26 additions & 0 deletions tests/contrib/celery/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,32 @@ def fn_a(self, user, force_logout=False):
trace_id = traces[0][0].trace_id
assert all(trace_id == span.trace_id for trace in traces for span in trace)

# Ensure that when we call Celery chains, the root span has celery specific span tags
def test_task_chain_task_call_task(self):
@self.app.task
def fn_a():
return 1

@self.app.task
def fn_b():
return 2

traces = None
try:
# See https://github.com/DataDog/dd-trace-py/issues/11479
(fn_a.si() | fn_b.si()).delay()
import time

time.sleep(10)
except Exception:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Code Quality Violation

silent exception (...read more)

Using the pass statement in an exception block ignores the exception. Exceptions should never be ignored. Instead, the user must add code to notify an exception occurred and attempt to handle it or recover from it.

View in Datadog  Leave us feedback  Documentation


traces = self.pop_traces()
assert len(traces) == 3
# Assert that each celery related span has a celery specific tag like celery.action
# TODO figure out why this test passes on the main branch in pytest but not a real app
assert all("celery.action" in span._meta.keys() for trace in traces for span in trace)

@mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError))
def test_fn_task_apply_async_soft_exception(self):
# If the underlying library runs into an exception that doesn't crash the app
Expand Down
Loading