Skip to content

Commit 3a5e0fa

Browse files
eliothedeman authored and
facebook-github-bot committed
Fix python logs in asyncio code not showing up in scuba
Summary: Because we are running our Python asyncio event loop on an OS thread that tokio doesn't manage, we need to manually propagate our context and create a span while our code is running. This diff adds some helper functions to manually enter and exit a span whenever actor-based asyncio code is running. Differential Revision: D75957462
1 parent 36658a5 commit 3a5e0fa

File tree

5 files changed

+130
-11
lines changed

5 files changed

+130
-11
lines changed

hyperactor_extension/src/telemetry.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,42 @@
88

99
#![allow(unsafe_op_in_unsafe_fn)]
1010

11+
use std::cell::Cell;
12+
1113
use pyo3::prelude::*;
14+
use tracing::span::EnteredSpan;
15+
// Thread local to store the current span
16+
thread_local! {
17+
static ACTIVE_ACTOR_SPAN: Cell<Option<EnteredSpan>> = const { Cell::new(None) };
18+
}
19+
20+
/// Enter a new `py_actor_method` span and store it in the thread local, unless one is already stored there
21+
#[pyfunction]
22+
pub fn enter_span(module_name: String, method_name: String, actor_id: String) -> PyResult<()> {
23+
let mut maybe_span = ACTIVE_ACTOR_SPAN.take();
24+
if maybe_span.is_none() {
25+
maybe_span = Some(
26+
tracing::info_span!(
27+
"py_actor_method",
28+
name = method_name,
29+
target = module_name,
30+
actor_id = actor_id
31+
)
32+
.entered(),
33+
);
34+
}
35+
ACTIVE_ACTOR_SPAN.set(maybe_span);
36+
Ok(())
37+
}
38+
39+
/// Exit the span stored in the thread local
40+
#[pyfunction]
41+
pub fn exit_span() -> PyResult<()> {
42+
ACTIVE_ACTOR_SPAN.replace(None);
43+
Ok(())
44+
}
1245

13-
/// Log a message with the given metadata
46+
/// Log a message with the given metadata
1447
#[pyfunction]
1548
pub fn forward_to_tracing(message: &str, file: &str, lineno: i64, level: i32) {
1649
// Map level number to level name
@@ -23,15 +56,29 @@ pub fn forward_to_tracing(message: &str, file: &str, lineno: i64, level: i32) {
2356
}
2457
}
2558

26-
use pyo3::Bound;
27-
use pyo3::types::PyModule;
28-
2959
pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
60+
// Register the forward_to_tracing function
3061
let f = wrap_pyfunction!(forward_to_tracing, module)?;
3162
f.setattr(
3263
"__module__",
3364
"monarch._rust_bindings.hyperactor_extension.telemetry",
3465
)?;
3566
module.add_function(f)?;
67+
68+
// Register the span-related functions
69+
let enter_span_fn = wrap_pyfunction!(enter_span, module)?;
70+
enter_span_fn.setattr(
71+
"__module__",
72+
"monarch._rust_bindings.hyperactor_extension.telemetry",
73+
)?;
74+
module.add_function(enter_span_fn)?;
75+
76+
let exit_span_fn = wrap_pyfunction!(exit_span, module)?;
77+
exit_span_fn.setattr(
78+
"__module__",
79+
"monarch._rust_bindings.hyperactor_extension.telemetry",
80+
)?;
81+
module.add_function(exit_span_fn)?;
82+
3683
Ok(())
3784
}

monarch_hyperactor/src/actor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use serde::Deserialize;
3333
use serde::Serialize;
3434
use serde_bytes::ByteBuf;
3535
use tokio::sync::Mutex;
36+
use tracing::span::Id;
3637

3738
use crate::mailbox::PyMailbox;
3839
use crate::proc::InstanceWrapper;
@@ -292,6 +293,7 @@ impl Handler<PythonMessage> for PythonActor {
292293
this: &Instance<Self>,
293294
message: PythonMessage,
294295
) -> anyhow::Result<()> {
296+
this.self_id();
295297
let future = Python::with_gil(|py| -> PyResult<_> {
296298
let mailbox = PyMailbox {
297299
inner: this.mailbox_for_py().clone(),

python/monarch/_rust_bindings/hyperactor_extension/telemetry.pyi

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,49 @@
66

77
def forward_to_tracing(message: str, file: str, lineno: int, level: int) -> None:
88
"""
9-
Log a message with the given metadata.
9+
Log a message with the given metadata using the tracing system.
10+
11+
This function forwards Python log messages to the Rust tracing system,
12+
preserving the original source location and log level.
1013
1114
Args:
12-
- message (str): The log message.
15+
- message (str): The log message content.
1316
- file (str): The file where the log message originated.
1417
- lineno (int): The line number where the log message originated.
15-
- level (int): The log level (10 for debug, 20 for info, 30 for warn, 40 for error).
18+
- level (int): The log level:
19+
- 10: DEBUG
20+
- 20: INFO
21+
- 30: WARN
22+
- 40: ERROR
23+
- other values default to INFO
24+
"""
25+
...
26+
27+
def enter_span(module_name: str, method_name: str, actor_id: str) -> None:
28+
"""
29+
Enter a tracing span for a Python actor method.
30+
31+
Creates and enters a new tracing span for the current thread that tracks
32+
execution of a Python actor method. The span is stored in thread-local
33+
storage and will be active until exit_span() is called.
34+
35+
If a span is already active for the current thread, this function will
36+
preserve that span and not create a new one.
37+
38+
Args:
39+
- module_name (str): The name of the module containing the actor (used as the target).
40+
- method_name (str): The name of the method being called (used as the span name).
41+
- actor_id (str): The ID of the actor instance (included as a field in the span).
42+
"""
43+
...
44+
45+
def exit_span() -> None:
46+
"""
47+
Exit the current tracing span for a Python actor method.
48+
49+
Exits and drops the tracing span that was previously created by enter_span().
50+
This should be called when the actor method execution is complete.
51+
52+
If no span is currently active for this thread, this function has no effect.
1653
"""
1754
...

python/monarch/actor_mesh.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import asyncio
88
import collections
99
import contextvars
10+
import functools
1011
import inspect
1112

1213
import itertools
@@ -38,6 +39,7 @@
3839

3940
import monarch
4041
from monarch import ActorFuture as Future
42+
from monarch._rust_bindings.hyperactor_extension.telemetry import enter_span, exit_span
4143

4244
from monarch._rust_bindings.monarch_hyperactor.actor import PythonMessage
4345
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
@@ -49,6 +51,7 @@
4951
)
5052
from monarch._rust_bindings.monarch_hyperactor.proc import ActorId
5153
from monarch._rust_bindings.monarch_hyperactor.shape import Point as HyPoint, Shape
54+
5255
from monarch.common.pickle_flatten import flatten, unflatten
5356
from monarch.common.shape import MeshTrait, NDSlice, Shape
5457

@@ -487,13 +490,31 @@ def handle_cast(
487490
return None
488491
else:
489492
the_method = getattr(self.instance, message.method)._method
490-
result = the_method(self.instance, *args, **kwargs)
493+
491494
if not inspect.iscoroutinefunction(the_method):
495+
enter_span(
496+
the_method.__module__, message.method, str(ctx.mailbox.actor_id)
497+
)
498+
result = the_method(self.instance, *args, **kwargs)
499+
exit_span()
492500
if port is not None:
493501
port.send("result", result)
494502
return None
495503

496-
return self.run_async(ctx, self.run_task(port, result))
504+
async def instrumented():
505+
enter_span(
506+
the_method.__module__, message.method, str(ctx.mailbox.actor_id)
507+
)
508+
result = await the_method(self.instance, *args, **kwargs)
509+
exit_span()
510+
return result
511+
512+
return self.run_async(
513+
self.instance.__module__,
514+
message.method,
515+
ctx,
516+
self.run_task(port, instrumented()),
517+
)
497518
except Exception as e:
498519
traceback.print_exc()
499520
s = ActorMeshRefCallFailedException(e)
@@ -505,7 +526,13 @@ def handle_cast(
505526
else:
506527
raise s from None
507528

508-
async def run_async(self, ctx, coroutine):
529+
async def run_async(
530+
self,
531+
module_name: str,
532+
method_name: str,
533+
ctx: MonarchContext,
534+
coroutine: Coroutine[Any, None, Any],
535+
) -> None:
509536
_context.set(ctx)
510537
if self.complete_task is None:
511538
asyncio.create_task(self._complete())
@@ -545,6 +572,12 @@ def _unpickle(data: bytes, mailbox: Mailbox) -> Any:
545572

546573

547574
class Actor(MeshTrait):
575+
@functools.cached_property
576+
def logger(cls) -> logging.Logger:
577+
lgr = logging.getLogger(cls.__class__.__name__)
578+
lgr.setLevel(logging.DEBUG)
579+
return lgr
580+
548581
@property
549582
def _ndslice(self) -> NDSlice:
550583
raise NotImplementedError(

python/monarch/bootstrap_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def emit(self, record: logging.LogRecord) -> None:
5555

5656
# forward logs to rust tracing. Defaults to on.
5757
if os.environ.get("MONARCH_PYTHON_LOG_TRACING", "1") == "1":
58-
logging.root.addHandler(TracingForwarder())
58+
logging.root.addHandler(TracingForwarder(level=logging.DEBUG))
5959

6060
with (
6161
importlib.resources.path("monarch", "py-spy") as pyspy,

0 commit comments

Comments
 (0)