Skip to content

Commit a24e3b6

Browse files
Thomas Wang authored and facebook-github-bot committed
Create separate events for sending & receiving messages (#250)
Summary: Pull Request resolved: #250. We have been treating the entire message send & recv as a single event, but this doesn't really make sense: each phase should have its own duration (sending -> in flight -> receiving), and sending and receiving happen on different processes (this will matter in the next diff, when we render our events onto Perfetto). Differential Revision: D76433477
1 parent 8e37457 commit a24e3b6

File tree

2 files changed

+213
-108
lines changed

2 files changed

+213
-108
lines changed

hyperactor/src/channel/sim.rs

Lines changed: 110 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@ use crate::mailbox::MessageEnvelope;
2828
use crate::simnet;
2929
use crate::simnet::Dispatcher;
3030
use crate::simnet::Event;
31-
use crate::simnet::ScheduledEvent;
3231
use crate::simnet::SimNetConfig;
3332
use crate::simnet::SimNetEdge;
3433
use crate::simnet::SimNetError;
@@ -152,15 +151,16 @@ impl fmt::Display for SimAddr {
152151

153152
/// Message Event that can be passed around in the simnet.
154153
#[derive(Debug)]
155-
pub(crate) struct MessageDeliveryEvent {
154+
pub(crate) struct MessageSendEvent {
156155
src_addr: Option<AddressProxyPair>,
157156
dest_addr: AddressProxyPair,
158157
data: Serialized,
159158
duration_ms: u64,
159+
inflight_time_ms: u64,
160160
}
161161

162-
impl MessageDeliveryEvent {
163-
/// Creates a new MessageDeliveryEvent.
162+
impl MessageSendEvent {
163+
/// Creates a new MessageSendEvent.
164164
pub fn new(
165165
src_addr: Option<AddressProxyPair>,
166166
dest_addr: AddressProxyPair,
@@ -170,22 +170,32 @@ impl MessageDeliveryEvent {
170170
src_addr,
171171
dest_addr,
172172
data,
173-
duration_ms: 100,
173+
duration_ms: 1,
174+
inflight_time_ms: 1,
174175
}
175176
}
176177
}
177178

178179
#[async_trait]
179-
impl Event for MessageDeliveryEvent {
180+
impl Event for MessageSendEvent {
180181
async fn handle(&self) -> Result<(), SimNetError> {
181-
// Send the message to the correct receiver.
182-
SENDER
183-
.send(
184-
self.src_addr.clone(),
185-
self.dest_addr.clone(),
186-
self.data.clone(),
187-
)
188-
.await?;
182+
let inflight_time_ms = self.inflight_time_ms;
183+
let event = Box::new(MessageRecvEvent::new(
184+
self.src_addr.clone(),
185+
self.dest_addr.clone(),
186+
self.data.clone(),
187+
));
188+
189+
tokio::task::spawn(async move {
190+
SimClock
191+
.sleep(tokio::time::Duration::from_millis(inflight_time_ms))
192+
.await;
193+
194+
if let Ok(handle) = simnet_handle() {
195+
let _ = handle.send_event(event);
196+
}
197+
});
198+
189199
Ok(())
190200
}
191201

@@ -195,7 +205,7 @@ impl Event for MessageDeliveryEvent {
195205

196206
fn summary(&self) -> String {
197207
format!(
198-
"Sending message from {} to {}",
208+
"{} sending message to to {}",
199209
self.src_addr
200210
.as_ref()
201211
.map_or("unknown".to_string(), |addr| addr.address.to_string()),
@@ -209,7 +219,7 @@ impl Event for MessageDeliveryEvent {
209219
src: src_addr.address.clone(),
210220
dst: self.dest_addr.address.clone(),
211221
};
212-
self.duration_ms = topology
222+
self.inflight_time_ms = topology
213223
.lock()
214224
.await
215225
.topology
@@ -219,6 +229,60 @@ impl Event for MessageDeliveryEvent {
219229
}
220230
}
221231

232+
/// Message Recv Event that can be passed around in the simnet.
233+
#[derive(Debug)]
234+
pub(crate) struct MessageRecvEvent {
235+
src_addr: Option<AddressProxyPair>,
236+
dest_addr: AddressProxyPair,
237+
data: Serialized,
238+
duration_ms: u64,
239+
}
240+
241+
impl MessageRecvEvent {
242+
/// Creates a new MessageRecvEvent.
243+
pub fn new(
244+
src_addr: Option<AddressProxyPair>,
245+
dest_addr: AddressProxyPair,
246+
data: Serialized,
247+
) -> Self {
248+
Self {
249+
src_addr,
250+
dest_addr,
251+
data,
252+
duration_ms: 1,
253+
}
254+
}
255+
}
256+
257+
#[async_trait]
258+
impl Event for MessageRecvEvent {
259+
async fn handle(&self) -> Result<(), SimNetError> {
260+
// Send the message to the correct receiver.
261+
SENDER
262+
.send(
263+
self.src_addr.clone(),
264+
self.dest_addr.clone(),
265+
self.data.clone(),
266+
)
267+
.await?;
268+
Ok(())
269+
}
270+
271+
fn duration_ms(&self) -> u64 {
272+
self.duration_ms
273+
}
274+
275+
fn summary(&self) -> String {
276+
format!(
277+
"{} received message from {}",
278+
self.dest_addr.address.clone(),
279+
self.src_addr
280+
.as_ref()
281+
.map_or("unknown".to_string(), |addr| addr.address.to_string()),
282+
)
283+
}
284+
}
285+
222286
/// Bind a channel address to the simnet. It will register the address as a node in simnet,
223287
/// and configure default latencies between this node and all other existing nodes.
224288
pub async fn bind(addr: ChannelAddr) -> anyhow::Result<(), SimNetError> {
@@ -372,16 +436,24 @@ impl<M: RemoteMessage> Tx<M> for SimTx<M> {
372436
};
373437
match simnet_handle() {
374438
Ok(handle) => match &self.src_addr {
375-
Some(src_addr) if src_addr.proxy != *handle.proxy_addr() => handle
376-
.send_scheduled_event(ScheduledEvent {
377-
event: Box::new(MessageDeliveryEvent::new(
378-
self.src_addr.clone(),
379-
self.dst_addr.clone(),
380-
data,
381-
)),
382-
time: SimClock.millis_since_start(RealClock.now()),
383-
}),
384-
_ => handle.send_event(Box::new(MessageDeliveryEvent::new(
439+
Some(src_addr) if src_addr.proxy != *handle.proxy_addr() => {
440+
let event = Box::new(MessageRecvEvent::new(
441+
self.src_addr.clone(),
442+
self.dst_addr.clone(),
443+
data,
444+
));
445+
let recv_time = RealClock.now();
446+
447+
tokio::task::spawn(async move {
448+
SimClock.sleep_until(recv_time).await;
449+
450+
if let Ok(handle) = simnet_handle() {
451+
let _ = handle.send_event(event);
452+
}
453+
});
454+
Ok(())
455+
}
456+
_ => handle.send_event(Box::new(MessageSendEvent::new(
385457
self.src_addr.clone(),
386458
self.dst_addr.clone(),
387459
data,
@@ -602,8 +674,15 @@ mod tests {
602674
// Messages have not been receive since 10 seconds have not elapsed
603675
assert!(rx.rx.try_recv().is_err());
604676
}
605-
// Advance "real" time by 100 seconds
606-
tokio::time::advance(tokio::time::Duration::from_secs(100)).await;
677+
tokio::time::advance(
678+
// Advance "real" time by 1 ms for send time
679+
tokio::time::Duration::from_millis(1)
680+
// Advance "real" time by 100 seconds for inflight time
681+
+ tokio::time::Duration::from_secs(100)
682+
// Advance "real" time by 1 ms for recv time
683+
+ tokio::time::Duration::from_millis(1),
684+
)
685+
.await;
607686
{
608687
// Allow some time for simnet to run
609688
tokio::task::yield_now().await;
@@ -622,16 +701,6 @@ mod tests {
622701
1000,
623702
)
624703
.unwrap();
625-
let controller_to_dst = SimAddr::new_with_src(
626-
AddressProxyPair {
627-
address: "unix!@controller".parse::<ChannelAddr>().unwrap(),
628-
proxy: proxy_addr.clone(),
629-
},
630-
"unix!@dst".parse::<ChannelAddr>().unwrap(),
631-
proxy_addr.clone(),
632-
)
633-
.unwrap();
634-
let controller_tx = sim::dial::<()>(controller_to_dst.clone()).unwrap();
635704

636705
let client_to_dst = SimAddr::new_with_src(
637706
AddressProxyPair {
@@ -644,35 +713,18 @@ mod tests {
644713
.unwrap();
645714
let client_tx = sim::dial::<()>(client_to_dst).unwrap();
646715

647-
// 1 second of latency
648-
let simnet_config_yaml = r#"
649-
edges:
650-
- src: unix!@controller
651-
dst: unix!@dst
652-
metadata:
653-
latency: 1
654-
"#;
655-
update_config(NetworkConfig::from_yaml(simnet_config_yaml).unwrap())
656-
.await
657-
.unwrap();
658-
659716
assert_eq!(SimClock.millis_since_start(RealClock.now()), 0);
660717
// Fast forward real time to 5 seconds
661718
tokio::time::advance(tokio::time::Duration::from_secs(5)).await;
662719
{
663720
// Send client message
664721
client_tx.try_post((), oneshot::channel().0).unwrap();
665-
// Send system message
666-
controller_tx.try_post((), oneshot::channel().0).unwrap();
722+
tokio::time::advance(tokio::time::Duration::from_millis(1)).await;
667723
// Allow some time for simnet to run
668-
RealClock.sleep(tokio::time::Duration::from_secs(1)).await;
724+
tokio::task::yield_now().await;
669725
}
670726
let recs = simnet::simnet_handle().unwrap().close().await.unwrap();
671-
assert_eq!(recs.len(), 2);
672-
let end_times = recs.iter().map(|rec| rec.end_at).collect::<Vec<_>>();
673727
// client message was delivered at "real" time = 5 seconds
674-
assert!(end_times.contains(&5000));
675-
// system message was delivered at simulated time = 1 second
676-
assert!(end_times.contains(&1000));
728+
assert_eq!(recs.first().map(|rec| rec.end_at).unwrap(), 5000);
677729
}
678730
}

0 commit comments

Comments
 (0)