Skip to content

Commit d06c440

Browse files
author
Shayne Fletcher
committed
[hyperactor]: mailbox: CanSend for Mailbox use mailbox port for returns
Pull Request resolved: #303 the `CanSend` for `Mailbox` produces a self-bound `PortHandle` (allocating and binding only if needed). i don't have any immediate test coverage but removing `monitored_return_handle()` from this code is a definite and substantial improvement. ghstack-source-id: 291349747 Differential Revision: [D76926674](https://our.internmc.facebook.com/intern/diff/D76926674/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D76926674/)!
1 parent 39e0783 commit d06c440

File tree

1 file changed

+52
-2
lines changed

1 file changed

+52
-2
lines changed

hyperactor/src/mailbox/mod.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,33 @@ impl Mailbox {
10821082
MailboxError::new(self.state.actor_id.clone(), err)
10831083
}
10841084

1085+
/// Look up the `UnboundedPortSender` for the port associated with
1086+
/// the message type `M`, as defined by `<M as Named>::port()`.
1087+
///
1088+
/// This performs a dynamic downcast to recover the original
1089+
/// sender type. Returns `None` if the port is not bound or if the
1090+
/// type does not match.
1091+
///
1092+
/// # Panics
1093+
/// Panics in debug mode if the port is bound but the stored
1094+
/// `port_id` does not match the expected one.
1095+
pub(crate) fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1096+
let port_index = M::port();
1097+
self.state.ports.get(&port_index).and_then(|boxed| {
1098+
boxed
1099+
.as_any()
1100+
.downcast_ref::<UnboundedSender<M>>()
1101+
.map(|s| {
1102+
debug_assert_eq!(
1103+
s.port_id,
1104+
self.actor_id().port_id(port_index),
1105+
"port_id mismatch in downcasted UnboundedSender"
1106+
);
1107+
s.sender.clone()
1108+
})
1109+
})
1110+
}
1111+
10851112
fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
10861113
assert_eq!(
10871114
handle.mailbox.actor_id(),
@@ -1210,8 +1237,17 @@ impl MailboxSender for Mailbox {
12101237

12111238
impl cap::sealed::CanSend for Mailbox {
12121239
fn post(&self, dest: PortId, data: Serialized) {
1240+
let undeliverable_port_id = Undeliverable::<MessageEnvelope>::port();
1241+
let return_handle = match self.lookup_sender::<Undeliverable<MessageEnvelope>>() {
1242+
Some(sender) => PortHandle::new(self.clone(), undeliverable_port_id, sender),
1243+
None => {
1244+
let (handle, _) = self.open_port::<Undeliverable<MessageEnvelope>>();
1245+
handle.bind_to(undeliverable_port_id);
1246+
handle
1247+
}
1248+
};
12131249
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data);
1214-
MailboxSender::post(self, envelope, monitored_return_handle());
1250+
MailboxSender::post(self, envelope, return_handle);
12151251
}
12161252
}
12171253

@@ -1592,6 +1628,8 @@ pub struct SerializedSenderError {
15921628
/// - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
15931629
/// validity of the underlying port.
15941630
trait SerializedSender: Send + Sync {
1631+
fn as_any(&self) -> &dyn Any;
1632+
15951633
/// Send a serialized message. SerializedSender will deserialize the
15961634
/// message (failing if it fails to deserialize), and then send the
15971635
/// resulting message on the underlying port.
@@ -1603,7 +1641,7 @@ trait SerializedSender: Send + Sync {
16031641
}
16041642

16051643
/// A sender to an M-typed unbounded port.
1606-
enum UnboundedPortSender<M: Message> {
1644+
pub(crate) enum UnboundedPortSender<M: Message> {
16071645
/// Send directly to the mpsc queue.
16081646
Mpsc(mpsc::UnboundedSender<M>),
16091647
/// Use the provided function to enqueue the item.
@@ -1675,6 +1713,10 @@ impl<M: Message> Clone for UnboundedSender<M> {
16751713
}
16761714

16771715
impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
1716+
fn as_any(&self) -> &dyn Any {
1717+
self
1718+
}
1719+
16781720
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
16791721
match serialized.deserialized() {
16801722
Ok(message) => {
@@ -1757,6 +1799,10 @@ impl<M: Message> Clone for OnceSender<M> {
17571799
}
17581800

17591801
impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
1802+
fn as_any(&self) -> &dyn Any {
1803+
self
1804+
}
1805+
17601806
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
17611807
match serialized.deserialized() {
17621808
Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
@@ -1781,6 +1827,10 @@ struct UntypedUnboundedSender {
17811827
}
17821828

17831829
impl SerializedSender for UntypedUnboundedSender {
1830+
fn as_any(&self) -> &dyn Any {
1831+
self
1832+
}
1833+
17841834
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
17851835
(self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
17861836
data,

0 commit comments

Comments
 (0)