Skip to content

Commit 92255ba

Browse files
kaiyuan-lifacebook-github-bot
authored andcommitted
create StateActor to store and serve logs for hyperactor (#211)
Summary: Pull Request resolved: #211 This is an initial commit for the state actor. This diff creates a circular buffer to store latest log messages. If the circular buffer is full, messages will be discarded with a FIFO rule. Next: 1. register the StateActor with actors in the proc_mesh so logs can be streamed in. 2. stream the logs out to loggregator so we can see the logs in `hyper log`. Differential Revision: D76343950
1 parent d445461 commit 92255ba

File tree

5 files changed

+462
-0
lines changed

5 files changed

+462
-0
lines changed

hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ mod parse;
8484
pub mod proc;
8585
pub mod reference;
8686
pub mod simnet;
87+
mod state;
8788
pub mod supervision;
8889
pub mod sync;
8990
/// Test utilities

hyperactor/src/state/client.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use anyhow::Result;
10+
use async_trait::async_trait;
11+
use hyperactor_macros::HandleClient;
12+
use hyperactor_macros::RefClient;
13+
use serde::Deserialize;
14+
use serde::Serialize;
15+
use tokio::sync::mpsc::Sender;
16+
17+
use crate as hyperactor;
18+
use crate::Actor;
19+
use crate::Handler;
20+
use crate::Instance;
21+
use crate::Named;
22+
use crate::state::object::GenericStateObject;
23+
24+
/// A client to interact with the state actor.
25+
#[derive(Debug)]
26+
#[hyperactor::export(ClientMessage)]
27+
pub struct ClientActor {
28+
sender: Sender<GenericStateObject>,
29+
}
30+
31+
/// Endpoints for the client actor.
32+
#[derive(Handler, HandleClient, RefClient, Debug, Serialize, Deserialize, Named)]
33+
pub enum ClientMessage {
34+
/// Push a batch of logs to the logs buffer
35+
Logs {
36+
/// Object to set.
37+
logs: Vec<GenericStateObject>,
38+
},
39+
}
40+
41+
pub struct ClientActorParams {
42+
pub sender: Sender<GenericStateObject>,
43+
}
44+
45+
#[async_trait]
46+
impl Actor for ClientActor {
47+
type Params = ClientActorParams;
48+
49+
async fn new(ClientActorParams { sender }: ClientActorParams) -> Result<Self, anyhow::Error> {
50+
Ok(Self { sender })
51+
}
52+
}
53+
54+
#[async_trait]
55+
#[hyperactor::forward(ClientMessage)]
56+
impl ClientMessageHandler for ClientActor {
57+
async fn logs(
58+
&mut self,
59+
_this: &Instance<Self>,
60+
logs: Vec<GenericStateObject>,
61+
) -> Result<(), anyhow::Error> {
62+
for log in logs {
63+
self.sender.send(log).await?;
64+
}
65+
Ok(())
66+
}
67+
}
68+
69+
#[cfg(test)]
70+
mod tests {
71+
use std::time::Duration;
72+
73+
use super::*;
74+
use crate::channel;
75+
use crate::channel::ChannelAddr;
76+
use crate::id;
77+
use crate::mailbox::BoxedMailboxSender;
78+
use crate::mailbox::MailboxClient;
79+
use crate::proc::Proc;
80+
use crate::state::spawn_actor;
81+
use crate::state::test_utils::log_items;
82+
83+
#[tokio::test]
84+
async fn test_client_basics() {
85+
let client_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
86+
let (sender, mut receiver) = tokio::sync::mpsc::channel::<GenericStateObject>(10);
87+
let params = ClientActorParams { sender };
88+
let (client_actor_addr, client_actor_ref) = spawn_actor::<ClientActor>(
89+
client_actor_addr.clone(),
90+
id![state_client[0].state_client],
91+
params,
92+
)
93+
.await
94+
.unwrap();
95+
96+
let remote_sender = MailboxClient::new(channel::dial(client_actor_addr.clone()).unwrap());
97+
let client_proc_id = id!(client).random_user_proc();
98+
let client_proc = Proc::new(
99+
client_proc_id.clone(),
100+
BoxedMailboxSender::new(remote_sender),
101+
);
102+
let remote_client = client_proc.attach("client").unwrap();
103+
104+
let log_items_0_10 = log_items(0, 10);
105+
client_actor_ref
106+
.logs(&remote_client, log_items_0_10.clone())
107+
.await
108+
.unwrap();
109+
110+
// Collect received messages with timeout
111+
let mut fetched_logs = vec![];
112+
for _ in 0..10 {
113+
// Timeout prevents hanging if a message is missing
114+
let log = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
115+
.await
116+
.expect("timed out waiting for message")
117+
.expect("channel closed unexpectedly");
118+
119+
fetched_logs.push(log);
120+
}
121+
122+
// Verify we received all expected logs
123+
assert_eq!(fetched_logs.len(), 10);
124+
assert_eq!(fetched_logs, log_items_0_10);
125+
126+
// Now test that no extra message is waiting
127+
let extra = tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await;
128+
assert!(extra.is_err(), "expected no more messages");
129+
}
130+
}

hyperactor/src/state/mod.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use anyhow::Result;
10+
11+
use crate::Actor;
12+
use crate::ActorHandle;
13+
use crate::ActorId;
14+
use crate::ActorRef;
15+
use crate::actor::Binds;
16+
use crate::actor::RemoteActor;
17+
use crate::channel;
18+
use crate::channel::ChannelAddr;
19+
use crate::mailbox::BoxedMailboxSender;
20+
use crate::mailbox::DialMailboxRouter;
21+
use crate::mailbox::MailboxServer;
22+
use crate::mailbox::MessageEnvelope;
23+
use crate::mailbox::Undeliverable;
24+
use crate::proc::Proc;
25+
26+
mod client;
27+
mod object;
28+
mod state_actor;
29+
30+
/// Creates a state actor server at given address. Returns the server address and a handle to the
31+
/// state actor.
32+
#[allow(dead_code)]
33+
pub(crate) async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
34+
addr: ChannelAddr,
35+
actor_id: ActorId,
36+
params: T::Params,
37+
) -> Result<(ChannelAddr, ActorRef<T>)> {
38+
let proc_id = actor_id.proc_id();
39+
let proc = Proc::new(
40+
proc_id.clone(),
41+
BoxedMailboxSender::new(DialMailboxRouter::new()),
42+
);
43+
let (local_addr, rx) = channel::serve(addr.clone()).await?;
44+
let actor_handle: ActorHandle<T> = proc.spawn(actor_id.name(), params).await?;
45+
actor_handle.bind::<T>();
46+
47+
// Undeliverable messages encountered by the mailbox server
48+
// are to be returned to the system actor.
49+
let return_handle = actor_handle.port::<Undeliverable<MessageEnvelope>>();
50+
let _mailbox_handle = proc.clone().serve(rx, return_handle);
51+
52+
Ok((local_addr, actor_handle.bind()))
53+
}
54+
55+
#[cfg(test)]
56+
pub(crate) mod test_utils {
57+
use crate::state::object::GenericStateObject;
58+
use crate::state::object::LogSpec;
59+
use crate::state::object::LogStatus;
60+
use crate::state::object::StateMetadata;
61+
use crate::state::object::StateObject;
62+
63+
pub(crate) fn log_items(seq_low: usize, seq_high: usize) -> Vec<GenericStateObject> {
64+
let mut log_items = vec![];
65+
let metadata = StateMetadata {
66+
name: "test".to_string(),
67+
kind: "log".to_string(),
68+
};
69+
let spec = LogSpec {};
70+
for seq in seq_low..seq_high {
71+
let status = LogStatus::new(seq, format!("status {}", seq));
72+
let state_object =
73+
StateObject::<LogSpec, LogStatus>::new(metadata.clone(), spec.clone(), status);
74+
let generic_state_object = GenericStateObject::from(state_object);
75+
log_items.push(generic_state_object);
76+
}
77+
log_items
78+
}
79+
}

hyperactor/src/state/object.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use serde::Deserialize;
10+
use serde::Serialize;
11+
12+
use crate as hyperactor;
13+
use crate::Named;
14+
15+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
16+
pub struct StateMetadata {
17+
/// Name of the actor.
18+
pub name: String,
19+
/// Kind of the object.
20+
pub kind: String,
21+
}
22+
23+
#[derive(Debug, Serialize, Deserialize)]
24+
pub struct StateObject<S, T> {
25+
metadata: StateMetadata,
26+
spec: S,
27+
status: T,
28+
}
29+
30+
impl<S, T> StateObject<S, T> {
31+
#[allow(dead_code)]
32+
pub fn new(metadata: StateMetadata, spec: S, status: T) -> Self {
33+
Self {
34+
metadata,
35+
spec,
36+
status,
37+
}
38+
}
39+
}
40+
41+
#[derive(Clone, Debug, Serialize, Deserialize)]
42+
pub struct LogSpec;
43+
44+
#[allow(dead_code)]
45+
#[derive(Debug, Serialize, Deserialize)]
46+
pub struct LogStatus {
47+
/// A monotonically increasing sequence number.
48+
seq: usize,
49+
/// The message in the log.
50+
message: String,
51+
}
52+
53+
impl LogStatus {
54+
#[allow(dead_code)]
55+
pub fn new(seq: usize, message: String) -> Self {
56+
Self { seq, message }
57+
}
58+
}
59+
60+
/// A generic state object which is the partially serialized version of a
61+
/// [`StateObject`]. Since [`StateObject`] takes generic types, those type information
62+
/// can be retrieved from the metadata to deserialize [`GenericStateObject`] into
63+
/// a [`StateObject<S, T>`].
64+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Named)]
65+
pub struct GenericStateObject {
66+
metadata: StateMetadata,
67+
data: String,
68+
}
69+
70+
impl<S, T> From<StateObject<S, T>> for GenericStateObject
71+
where
72+
S: Spec,
73+
T: Status,
74+
{
75+
fn from(value: StateObject<S, T>) -> Self {
76+
Self {
77+
metadata: value.metadata.clone(),
78+
data: serde_json::to_string(&value).unwrap(),
79+
}
80+
}
81+
}
82+
83+
pub trait Spec: Serialize + for<'de> Deserialize<'de> {}
84+
85+
pub trait Status: Serialize + for<'de> Deserialize<'de> {}
86+
87+
impl Spec for LogSpec {}
88+
impl Status for LogStatus {}

0 commit comments

Comments
 (0)