Skip to content

Commit 46c8031

Browse files
author
Shayne Fletcher
committed
[hyperactor]: mailbox: CanSend for Mailbox use mailbox port for returns
Pull Request resolved: #303 the `CanSend` for `Mailbox` requires an undeliverable message port be self-bound. legacy tests were updated to be compliant with the new requirement. removing `monitored_return_handle()` from `CanSend` is a clear improvement. ghstack-source-id: 291502157 Differential Revision: [D76926674](https://our.internmc.facebook.com/intern/diff/D76926674/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D76926674/)!
1 parent 39e0783 commit 46c8031

File tree

4 files changed

+87
-12
lines changed

4 files changed

+87
-12
lines changed

controller/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ mod tests {
662662
let (worker, worker_ref, mut worker_rx) = proc
663663
.attach_actor::<WorkerActor, WorkerMessage>("worker")
664664
.unwrap();
665+
665666
IndexedErasedUnbound::<WorkerMessage>::bind_for_test_only(worker_ref.clone(), &worker)
666667
.unwrap();
667668

hyperactor/src/mailbox/mod.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,33 @@ impl Mailbox {
10821082
MailboxError::new(self.state.actor_id.clone(), err)
10831083
}
10841084

1085+
/// Look up the `UnboundedPortSender` for the port associated with
1086+
/// the message type `M`, as defined by `<M as Named>::port()`.
1087+
///
1088+
/// This performs a dynamic downcast to recover the original
1089+
/// sender type. Returns `None` if the port is not bound or if the
1090+
/// type does not match.
1091+
///
1092+
/// # Panics
1093+
/// Panics in debug mode if the port is bound but the stored
1094+
/// `port_id` does not match the expected one.
1095+
pub(crate) fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1096+
let port_index = M::port();
1097+
self.state.ports.get(&port_index).and_then(|boxed| {
1098+
boxed
1099+
.as_any()
1100+
.downcast_ref::<UnboundedSender<M>>()
1101+
.map(|s| {
1102+
debug_assert_eq!(
1103+
s.port_id,
1104+
self.actor_id().port_id(port_index),
1105+
"port_id mismatch in downcasted UnboundedSender"
1106+
);
1107+
s.sender.clone()
1108+
})
1109+
})
1110+
}
1111+
10851112
fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
10861113
assert_eq!(
10871114
handle.mailbox.actor_id(),
@@ -1210,8 +1237,22 @@ impl MailboxSender for Mailbox {
12101237

12111238
impl cap::sealed::CanSend for Mailbox {
12121239
fn post(&self, dest: PortId, data: Serialized) {
1240+
let return_handle = self
1241+
.lookup_sender::<Undeliverable<MessageEnvelope>>()
1242+
.map_or_else(
1243+
|| {
1244+
let bt = std::backtrace::Backtrace::capture();
1245+
tracing::warn!(
1246+
actor_id = ?self.actor_id(),
1247+
backtrace = ?bt,
1248+
"Mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
1249+
);
1250+
monitored_return_handle()
1251+
},
1252+
|sender| PortHandle::new(self.clone(), u64::MAX, sender),
1253+
);
12131254
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data);
1214-
MailboxSender::post(self, envelope, monitored_return_handle());
1255+
MailboxSender::post(self, envelope, return_handle);
12151256
}
12161257
}
12171258

@@ -1592,6 +1633,8 @@ pub struct SerializedSenderError {
15921633
/// - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
15931634
/// validity of the underlying port.
15941635
trait SerializedSender: Send + Sync {
1636+
fn as_any(&self) -> &dyn Any;
1637+
15951638
/// Send a serialized message. SerializedSender will deserialize the
15961639
/// message (failing if it fails to deserialize), and then send the
15971640
/// resulting message on the underlying port.
@@ -1603,7 +1646,7 @@ trait SerializedSender: Send + Sync {
16031646
}
16041647

16051648
/// A sender to an M-typed unbounded port.
1606-
enum UnboundedPortSender<M: Message> {
1649+
pub(crate) enum UnboundedPortSender<M: Message> {
16071650
/// Send directly to the mpsc queue.
16081651
Mpsc(mpsc::UnboundedSender<M>),
16091652
/// Use the provided function to enqueue the item.
@@ -1675,6 +1718,10 @@ impl<M: Message> Clone for UnboundedSender<M> {
16751718
}
16761719

16771720
impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
1721+
fn as_any(&self) -> &dyn Any {
1722+
self
1723+
}
1724+
16781725
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
16791726
match serialized.deserialized() {
16801727
Ok(message) => {
@@ -1757,6 +1804,10 @@ impl<M: Message> Clone for OnceSender<M> {
17571804
}
17581805

17591806
impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
1807+
fn as_any(&self) -> &dyn Any {
1808+
self
1809+
}
1810+
17601811
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
17611812
match serialized.deserialized() {
17621813
Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
@@ -1781,6 +1832,10 @@ struct UntypedUnboundedSender {
17811832
}
17821833

17831834
impl SerializedSender for UntypedUnboundedSender {
1835+
fn as_any(&self) -> &dyn Any {
1836+
self
1837+
}
1838+
17841839
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
17851840
(self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
17861841
data,

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,6 @@ mod tests {
541541
use hyperactor::ProcId;
542542
use hyperactor::WorldId;
543543
use hyperactor::id;
544-
use hyperactor::mailbox::MessageEnvelope;
545544
use hyperactor::mailbox::Undeliverable;
546545
use hyperactor::message::Bind;
547546
use hyperactor::message::Unbind;
@@ -821,12 +820,13 @@ mod tests {
821820
.unwrap();
822821

823822
let name = alloc.name().to_string();
824-
let mesh = ProcMesh::allocate(alloc).await.unwrap();
825-
let unmonitored_reply_to = mesh.client().open_port::<usize>().0.bind();
826-
let (undeliverable_messages, mut undeliverable_rx) = mesh.client().open_port::<Undeliverable<MessageEnvelope>>();
827-
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
823+
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
824+
let mut undeliverable_rx = mesh
825+
.take_client_undeliverable_receiver()
826+
.expect("client_undeliverable_receiver should be available");
828827

829828
// Send a message to a non-existent actor (the proc however exists).
829+
let unmonitored_reply_to = mesh.client().open_port::<usize>().0.bind();
830830
let bad_actor = ActorRef::<TestActor>::attest(ActorId(ProcId(WorldId(name.clone()), 0), "foo".into(), 0));
831831
bad_actor.send(mesh.client(), GetRank(true, unmonitored_reply_to)).unwrap();
832832

@@ -870,10 +870,12 @@ mod tests {
870870
let monkey = alloc.chaos_monkey();
871871
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
872872
let mut events = mesh.events().unwrap();
873+
let mut undeliverable_msg_rx = mesh.take_client_undeliverable_receiver().unwrap();
873874

874-
let (undeliverable_msg_tx, mut undeliverable_msg_rx) = mesh.client().open_port();
875-
let ping_pong_actor_params =
876-
PingPongActorParams::new(undeliverable_msg_tx.bind(), None);
875+
let ping_pong_actor_params = PingPongActorParams::new(
876+
PortRef::attest_message_port(mesh.client().actor_id()),
877+
None,
878+
);
877879
let actor_mesh: RootActorMesh<PingPongActor> = mesh
878880
.spawn::<PingPongActor>("ping-pong", &ping_pong_actor_params)
879881
.await

hyperactor_mesh/src/proc_mesh/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use async_trait::async_trait;
1515
use hyperactor::Actor;
1616
use hyperactor::ActorRef;
1717
use hyperactor::Mailbox;
18+
use hyperactor::Named;
1819
use hyperactor::RemoteMessage;
1920
use hyperactor::WorldId;
2021
use hyperactor::actor::RemoteActor;
@@ -28,7 +29,9 @@ use hyperactor::mailbox::BoxedMailboxSender;
2829
use hyperactor::mailbox::DialMailboxRouter;
2930
use hyperactor::mailbox::MailboxRouter;
3031
use hyperactor::mailbox::MailboxServer;
32+
use hyperactor::mailbox::MessageEnvelope;
3133
use hyperactor::mailbox::PortReceiver;
34+
use hyperactor::mailbox::Undeliverable;
3235
use hyperactor::proc::Proc;
3336
use hyperactor::reference::ProcId;
3437
use hyperactor::reference::Reference;
@@ -75,6 +78,7 @@ pub struct ProcMesh {
7578
#[allow(dead_code)] // will be used in subsequent diff
7679
client_proc: Proc,
7780
client: Mailbox,
81+
client_undeliverable_receiver: Option<PortReceiver<Undeliverable<MessageEnvelope>>>,
7882
comm_actors: Vec<ActorRef<CommActor>>,
7983
}
8084

@@ -201,9 +205,13 @@ impl ProcMesh {
201205
let supervisor = client_proc.attach("supervisor")?;
202206
let (supervison_port, supervision_events) = supervisor.open_port();
203207

204-
// Now, configure the full mesh, so that the local agents are wired up to
205-
// our router.
208+
// Now, configure the full mesh, so that the local agents are
209+
// wired up to our router. Bind an undeliverable message port
210+
// in the client and return the port receiver.
206211
let client = client_proc.attach("client")?;
212+
let (undeliverable_messages, client_undeliverable_receiver) =
213+
client.open_port::<Undeliverable<MessageEnvelope>>();
214+
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
207215

208216
// Map of procs -> channel addresses
209217
let address_book: HashMap<_, _> = running
@@ -285,6 +293,7 @@ impl ProcMesh {
285293
.collect(),
286294
client_proc,
287295
client,
296+
client_undeliverable_receiver: Some(client_undeliverable_receiver),
288297
comm_actors,
289298
})
290299
}
@@ -371,6 +380,14 @@ impl ProcMesh {
371380
&self.client
372381
}
373382

383+
/// Used to get the `PortReceiver<Undeliverable<MessageEnvelope>>`
384+
/// bound in the client mailbox.
385+
pub fn take_client_undeliverable_receiver(
386+
&mut self,
387+
) -> Option<PortReceiver<Undeliverable<MessageEnvelope>>> {
388+
self.client_undeliverable_receiver.take()
389+
}
390+
374391
pub fn client_proc(&self) -> &Proc {
375392
&self.client_proc
376393
}

0 commit comments

Comments
 (0)