Skip to content

Commit 40fe58d

Browse files
authored
fix(futures): ensure child spans are correctly parented in Threadpools (#13168)
Introduces a private API for activating a trace `Context` in a contextmanager. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
1 parent 85e6e9f commit 40fe58d

File tree

8 files changed

+138
-32
lines changed

8 files changed

+138
-32
lines changed

ddtrace/_trace/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class Context(object):
5252
"_span_links",
5353
"_baggage",
5454
"_is_remote",
55+
"_reactivate",
5556
"__weakref__",
5657
]
5758

@@ -75,6 +76,7 @@ def __init__(
7576
self.trace_id: Optional[int] = trace_id
7677
self.span_id: Optional[int] = span_id
7778
self._is_remote: bool = is_remote
79+
self._reactivate: bool = False
7880

7981
if dd_origin is not None and _DD_ORIGIN_INVALID_CHARS_REGEX.search(dd_origin) is None:
8082
self._meta[_ORIGIN_KEY] = dd_origin

ddtrace/_trace/provider.py

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
log = get_logger(__name__)
1515

1616

17-
_DD_CONTEXTVAR: contextvars.ContextVar[Optional[Union[Context, Span]]] = contextvars.ContextVar(
17+
ActiveTrace = Union[Span, Context]
18+
_DD_CONTEXTVAR: contextvars.ContextVar[Optional[ActiveTrace]] = contextvars.ContextVar(
1819
"datadog_contextvar", default=None
1920
)
2021

@@ -37,11 +38,11 @@ def _has_active_context(self) -> bool:
3738
pass
3839

3940
@abc.abstractmethod
40-
def activate(self, ctx: Optional[Union[Context, Span]]) -> None:
41+
def activate(self, ctx: Optional[ActiveTrace]) -> None:
4142
self._hooks.emit(self.activate, ctx)
4243

4344
@abc.abstractmethod
44-
def active(self) -> Optional[Union[Context, Span]]:
45+
def active(self) -> Optional[ActiveTrace]:
4546
pass
4647

4748
def _on_activate(
@@ -70,37 +71,14 @@ def _deregister_on_activate(
7071
self._hooks.deregister(self.activate, func)
7172
return func
7273

73-
def __call__(self, *args: Any, **kwargs: Any) -> Optional[Union[Context, Span]]:
74+
def __call__(self, *args: Any, **kwargs: Any) -> Optional[ActiveTrace]:
7475
"""Method available for backward-compatibility. It proxies the call to
7576
``self.active()`` and must not do anything more.
7677
"""
7778
return self.active()
7879

7980

80-
class DatadogContextMixin(object):
81-
"""Mixin that provides active span updating suitable for synchronous
82-
and asynchronous executions.
83-
"""
84-
85-
def activate(self, ctx: Optional[Union[Context, Span]]) -> None:
86-
raise NotImplementedError
87-
88-
def _update_active(self, span: Span) -> Optional[Span]:
89-
"""Updates the active span in an executor.
90-
91-
The active span is updated to be the span's parent if the span has
92-
finished until an unfinished span is found.
93-
"""
94-
if span.finished:
95-
new_active: Optional[Span] = span
96-
while new_active and new_active.finished:
97-
new_active = new_active._parent
98-
self.activate(new_active)
99-
return new_active
100-
return span
101-
102-
103-
class DefaultContextProvider(BaseContextProvider, DatadogContextMixin):
81+
class DefaultContextProvider(BaseContextProvider):
10482
"""Context provider that retrieves contexts from a context variable.
10583
10684
It is suitable for synchronous programming and for asynchronous executors
@@ -115,14 +93,31 @@ def _has_active_context(self) -> bool:
11593
ctx = _DD_CONTEXTVAR.get()
11694
return ctx is not None
11795

118-
def activate(self, ctx: Optional[Union[Span, Context]]) -> None:
96+
def activate(self, ctx: Optional[ActiveTrace]) -> None:
11997
"""Makes the given context active in the current execution."""
12098
_DD_CONTEXTVAR.set(ctx)
12199
super(DefaultContextProvider, self).activate(ctx)
122100

123-
def active(self) -> Optional[Union[Context, Span]]:
101+
def active(self) -> Optional[ActiveTrace]:
124102
"""Returns the active span or context for the current execution."""
125103
item = _DD_CONTEXTVAR.get()
126104
if isinstance(item, Span):
127105
return self._update_active(item)
128106
return item
107+
108+
def _update_active(self, span: Span) -> Optional[ActiveTrace]:
109+
"""Updates the active trace in an executor.
110+
111+
When a span finishes, the active span becomes its parent.
112+
If no parent exists and the context is reactivatable, that context is restored.
113+
"""
114+
if span.finished:
115+
new_active: Optional[Span] = span
116+
while new_active and new_active.finished:
117+
if new_active._parent is None and new_active._parent_context and new_active._parent_context._reactivate:
118+
self.activate(new_active._parent_context)
119+
return new_active._parent_context
120+
new_active = new_active._parent
121+
self.activate(new_active)
122+
return new_active
123+
return span

ddtrace/_trace/span.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ class Span(object):
129129
"duration_ns",
130130
# Internal attributes
131131
"_context",
132+
"_parent_context",
132133
"_local_root_value",
133134
"_parent",
134135
"_ignored_exceptions",
@@ -211,6 +212,7 @@ def __init__(
211212
self.parent_id: Optional[int] = parent_id
212213
self._on_finish_callbacks = [] if on_finish is None else on_finish
213214

215+
self._parent_context: Optional[Context] = context
214216
self._context = context.copy(self.trace_id, self.span_id) if context else None
215217

216218
self._links: List[Union[SpanLink, _SpanPointer]] = []

ddtrace/_trace/tracer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from contextlib import contextmanager
12
import functools
23
from inspect import iscoroutinefunction
34
from itertools import chain
@@ -290,6 +291,17 @@ def _sample_before_fork(self) -> None:
290291
if span is not None and span.context.sampling_priority is None:
291292
self.sample(span)
292293

294+
@contextmanager
295+
def _activate_context(self, context: Context):
296+
prev_active = self.context_provider.active()
297+
context._reactivate = True
298+
self.context_provider.activate(context)
299+
try:
300+
yield
301+
finally:
302+
context._reactivate = False
303+
self.context_provider.activate(prev_active)
304+
293305
@property
294306
def _sampler(self):
295307
return self._span_aggregator.sampling_processor.sampler

ddtrace/contrib/internal/futures/threading.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ def _wrap_execution(ctx: Tuple[Optional[Context], Optional[Context]], fn, args,
3434
provider sets the Active context in a thread local storage
3535
variable because it's outside the asynchronous loop.
3636
"""
37-
if ctx[0] is not None:
38-
ddtrace.tracer.context_provider.activate(ctx[0])
3937
if ctx[1] is not None:
4038
core.dispatch("threading.execution", (ctx[1],))
39+
if ctx[0] is not None:
40+
with ddtrace.tracer._activate_context(ctx[0]):
41+
return fn(*args, **kwargs)
4142
return fn(*args, **kwargs)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
futures: Resolves an edge where trace context was not propagated to set to spans generated by the ThreadPoolExecutor.

tests/contrib/futures/test_propagation.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,35 @@ def fn(value, key=None):
9191
assert spans[0].trace_id == root.trace_id
9292
assert spans[0].parent_id == root.span_id
9393

94+
def test_propagation_with_sub_spans(self):
95+
@self.tracer.wrap("executor.thread")
96+
def fn1(value):
97+
return value
98+
99+
@self.tracer.wrap("executor.thread")
100+
def fn2(value):
101+
return value * 100
102+
103+
def main(value, key=None):
104+
return fn1(value) + fn2(value)
105+
106+
with self.override_global_tracer():
107+
with self.tracer.trace("main.thread"):
108+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
109+
future = executor.submit(main, 42)
110+
value = future.result()
111+
# assert the right result
112+
self.assertEqual(value, 4242)
113+
114+
# the trace must be completed
115+
spans = self.get_spans()
116+
assert len(spans) == 3
117+
assert spans[0].name == "main.thread"
118+
for span in spans[1:]:
119+
assert span.name == "executor.thread"
120+
assert span.trace_id == spans[0].trace_id
121+
assert span.parent_id == spans[0].span_id
122+
94123
def test_propagation_with_kwargs(self):
95124
# instrumentation must work if only kwargs are provided
96125

tests/tracer/test_tracer.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,3 +1904,64 @@ def test_multiple_tracer_instances():
19041904
log.error.assert_called_once_with(
19051905
"Initializing multiple Tracer instances is not supported. Use ``ddtrace.trace.tracer`` instead.",
19061906
)
1907+
1908+
1909+
def test_span_creation_with_context(tracer):
1910+
"""Test that a span created with a Context parent correctly sets its parent context.
1911+
1912+
This test verifies that when a span is created with a Context object as its parent,
1913+
the span's _parent_context attribute is properly set to that Context.
1914+
"""
1915+
1916+
context = Context(trace_id=123, span_id=321)
1917+
with tracer.start_span("s1", child_of=context) as span:
1918+
assert span._parent_context == context
1919+
1920+
1921+
def test_activate_context_propagates_to_child_spans(tracer):
1922+
"""Test that activating a context properly propagates to child spans.
1923+
1924+
This test verifies that when a context is activated using _activate_context:
1925+
1. Child spans created within the context inherit the trace_id and parent_id
1926+
2. Multiple child spans within the same context maintain consistent parentage
1927+
3. Spans created outside the context have different trace_id and parent_id
1928+
"""
1929+
with tracer._activate_context(Context(trace_id=1, span_id=1)):
1930+
with tracer.trace("child1") as c1:
1931+
assert c1.trace_id == 1
1932+
assert c1.parent_id == 1
1933+
1934+
with tracer.trace("child1") as c2:
1935+
assert c2.trace_id == 1
1936+
assert c2.parent_id == 1
1937+
1938+
with tracer.trace("root") as root:
1939+
assert root.trace_id != 1
1940+
assert root.parent_id != 1
1941+
1942+
1943+
def test_activate_context_nesting_and_restoration(tracer):
1944+
"""Test that context activation properly handles nesting and restoration.
1945+
1946+
This test verifies that:
1947+
1. A context can be activated and its values are accessible
1948+
2. A nested context can be activated and its values override the outer context
1949+
3. When the nested context exits, the outer context is properly restored
1950+
4. When all contexts exit, the active context is None
1951+
"""
1952+
1953+
with tracer._activate_context(Context(trace_id=1, span_id=1)):
1954+
active = tracer.context_provider.active()
1955+
assert active.trace_id == 1
1956+
assert active.span_id == 1
1957+
1958+
with tracer._activate_context(Context(trace_id=2, span_id=2)):
1959+
active = tracer.context_provider.active()
1960+
assert active.trace_id == 2
1961+
assert active.span_id == 2
1962+
1963+
active = tracer.context_provider.active()
1964+
assert active.trace_id == 1
1965+
assert active.span_id == 1
1966+
1967+
assert tracer.context_provider.active() is None

0 commit comments

Comments
 (0)