Skip to content

Commit 0f8af9e

Browse files
pzhan9facebook-github-bot
authored andcommitted
Change accumulator releated signatures (#216)
Summary: Pull Request resolved: #216 This diff is a prep diff for D76156297. Specifically, it does the following: 1. Add a `struct ReducerSpec` to replace the `64` reducer typehash. The main purpose of this struct is to add a new `builder_params` field, so we can argument the accumulator, with it, serialize it, and then send to comm actor, so comm actor can instantiate the reducer with this info. 2. Make some methods in `trait Accumulator` and `trait CommReducer` fallible. This is required by the python accumulator, because it does pickling and thus fallible. 3. Add a `typehash_f` field to `ReducerFactory`, which can be used to calculate the typehash directly. Reviewed By: mariusae Differential Revision: D76357417
1 parent 3956d95 commit 0f8af9e

File tree

10 files changed

+224
-111
lines changed

10 files changed

+224
-111
lines changed

hyperactor/src/accum.rs

Lines changed: 135 additions & 54 deletions
Large diffs are not rendered by default.

hyperactor/src/cap.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub(crate) mod sealed {
3333
use async_trait::async_trait;
3434

3535
use crate::PortId;
36+
use crate::accum::ReducerSpec;
3637
use crate::actor::Actor;
3738
use crate::actor::ActorHandle;
3839
use crate::data::Serialized;
@@ -47,7 +48,11 @@ pub(crate) mod sealed {
4748
}
4849

4950
pub trait CanSplitPort: Send + Sync {
50-
fn split(&self, port_id: PortId, reducer: Option<u64>) -> PortId;
51+
fn split(
52+
&self,
53+
port_id: PortId,
54+
reducer_spec: Option<ReducerSpec>,
55+
) -> anyhow::Result<PortId>;
5156
}
5257

5358
#[async_trait]

hyperactor/src/mailbox/mod.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ use crate::OncePortRef;
102102
use crate::PortRef;
103103
use crate::accum;
104104
use crate::accum::Accumulator;
105+
use crate::accum::ReducerSpec;
105106
use crate::actor::Signal;
106107
use crate::actor::remote::USER_PORT_OFFSET;
107108
use crate::cap;
@@ -1016,10 +1017,10 @@ impl Mailbox {
10161017
let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
10171018
let port_id = PortId(self.state.actor_id.clone(), port_index);
10181019
let state = Mutex::new(A::State::default());
1019-
let reducer_typehash = accum.reducer_typehash();
1020+
let reducer_spec = accum.reducer_spec();
10201021
let enqueue = move |update: A::Update| {
10211022
let mut state = state.lock().unwrap();
1022-
accum.accumulate(&mut state, update);
1023+
accum.accumulate(&mut state, update)?;
10231024
let _ = sender.send(state.clone());
10241025
Ok(())
10251026
};
@@ -1029,7 +1030,7 @@ impl Mailbox {
10291030
port_index,
10301031
sender: UnboundedPortSender::Func(Arc::new(enqueue)),
10311032
bound: Arc::new(OnceLock::new()),
1032-
reducer_typehash: Some(reducer_typehash),
1033+
reducer_spec,
10331034
},
10341035
PortReceiver::new(
10351036
receiver,
@@ -1052,7 +1053,7 @@ impl Mailbox {
10521053
port_index: self.state.allocate_port(),
10531054
sender: UnboundedPortSender::Func(Arc::new(enqueue)),
10541055
bound: Arc::new(OnceLock::new()),
1055-
reducer_typehash: None,
1056+
reducer_spec: None,
10561057
}
10571058
}
10581059

@@ -1240,7 +1241,7 @@ impl SplitPortBuffer {
12401241
}
12411242

12421243
impl cap::sealed::CanSplitPort for Mailbox {
1243-
fn split(&self, port_id: PortId, reducer_typehash: Option<u64>) -> PortId {
1244+
fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
12441245
fn post(mailbox: &Mailbox, port_id: PortId, msg: Serialized) {
12451246
mailbox.post(
12461247
MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg),
@@ -1255,7 +1256,15 @@ impl cap::sealed::CanSplitPort for Mailbox {
12551256
let port_index = self.state.allocate_port();
12561257
let split_port = self.actor_id().port_id(port_index);
12571258
let mailbox = self.clone();
1258-
let reducer = reducer_typehash.and_then(accum::resolve_reducer);
1259+
let reducer = reducer_spec
1260+
.map(
1261+
|ReducerSpec {
1262+
typehash,
1263+
builder_params,
1264+
}| { accum::resolve_reducer(typehash, builder_params) },
1265+
)
1266+
.transpose()?
1267+
.flatten();
12591268
let enqueue: Box<
12601269
dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
12611270
> = match reducer {
@@ -1291,7 +1300,7 @@ impl cap::sealed::CanSplitPort for Mailbox {
12911300
port_id: split_port.clone(),
12921301
},
12931302
);
1294-
split_port
1303+
Ok(split_port)
12951304
}
12961305
}
12971306

@@ -1317,7 +1326,7 @@ pub struct PortHandle<M: Message> {
13171326
bound: Arc<OnceLock<PortId>>,
13181327
// Typehash of an optional reducer. When it's defined, we include it in port
13191328
/// references to optionally enable incremental accumulation.
1320-
reducer_typehash: Option<u64>,
1329+
reducer_spec: Option<ReducerSpec>,
13211330
}
13221331

13231332
impl<M: Message> PortHandle<M> {
@@ -1327,7 +1336,7 @@ impl<M: Message> PortHandle<M> {
13271336
port_index,
13281337
sender,
13291338
bound: Arc::new(OnceLock::new()),
1330-
reducer_typehash: None,
1339+
reducer_spec: None,
13311340
}
13321341
}
13331342

@@ -1357,7 +1366,7 @@ impl<M: RemoteMessage> PortHandle<M> {
13571366
self.bound
13581367
.get_or_init(|| self.mailbox.bind(self).port_id().clone())
13591368
.clone(),
1360-
self.reducer_typehash.clone(),
1369+
self.reducer_spec.clone(),
13611370
)
13621371
}
13631372

@@ -1375,7 +1384,7 @@ impl<M: Message> Clone for PortHandle<M> {
13751384
port_index: self.port_index,
13761385
sender: self.sender.clone(),
13771386
bound: self.bound.clone(),
1378-
reducer_typehash: self.reducer_typehash.clone(),
1387+
reducer_spec: self.reducer_spec.clone(),
13791388
}
13801389
}
13811390
}
@@ -2253,18 +2262,18 @@ mod tests {
22532262
// accum port could have reducer typehash
22542263
{
22552264
let accumulator = accum::max::<u64>();
2256-
let reducer_typehash = accumulator.reducer_typehash();
2265+
let reducer_spec = accumulator.reducer_spec().unwrap();
22572266
let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2258-
assert_eq!(port.reducer_typehash, Some(reducer_typehash),);
2267+
assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
22592268
let port_ref = port.bind();
2260-
assert_eq!(port_ref.reducer_typehash(), &Some(reducer_typehash));
2269+
assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
22612270
}
22622271
// normal port should not have reducer typehash
22632272
{
22642273
let (port, _) = mbox.open_port::<u64>();
2265-
assert_eq!(port.reducer_typehash, None);
2274+
assert_eq!(port.reducer_spec, None);
22662275
let port_ref = port.bind();
2267-
assert_eq!(port_ref.reducer_typehash(), &None);
2276+
assert_eq!(port_ref.reducer_spec(), &None);
22682277
}
22692278
}
22702279

@@ -2822,7 +2831,7 @@ mod tests {
28222831
port_id2_1: PortId,
28232832
}
28242833

2825-
async fn setup_split_port_ids(reducer_typehash: Option<u64>) -> Setup {
2834+
async fn setup_split_port_ids(reducer_spec: Option<ReducerSpec>) -> Setup {
28262835
let muxer = MailboxMuxer::new();
28272836
let actor0 = Mailbox::new(id!(test[0].actor), BoxedMailboxSender::new(muxer.clone()));
28282837
let actor1 = Mailbox::new(id!(test[1].actor1), BoxedMailboxSender::new(muxer.clone()));
@@ -2834,11 +2843,11 @@ mod tests {
28342843
let port_id = port_handle.bind().port_id().clone();
28352844

28362845
// Split it twice on actor1
2837-
let port_id1 = port_id.split(&actor1, reducer_typehash.clone());
2838-
let port_id2 = port_id.split(&actor1, reducer_typehash.clone());
2846+
let port_id1 = port_id.split(&actor1, reducer_spec.clone()).unwrap();
2847+
let port_id2 = port_id.split(&actor1, reducer_spec.clone()).unwrap();
28392848

28402849
// A split port id can also be split
2841-
let port_id2_1 = port_id2.split(&actor1, reducer_typehash);
2850+
let port_id2_1 = port_id2.split(&actor1, reducer_spec).unwrap();
28422851

28432852
Setup {
28442853
receiver,
@@ -2911,7 +2920,7 @@ mod tests {
29112920
let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
29122921

29132922
let sum_accumulator = accum::sum::<u64>();
2914-
let reducer_typehash = sum_accumulator.reducer_typehash();
2923+
let reducer_spec = sum_accumulator.reducer_spec();
29152924
let Setup {
29162925
mut receiver,
29172926
actor0,
@@ -2920,7 +2929,7 @@ mod tests {
29202929
port_id1,
29212930
port_id2,
29222931
port_id2_1,
2923-
} = setup_split_port_ids(Some(reducer_typehash)).await;
2932+
} = setup_split_port_ids(reducer_spec).await;
29242933
post(&actor0, port_id.clone(), 4);
29252934
post(&actor1, port_id1.clone(), 2);
29262935
post(&actor1, port_id2.clone(), 3);
@@ -2948,8 +2957,8 @@ mod tests {
29482957
let (port_handle, mut receiver) = actor.open_port::<u64>();
29492958
let port_id = port_handle.bind().port_id().clone();
29502959
// Split it
2951-
let reducer_typehash = accum::sum::<u64>().reducer_typehash();
2952-
let split_port_id = port_id.split(&actor, Some(reducer_typehash));
2960+
let reducer_spec = accum::sum::<u64>().reducer_spec();
2961+
let split_port_id = port_id.split(&actor, reducer_spec).unwrap();
29532962

29542963
// Send 9 messages.
29552964
for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {

hyperactor/src/message.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ mod tests {
313313
use hyperactor::id;
314314

315315
use super::*;
316+
use crate::accum::ReducerSpec;
316317
use crate::reference::UnboundPort;
317318

318319
// Used to demonstrate a user defined reply type.
@@ -349,7 +350,13 @@ mod tests {
349350
#[test]
350351
fn test_castable() {
351352
let original_port0 = PortRef::attest(id!(world[0].actor[0][123]));
352-
let original_port1 = PortRef::attest_reducible(id!(world[1].actor1[0][456]), Some(123));
353+
let original_port1 = PortRef::attest_reducible(
354+
id!(world[1].actor1[0][456]),
355+
Some(ReducerSpec {
356+
typehash: 123,
357+
builder_params: None,
358+
}),
359+
);
353360
let my_message = MyMessage {
354361
arg0: true,
355362
arg1: 42,
@@ -398,7 +405,13 @@ mod tests {
398405
.unwrap();
399406

400407
let new_port0 = PortRef::<String>::attest(new_port_id0);
401-
let new_port1 = PortRef::<MyReply>::attest_reducible(new_port_id1, Some(123));
408+
let new_port1 = PortRef::<MyReply>::attest_reducible(
409+
new_port_id1,
410+
Some(ReducerSpec {
411+
typehash: 123,
412+
builder_params: None,
413+
}),
414+
);
402415
let new_bindings = Bindings(
403416
[
404417
(

hyperactor/src/proc.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use crate::Handler;
5252
use crate::Message;
5353
use crate::Named;
5454
use crate::RemoteMessage;
55+
use crate::accum::ReducerSpec;
5556
use crate::actor::ActorError;
5657
use crate::actor::ActorErrorKind;
5758
use crate::actor::ActorHandle;
@@ -1198,8 +1199,8 @@ impl<A: Actor> cap::sealed::CanOpenPort for Instance<A> {
11981199
}
11991200

12001201
impl<A: Actor> cap::sealed::CanSplitPort for Instance<A> {
1201-
fn split(&self, port_id: PortId, reducer_typehash: Option<u64>) -> PortId {
1202-
self.mailbox.split(port_id, reducer_typehash)
1202+
fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
1203+
self.mailbox.split(port_id, reducer_spec)
12031204
}
12041205
}
12051206

hyperactor/src/reference.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use crate as hyperactor;
4242
use crate::Named;
4343
use crate::RemoteHandles;
4444
use crate::RemoteMessage;
45+
use crate::accum::ReducerSpec;
4546
use crate::actor::RemoteActor;
4647
use crate::cap;
4748
use crate::data::Serialized;
@@ -728,8 +729,12 @@ impl PortId {
728729

729730
/// Split this port, returning a new port that relays messages to the port
730731
/// through a local proxy, which may coalesce messages.
731-
pub fn split(&self, caps: &impl cap::CanSplitPort, reducer_typehash: Option<u64>) -> PortId {
732-
caps.split(self.clone(), reducer_typehash)
732+
pub fn split(
733+
&self,
734+
caps: &impl cap::CanSplitPort,
735+
reducer_spec: Option<ReducerSpec>,
736+
) -> anyhow::Result<PortId> {
737+
caps.split(self.clone(), reducer_spec)
733738
}
734739
}
735740

@@ -763,7 +768,7 @@ pub struct PortRef<M: RemoteMessage> {
763768
Ord = "ignore",
764769
Hash = "ignore"
765770
)]
766-
reducer_typehash: Option<u64>,
771+
reducer_spec: Option<ReducerSpec>,
767772
phantom: PhantomData<M>,
768773
}
769774

@@ -773,17 +778,17 @@ impl<M: RemoteMessage> PortRef<M> {
773778
pub fn attest(port_id: PortId) -> Self {
774779
Self {
775780
port_id,
776-
reducer_typehash: None,
781+
reducer_spec: None,
777782
phantom: PhantomData,
778783
}
779784
}
780785

781786
/// The caller attests that the provided PortId can be
782787
/// converted to a reachable, typed port reference.
783-
pub(crate) fn attest_reducible(port_id: PortId, reducer_typehash: Option<u64>) -> Self {
788+
pub(crate) fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
784789
Self {
785790
port_id,
786-
reducer_typehash,
791+
reducer_spec,
787792
phantom: PhantomData,
788793
}
789794
}
@@ -796,8 +801,8 @@ impl<M: RemoteMessage> PortRef<M> {
796801

797802
/// The typehash of this port's reducer, if any. Reducers
798803
/// may be used to coalesce messages sent to a port.
799-
pub fn reducer_typehash(&self) -> &Option<u64> {
800-
&self.reducer_typehash
804+
pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
805+
&self.reducer_spec
801806
}
802807

803808
/// This port's ID.
@@ -846,7 +851,7 @@ impl<M: RemoteMessage> Clone for PortRef<M> {
846851
fn clone(&self) -> Self {
847852
Self {
848853
port_id: self.port_id.clone(),
849-
reducer_typehash: self.reducer_typehash.clone(),
854+
reducer_spec: self.reducer_spec.clone(),
850855
phantom: PhantomData,
851856
}
852857
}
@@ -866,7 +871,7 @@ impl<M: RemoteMessage> Named for PortRef<M> {
866871

867872
/// The parameters extracted from [`PortRef`] to [`Bindings`].
868873
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
869-
pub struct UnboundPort(pub PortId, pub Option<u64>);
874+
pub struct UnboundPort(pub PortId, pub Option<ReducerSpec>);
870875

871876
impl UnboundPort {
872877
/// Update the port id of this binding.
@@ -877,7 +882,7 @@ impl UnboundPort {
877882

878883
impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
879884
fn from(port_ref: &PortRef<M>) -> Self {
880-
UnboundPort(port_ref.port_id.clone(), port_ref.reducer_typehash.clone())
885+
UnboundPort(port_ref.port_id.clone(), port_ref.reducer_spec.clone())
881886
}
882887
}
883888

@@ -891,7 +896,7 @@ impl<M: RemoteMessage> Bind for PortRef<M> {
891896
fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
892897
let bound = bindings.try_pop_front::<UnboundPort>()?;
893898
self.port_id = bound.0;
894-
self.reducer_typehash = bound.1;
899+
self.reducer_spec = bound.1;
895900
Ok(())
896901
}
897902
}

hyperactor_mesh/examples/dining_philosophers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use hyperactor::Handler;
1818
use hyperactor::Instance;
1919
use hyperactor::Named;
2020
use hyperactor::PortRef;
21+
use hyperactor::accum::ReducerSpec;
2122
use hyperactor::message::Bind;
2223
use hyperactor::message::Bindings;
2324
use hyperactor::message::IndexedErasedUnbound;

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use hyperactor::Named;
1919
use hyperactor::PortHandle;
2020
use hyperactor::RemoteHandles;
2121
use hyperactor::RemoteMessage;
22+
use hyperactor::accum::ReducerSpec;
2223
use hyperactor::actor::RemoteActor;
2324
use hyperactor::mailbox::MailboxSenderError;
2425
use hyperactor::mailbox::PortReceiver;

0 commit comments

Comments
 (0)