Skip to content

Commit 6e159f7

Browse files
eliothedemanfacebook-github-bot
authored andcommitted
Fix python logs in asyncio code not showing up in scuba
Differential Revision: D75957462
1 parent 08c96e0 commit 6e159f7

File tree

5 files changed

+112
-8
lines changed

5 files changed

+112
-8
lines changed

hyperactor_extension/src/telemetry.rs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,47 @@
88

99
#![allow(unsafe_op_in_unsafe_fn)]
1010

11+
use std::cell::Cell;
12+
1113
use pyo3::prelude::*;
14+
use tracing::span::EnteredSpan;
15+
// Thread local to store the current span
16+
thread_local! {
17+
static ACTIVE_ACTOR_ID: Cell<Option<String>> = const { Cell::new(None) };
18+
static ACTIVE_ACTOR_SPAN: Cell<Option<EnteredSpan>> = const { Cell::new(None) };
19+
}
20+
21+
pub fn set_py_actor(name: String) {
22+
ACTIVE_ACTOR_ID.replace(Some(name));
23+
}
24+
25+
/// Enter the span stored in the thread local
26+
#[pyfunction]
27+
pub fn enter_span(module_name: String, method_name: String, actor_id: String) -> PyResult<()> {
28+
let mut maybe_span = ACTIVE_ACTOR_SPAN.take();
29+
if maybe_span.is_none() {
30+
maybe_span = Some(
31+
tracing::info_span!(
32+
"py_actor_method",
33+
name = method_name,
34+
target = module_name,
35+
actor_id = actor_id
36+
)
37+
.entered(),
38+
);
39+
}
40+
ACTIVE_ACTOR_SPAN.set(maybe_span);
41+
Ok(())
42+
}
1243

13-
/// Log a message with the given metadata
44+
/// Exit the span stored in the thread local
45+
#[pyfunction]
46+
pub fn exit_span() -> PyResult<()> {
47+
ACTIVE_ACTOR_SPAN.replace(None);
48+
Ok(())
49+
}
50+
51+
/// Log a message with the given metaata
1452
#[pyfunction]
1553
pub fn forward_to_tracing(message: &str, file: &str, lineno: i64, level: i32) {
1654
// Map level number to level name
@@ -23,15 +61,29 @@ pub fn forward_to_tracing(message: &str, file: &str, lineno: i64, level: i32) {
2361
}
2462
}
2563

26-
use pyo3::Bound;
27-
use pyo3::types::PyModule;
28-
2964
pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
65+
// Register the forward_to_tracing function
3066
let f = wrap_pyfunction!(forward_to_tracing, module)?;
3167
f.setattr(
3268
"__module__",
3369
"monarch._rust_bindings.hyperactor_extension.telemetry",
3470
)?;
3571
module.add_function(f)?;
72+
73+
// Register the span-related functions
74+
let enter_span_fn = wrap_pyfunction!(enter_span, module)?;
75+
enter_span_fn.setattr(
76+
"__module__",
77+
"monarch._rust_bindings.hyperactor_extension.telemetry",
78+
)?;
79+
module.add_function(enter_span_fn)?;
80+
81+
let exit_span_fn = wrap_pyfunction!(exit_span, module)?;
82+
exit_span_fn.setattr(
83+
"__module__",
84+
"monarch._rust_bindings.hyperactor_extension.telemetry",
85+
)?;
86+
module.add_function(exit_span_fn)?;
87+
3688
Ok(())
3789
}

monarch_hyperactor/src/actor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use serde::Deserialize;
3333
use serde::Serialize;
3434
use serde_bytes::ByteBuf;
3535
use tokio::sync::Mutex;
36+
use tracing::span::Id;
3637

3738
use crate::mailbox::PyMailbox;
3839
use crate::proc::InstanceWrapper;
@@ -292,6 +293,7 @@ impl Handler<PythonMessage> for PythonActor {
292293
this: &Instance<Self>,
293294
message: PythonMessage,
294295
) -> anyhow::Result<()> {
296+
this.self_id();
295297
let future = Python::with_gil(|py| -> PyResult<_> {
296298
let mailbox = PyMailbox {
297299
inner: this.mailbox_for_py().clone(),

python/monarch/_rust_bindings/hyperactor_extension/telemetry.pyi

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,20 @@ def forward_to_tracing(message: str, file: str, lineno: int, level: int) -> None
1515
- level (int): The log level (10 for debug, 20 for info, 30 for warn, 40 for error).
1616
"""
1717
...
18+
19+
def enter_span(module_name: str, method_name: str, actor_id: str) -> None:
20+
"""
21+
Enter a tracing span for a Python actor method.
22+
23+
Args:
24+
- module_name (str): The name of the module containing the actor.
25+
- method_name (str): The name of the method being called.
26+
- actor_id (str): The ID of the actor instance.
27+
"""
28+
...
29+
30+
def exit_span() -> None:
31+
"""
32+
Exit the current tracing span for a Python actor method.
33+
"""
34+
...

python/monarch/actor_mesh.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import asyncio
88
import collections
99
import contextvars
10+
import functools
1011
import inspect
1112

1213
import itertools
@@ -38,6 +39,7 @@
3839

3940
import monarch
4041
from monarch import ActorFuture as Future
42+
from monarch._rust_bindings.hyperactor_extension.telemetry import enter_span, exit_span
4143

4244
from monarch._rust_bindings.monarch_hyperactor.actor import PythonMessage
4345
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
@@ -49,6 +51,7 @@
4951
)
5052
from monarch._rust_bindings.monarch_hyperactor.proc import ActorId
5153
from monarch._rust_bindings.monarch_hyperactor.shape import Point as HyPoint, Shape
54+
5255
from monarch.common.pickle_flatten import flatten, unflatten
5356
from monarch.common.shape import MeshTrait, NDSlice, Shape
5457

@@ -487,13 +490,31 @@ def handle_cast(
487490
return None
488491
else:
489492
the_method = getattr(self.instance, message.method)._method
490-
result = the_method(self.instance, *args, **kwargs)
493+
491494
if not inspect.iscoroutinefunction(the_method):
495+
enter_span(
496+
the_method.__module__, message.method, str(ctx.mailbox.actor_id)
497+
)
498+
result = the_method(self.instance, *args, **kwargs)
499+
exit_span()
492500
if port is not None:
493501
port.send("result", result)
494502
return None
495503

496-
return self.run_async(ctx, self.run_task(port, result))
504+
async def instrumented():
505+
enter_span(
506+
the_method.__module__, message.method, str(ctx.mailbox.actor_id)
507+
)
508+
result = await the_method(self.instance, *args, **kwargs)
509+
exit_span()
510+
return result
511+
512+
return self.run_async(
513+
self.instance.__module__,
514+
message.method,
515+
ctx,
516+
self.run_task(port, instrumented()),
517+
)
497518
except Exception as e:
498519
traceback.print_exc()
499520
s = ActorMeshRefCallFailedException(e)
@@ -505,7 +526,13 @@ def handle_cast(
505526
else:
506527
raise s from None
507528

508-
async def run_async(self, ctx, coroutine):
529+
async def run_async(
530+
self,
531+
module_name: str,
532+
method_name: str,
533+
ctx: MonarchContext,
534+
coroutine: Coroutine[Any, None, Any],
535+
) -> None:
509536
_context.set(ctx)
510537
if self.complete_task is None:
511538
asyncio.create_task(self._complete())
@@ -545,6 +572,12 @@ def _unpickle(data: bytes, mailbox: Mailbox) -> Any:
545572

546573

547574
class Actor(MeshTrait):
575+
@functools.cached_property
576+
def logger(cls) -> logging.Logger:
577+
lgr = logging.getLogger(cls.__class__.__name__)
578+
lgr.setLevel(logging.DEBUG)
579+
return lgr
580+
548581
@property
549582
def _ndslice(self) -> NDSlice:
550583
raise NotImplementedError(

python/monarch/bootstrap_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def emit(self, record: logging.LogRecord) -> None:
5555

5656
# forward logs to rust tracing. Defaults to on.
5757
if os.environ.get("MONARCH_PYTHON_LOG_TRACING", "1") == "1":
58-
logging.root.addHandler(TracingForwarder())
58+
logging.root.addHandler(TracingForwarder(level=logging.DEBUG))
5959

6060
with (
6161
importlib.resources.path("monarch", "py-spy") as pyspy,

0 commit comments

Comments
 (0)