Skip to content

Plumb actor supervision events into actor mesh #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ declare_attrs! {

/// Flag indicating if this is a managed subprocess
pub attr IS_MANAGED_SUBPROCESS: bool = false;

/// Maximum number of supervision events that can be buffered by client handlers
pub attr MAX_SUPERVISION_EVENTS: usize = 10;
}

/// Load configuration from environment variables
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ struct InstanceState {
signal: PortHandle<Signal>,

/// The actor's supervision port. This is used to send
/// supervision event to the actor.
/// supervision event to the actor (usually by its children).
supervision_port: PortHandle<ActorSupervisionEvent>,

/// An observer that stores the current status of the actor.
Expand Down
28 changes: 27 additions & 1 deletion hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use hyperactor::message::Bindings;
use hyperactor::message::Castable;
use hyperactor::message::IndexedErasedUnbound;
use hyperactor::message::Unbind;
use hyperactor::supervision::ActorSupervisionEvent;
use ndslice::Range;
use ndslice::Selection;
use ndslice::Shape;
Expand All @@ -36,6 +37,7 @@ use ndslice::dsl;
use ndslice::selection::ReifyView;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::mpsc;

use crate::Mesh;
use crate::comm::multicast::CastMessage;
Expand Down Expand Up @@ -141,26 +143,35 @@ pub struct RootActorMesh<'a, A: RemoteActor> {
proc_mesh: ProcMeshRef<'a>,
name: String,
pub(crate) ranks: Vec<ActorRef<A>>, // temporary until we remove `ArcActorMesh`.
actor_supervision_rx: mpsc::Receiver<ActorSupervisionEvent>,
}

impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
pub(crate) fn new(proc_mesh: &'a ProcMesh, name: String, ranks: Vec<ActorRef<A>>) -> Self {
pub(crate) fn new(
proc_mesh: &'a ProcMesh,
name: String,
actor_supervision_rx: mpsc::Receiver<ActorSupervisionEvent>,
ranks: Vec<ActorRef<A>>,
) -> Self {
Self {
proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
name,
ranks,
actor_supervision_rx,
}
}

pub(crate) fn new_shared(
proc_mesh: Arc<ProcMesh>,
name: String,
actor_supervision_rx: mpsc::Receiver<ActorSupervisionEvent>,
ranks: Vec<ActorRef<A>>,
) -> Self {
Self {
proc_mesh: ProcMeshRef::Shared(proc_mesh),
name,
ranks,
actor_supervision_rx,
}
}

Expand Down Expand Up @@ -198,6 +209,21 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
}
Ok(())
}

/// An event stream of proc events. Each ProcMesh can produce only one such
/// stream, returning None after the first call.
pub async fn next(&mut self) -> Option<ActorSupervisionEvent> {
let result = self.actor_supervision_rx.recv().await;
match result.as_ref() {
Some(event) => {
tracing::info!("Received supervision event: {event:?}");
}
None => {
tracing::info!("Closed!");
}
};
result
}
}

#[async_trait]
Expand Down
62 changes: 58 additions & 4 deletions hyperactor_mesh/src/proc_mesh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use hyperactor::supervision::ActorSupervisionEvent;
use ndslice::Range;
use ndslice::Shape;
use ndslice::ShapeError;
use tokio::sync::Mutex;
use tokio::sync::mpsc;

use crate::CommActor;
use crate::Mesh;
Expand Down Expand Up @@ -64,12 +66,14 @@ fn global_router() -> &'static MailboxRouter {
GLOBAL_ROUTER.get_or_init(MailboxRouter::new)
}

type ActorEventRouter = Arc<Mutex<HashMap<ActorMeshName, mpsc::Sender<ActorSupervisionEvent>>>>;
/// A ProcMesh maintains a mesh of procs whose lifecycles are managed by
/// an allocator.
pub struct ProcMesh {
// The underlying set of events. It is None if it has been transferred to
// a proc event observer.
event_state: Option<EventState>,
actor_event_router: ActorEventRouter,
shape: Shape,
ranks: Vec<(ProcId, (ChannelAddr, ActorRef<MeshAgent>))>,
#[allow(dead_code)] // will be used in subsequent diff
Expand Down Expand Up @@ -198,11 +202,13 @@ impl ProcMesh {
global_router().bind(alloc.world_id().clone().into(), router.clone());
global_router().bind(client_proc_id.into(), router.clone());

// TODO: No actor bound to "supervisor" yet.
let supervisor = client_proc.attach("supervisor")?;
let (supervison_port, supervision_events) = supervisor.open_port();

// Now, configure the full mesh, so that the local agents are wired up to
// our router.
// No actor bound to this "client" yet
let client = client_proc.attach("client")?;

// Map of procs -> channel addresses
Expand Down Expand Up @@ -277,6 +283,7 @@ impl ProcMesh {
alloc: Box::new(alloc),
supervision_events,
}),
actor_event_router: Arc::new(Mutex::new(HashMap::new())),
shape,
ranks: proc_ids
.into_iter()
Expand Down Expand Up @@ -359,11 +366,23 @@ impl ProcMesh {
where
A::Params: RemoteMessage,
{
Ok(RootActorMesh::new(
let actor_supervision_buffer_len =
hyperactor::config::global::get(hyperactor::config::MAX_SUPERVISION_EVENTS);
let (tx, rx) = mpsc::channel::<ActorSupervisionEvent>(actor_supervision_buffer_len);
{
// Instantiate supervision routing BEFORE spawning the actor mesh.
self.actor_event_router
.lock()
.await
.insert(actor_name.to_string(), tx);
}
let root_mesh = RootActorMesh::new(
self,
actor_name.to_string(),
rx,
Self::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?,
))
);
Ok(root_mesh)
}

/// A client used to communicate with any member of this mesh.
Expand All @@ -390,8 +409,10 @@ impl ProcMesh {
.enumerate()
.map(|(rank, (proc_id, _))| (proc_id.clone(), rank))
.collect(),
actor_event_router: self.actor_event_router.clone(),
})
}

pub fn shape(&self) -> &Shape {
&self.shape
}
Expand Down Expand Up @@ -420,11 +441,14 @@ impl fmt::Display for ProcEvent {
}
}

type ActorMeshName = String;

/// An event stream of [`ProcEvent`]
// TODO: consider using streams for this.
pub struct ProcEvents {
event_state: EventState,
ranks: HashMap<ProcId, usize>,
actor_event_router: ActorEventRouter,
}

impl ProcEvents {
Expand All @@ -436,6 +460,11 @@ impl ProcEvents {
result = self.event_state.alloc.next() => {
// Don't disable the outer branch on None: this is always terminal.
let Some(alloc_event) = result else {
{
let mut map = self.actor_event_router.lock().await;
// Remove all values in map
map.clear();
}
break None;
};

Expand All @@ -452,11 +481,22 @@ impl ProcEvents {
break Some(ProcEvent::Stopped(*rank, reason));
}
Ok(event) = self.event_state.supervision_events.recv() => {
let (actor_id, actor_status) = event.into_inner();
let (actor_id, actor_status) = event.clone().into_inner();
let Some(rank) = self.ranks.get(actor_id.proc_id()) else {
tracing::warn!("received supervision event for unmapped actor {}", actor_id);
continue;
};
// transmit to the correct root actor mesh.
{
let map = self.actor_event_router.lock().await;
let Some(tx) = map.get(actor_id.name()) else {
tracing::warn!("received supervision event for unregistered actor {}", actor_id);
continue;
};
tx.send(event).await.unwrap();
}
// TODO: Actor supervision events need to be wired to the frontend.
// TODO: This event should be handled by the proc mesh if unhandled by actor mesh.
break Some(ProcEvent::Crashed(*rank, actor_status.to_string()))
}
}
Expand Down Expand Up @@ -487,9 +527,20 @@ impl SharedSpawnable for Arc<ProcMesh> {
where
A::Params: RemoteMessage,
{
let actor_supervision_buffer =
hyperactor::config::global::get(hyperactor::config::MAX_SUPERVISION_EVENTS);
let (tx, rx) = mpsc::channel::<ActorSupervisionEvent>(actor_supervision_buffer);
{
// Instantiate supervision routing BEFORE spawning the actor mesh.
self.actor_event_router
.lock()
.await
.insert(actor_name.to_string(), tx);
}
Ok(RootActorMesh::new_shared(
Arc::clone(self),
actor_name.to_string(),
rx,
ProcMesh::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?,
))
}
Expand Down Expand Up @@ -639,7 +690,7 @@ mod tests {
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
let mut events = mesh.events().unwrap();

let actors = mesh.spawn::<TestActor>("failing", &()).await.unwrap();
let mut actors = mesh.spawn::<TestActor>("failing", &()).await.unwrap();

actors
.cast(
Expand All @@ -653,6 +704,8 @@ mod tests {
ProcEvent::Crashed(0, reason) if reason.contains("failmonkey")
);

assert_matches!(actors.next().await, Some(_));

stop();
assert_matches!(
events.next().await.unwrap(),
Expand All @@ -664,5 +717,6 @@ mod tests {
);

assert!(events.next().await.is_none());
assert!(actors.next().await.is_none());
}
}