Skip to content

Commit 4f0b525

Browse files
eliothedeman and facebook-github-bot
authored and committed
Fix python logs in asyncio code not showing up in scuba (#155)
Summary: Because we run our Python asyncio event loop on an OS thread that tokio doesn't manage, we need to manually propagate our context and create a span while our code is running. This diff adds some helper functions to manually enter and exit a span whenever actor-based asyncio code is running. Reviewed By: dulinriley. Differential Revision: D75957462.
1 parent e3e5dfd commit 4f0b525

File tree

5 files changed

+126
-11
lines changed

5 files changed

+126
-11
lines changed

hyperactor_extension/src/telemetry.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,42 @@
88

99
#![allow(unsafe_op_in_unsafe_fn)]
1010

11+
use std::cell::Cell;
12+
1113
use pyo3::prelude::*;
14+
use tracing::span::EnteredSpan;
15+
// Thread local to store the current span
16+
thread_local! {
17+
static ACTIVE_ACTOR_SPAN: Cell<Option<EnteredSpan>> = const { Cell::new(None) };
18+
}
19+
20+
/// Enter a span for the current actor method and store it in the thread local, keeping any span that is already stored there
21+
#[pyfunction]
22+
pub fn enter_span(module_name: String, method_name: String, actor_id: String) -> PyResult<()> {
23+
let mut maybe_span = ACTIVE_ACTOR_SPAN.take();
24+
if maybe_span.is_none() {
25+
maybe_span = Some(
26+
tracing::info_span!(
27+
"py_actor_method",
28+
name = method_name,
29+
target = module_name,
30+
actor_id = actor_id
31+
)
32+
.entered(),
33+
);
34+
}
35+
ACTIVE_ACTOR_SPAN.set(maybe_span);
36+
Ok(())
37+
}
38+
39+
/// Exit the span stored in the thread local
40+
#[pyfunction]
41+
pub fn exit_span() -> PyResult<()> {
42+
ACTIVE_ACTOR_SPAN.replace(None);
43+
Ok(())
44+
}
1245

13-
/// Log a message with the given metadata
46+
/// Log a message with the given metadata
1447
#[pyfunction]
1548
pub fn forward_to_tracing(message: &str, file: &str, lineno: i64, level: i32) {
1649
// Map level number to level name
@@ -23,15 +56,29 @@ pub fn forward_to_tracing(message: &str, file: &str, lineno: i64, level: i32) {
2356
}
2457
}
2558

26-
use pyo3::Bound;
27-
use pyo3::types::PyModule;
28-
2959
pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
60+
// Register the forward_to_tracing function
3061
let f = wrap_pyfunction!(forward_to_tracing, module)?;
3162
f.setattr(
3263
"__module__",
3364
"monarch._rust_bindings.hyperactor_extension.telemetry",
3465
)?;
3566
module.add_function(f)?;
67+
68+
// Register the span-related functions
69+
let enter_span_fn = wrap_pyfunction!(enter_span, module)?;
70+
enter_span_fn.setattr(
71+
"__module__",
72+
"monarch._rust_bindings.hyperactor_extension.telemetry",
73+
)?;
74+
module.add_function(enter_span_fn)?;
75+
76+
let exit_span_fn = wrap_pyfunction!(exit_span, module)?;
77+
exit_span_fn.setattr(
78+
"__module__",
79+
"monarch._rust_bindings.hyperactor_extension.telemetry",
80+
)?;
81+
module.add_function(exit_span_fn)?;
82+
3683
Ok(())
3784
}

monarch_hyperactor/src/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use serde::Serialize;
3737
use serde_bytes::ByteBuf;
3838
use tokio::sync::Mutex;
3939
use tokio::sync::oneshot;
40+
use tracing::span::Id;
4041

4142
use crate::mailbox::PyMailbox;
4243
use crate::proc::InstanceWrapper;

python/monarch/_rust_bindings/hyperactor_extension/telemetry.pyi

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,49 @@
66

77
def forward_to_tracing(message: str, file: str, lineno: int, level: int) -> None:
88
"""
9-
Log a message with the given metadata.
9+
Log a message with the given metadata using the tracing system.
10+
11+
This function forwards Python log messages to the Rust tracing system,
12+
preserving the original source location and log level.
1013
1114
Args:
12-
- message (str): The log message.
15+
- message (str): The log message content.
1316
- file (str): The file where the log message originated.
1417
- lineno (int): The line number where the log message originated.
15-
- level (int): The log level (10 for debug, 20 for info, 30 for warn, 40 for error).
18+
- level (int): The log level:
19+
- 10: DEBUG
20+
- 20: INFO
21+
- 30: WARN
22+
- 40: ERROR
23+
- other values default to INFO
24+
"""
25+
...
26+
27+
def enter_span(module_name: str, method_name: str, actor_id: str) -> None:
28+
"""
29+
Enter a tracing span for a Python actor method.
30+
31+
Creates and enters a new tracing span for the current thread that tracks
32+
execution of a Python actor method. The span is stored in thread-local
33+
storage and will be active until exit_span() is called.
34+
35+
If a span is already active for the current thread, this function will
36+
preserve that span and not create a new one.
37+
38+
Args:
39+
- module_name (str): The name of the module containing the actor (recorded as the span's `target` field).
40+
- method_name (str): The name of the method being called (recorded as the span's `name` field; the span itself is always named "py_actor_method").
41+
- actor_id (str): The ID of the actor instance (included as a field in the span).
42+
"""
43+
...
44+
45+
def exit_span() -> None:
46+
"""
47+
Exit the current tracing span for a Python actor method.
48+
49+
Exits and drops the tracing span that was previously created by enter_span().
50+
This should be called when the actor method execution is complete.
51+
52+
If no span is currently active for this thread, this function has no effect.
1653
"""
1754
...

python/monarch/actor_mesh.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import asyncio
88
import collections
99
import contextvars
10+
import functools
1011
import inspect
1112

1213
import itertools
@@ -38,6 +39,7 @@
3839

3940
import monarch
4041
from monarch import ActorFuture as Future
42+
from monarch._rust_bindings.hyperactor_extension.telemetry import enter_span, exit_span
4143

4244
from monarch._rust_bindings.monarch_hyperactor.actor import PanicFlag, PythonMessage
4345
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
@@ -49,6 +51,7 @@
4951
)
5052
from monarch._rust_bindings.monarch_hyperactor.proc import ActorId
5153
from monarch._rust_bindings.monarch_hyperactor.shape import Point as HyPoint, Shape
54+
5255
from monarch.common.pickle_flatten import flatten, unflatten
5356
from monarch.common.shape import MeshTrait, NDSlice
5457

@@ -492,13 +495,28 @@ def handle_cast(
492495
return None
493496
else:
494497
the_method = getattr(self.instance, message.method)._method
495-
result = the_method(self.instance, *args, **kwargs)
498+
496499
if not inspect.iscoroutinefunction(the_method):
500+
enter_span(
501+
the_method.__module__, message.method, str(ctx.mailbox.actor_id)
502+
)
503+
result = the_method(self.instance, *args, **kwargs)
504+
exit_span()
497505
if port is not None:
498506
port.send("result", result)
499507
return None
500508

501-
return self.run_async(ctx, self.run_task(port, result, panic_flag))
509+
async def instrumented():
510+
enter_span(
511+
the_method.__module__, message.method, str(ctx.mailbox.actor_id)
512+
)
513+
result = await the_method(self.instance, *args, **kwargs)
514+
exit_span()
515+
return result
516+
517+
return self.run_async(
518+
ctx, self.run_task(port, instrumented(), panic_flag)
519+
)
502520
except Exception as e:
503521
traceback.print_exc()
504522
s = ActorError(e)
@@ -510,7 +528,13 @@ def handle_cast(
510528
else:
511529
raise s from None
512530

513-
async def run_async(self, ctx, coroutine):
531+
async def run_async(
532+
self,
533+
module_name: str,
534+
method_name: str,
535+
ctx: MonarchContext,
536+
coroutine: Coroutine[Any, None, Any],
537+
) -> None:
514538
_context.set(ctx)
515539
if self.complete_task is None:
516540
self.complete_task = asyncio.create_task(self._complete())
@@ -564,6 +588,12 @@ def _unpickle(data: bytes, mailbox: Mailbox) -> Any:
564588

565589

566590
class Actor(MeshTrait):
591+
@functools.cached_property
592+
def logger(cls) -> logging.Logger:
593+
lgr = logging.getLogger(cls.__class__.__name__)
594+
lgr.setLevel(logging.DEBUG)
595+
return lgr
596+
567597
@property
568598
def _ndslice(self) -> NDSlice:
569599
raise NotImplementedError(

python/monarch/bootstrap_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def emit(self, record: logging.LogRecord) -> None:
5858

5959
# forward logs to rust tracing. Defaults to on.
6060
if os.environ.get("MONARCH_PYTHON_LOG_TRACING", "1") == "1":
61-
logging.root.addHandler(TracingForwarder())
61+
logging.root.addHandler(TracingForwarder(level=logging.DEBUG))
6262

6363
try:
6464
with (

0 commit comments

Comments
 (0)