Skip to content

Commit ecbb659

Browse files
kaiyuan-lifacebook-github-bot
authored andcommitted
create StateActor to store and serve logs for hyperactor (#211)
Summary: Pull Request resolved: #211 This is an initial commit for the state actor. This diff creates a circular buffer to store latest log messages. If the circular buffer is full, messages will be discarded with a FIFO rule. Next: 1. register the StateActor with actors in the proc_mesh so logs can be streamed in. 2. stream the logs out to loggregator so we can see the logs in `hyper log`. Differential Revision: D76343950
1 parent 45860ad commit ecbb659

File tree

4 files changed

+452
-0
lines changed

4 files changed

+452
-0
lines changed

hyperactor_state/src/client.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use anyhow::Result;
10+
use async_trait::async_trait;
11+
use hyperactor::Actor;
12+
use hyperactor::Handler;
13+
use hyperactor::Instance;
14+
use hyperactor::Named;
15+
use hyperactor_macros::HandleClient;
16+
use hyperactor_macros::RefClient;
17+
use serde::Deserialize;
18+
use serde::Serialize;
19+
use tokio::sync::mpsc::Sender;
20+
21+
use crate::object::GenericStateObject;
22+
23+
/// A client to interact with the state actor.
24+
#[derive(Debug)]
25+
#[hyperactor::export(ClientMessage)]
26+
pub struct ClientActor {
27+
sender: Sender<GenericStateObject>,
28+
}
29+
30+
/// Endpoints for the client actor.
31+
#[derive(Handler, HandleClient, RefClient, Debug, Serialize, Deserialize, Named)]
32+
pub enum ClientMessage {
33+
/// Push a batch of logs to the logs buffer
34+
Logs {
35+
/// Object to set.
36+
logs: Vec<GenericStateObject>,
37+
},
38+
}
39+
40+
pub struct ClientActorParams {
41+
pub sender: Sender<GenericStateObject>,
42+
}
43+
44+
#[async_trait]
45+
impl Actor for ClientActor {
46+
type Params = ClientActorParams;
47+
48+
async fn new(ClientActorParams { sender }: ClientActorParams) -> Result<Self, anyhow::Error> {
49+
Ok(Self { sender })
50+
}
51+
}
52+
53+
#[async_trait]
54+
#[hyperactor::forward(ClientMessage)]
55+
impl ClientMessageHandler for ClientActor {
56+
async fn logs(
57+
&mut self,
58+
_this: &Instance<Self>,
59+
logs: Vec<GenericStateObject>,
60+
) -> Result<(), anyhow::Error> {
61+
for log in logs {
62+
self.sender.send(log).await?;
63+
}
64+
Ok(())
65+
}
66+
}
67+
68+
#[cfg(test)]
69+
mod tests {
70+
use std::time::Duration;
71+
72+
use hyperactor::channel;
73+
use hyperactor::channel::ChannelAddr;
74+
use hyperactor::id;
75+
76+
use super::*;
77+
use crate::create_remote_client;
78+
use crate::spawn_actor;
79+
use crate::test_utils::log_items;
80+
81+
#[tokio::test]
82+
async fn test_client_basics() {
83+
let client_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
84+
let (sender, mut receiver) = tokio::sync::mpsc::channel::<GenericStateObject>(10);
85+
let params = ClientActorParams { sender };
86+
let (client_actor_addr, client_actor_ref) = spawn_actor::<ClientActor>(
87+
client_actor_addr.clone(),
88+
id![state_client[0].state_client],
89+
params,
90+
)
91+
.await
92+
.unwrap();
93+
94+
let (_client_proc, remote_client) = create_remote_client(client_actor_addr).await.unwrap();
95+
96+
let log_items_0_10 = log_items(0, 10);
97+
client_actor_ref
98+
.logs(&remote_client, log_items_0_10.clone())
99+
.await
100+
.unwrap();
101+
102+
// Collect received messages with timeout
103+
let mut fetched_logs = vec![];
104+
for _ in 0..10 {
105+
// Timeout prevents hanging if a message is missing
106+
let log = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
107+
.await
108+
.expect("timed out waiting for message")
109+
.expect("channel closed unexpectedly");
110+
111+
fetched_logs.push(log);
112+
}
113+
114+
// Verify we received all expected logs
115+
assert_eq!(fetched_logs.len(), 10);
116+
assert_eq!(fetched_logs, log_items_0_10);
117+
118+
// Now test that no extra message is waiting
119+
let extra = tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await;
120+
assert!(extra.is_err(), "expected no more messages");
121+
}
122+
}

hyperactor_state/src/lib.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use anyhow::Result;
10+
use hyperactor::Actor;
11+
use hyperactor::ActorHandle;
12+
use hyperactor::ActorId;
13+
use hyperactor::ActorRef;
14+
use hyperactor::Mailbox;
15+
use hyperactor::actor::Binds;
16+
use hyperactor::actor::RemoteActor;
17+
use hyperactor::channel;
18+
use hyperactor::channel::ChannelAddr;
19+
use hyperactor::id;
20+
use hyperactor::mailbox::BoxedMailboxSender;
21+
use hyperactor::mailbox::DialMailboxRouter;
22+
use hyperactor::mailbox::MailboxClient;
23+
use hyperactor::mailbox::MailboxServer;
24+
use hyperactor::mailbox::MessageEnvelope;
25+
use hyperactor::mailbox::Undeliverable;
26+
use hyperactor::proc::Proc;
27+
28+
mod client;
29+
mod object;
30+
mod state_actor;
31+
32+
/// Creates a state actor server at given address. Returns the server address and a handle to the
33+
/// state actor.
34+
#[allow(dead_code)]
35+
pub(crate) async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
36+
addr: ChannelAddr,
37+
actor_id: ActorId,
38+
params: T::Params,
39+
) -> Result<(ChannelAddr, ActorRef<T>)> {
40+
let proc_id = actor_id.proc_id();
41+
let proc = Proc::new(
42+
proc_id.clone(),
43+
BoxedMailboxSender::new(DialMailboxRouter::new()),
44+
);
45+
let (local_addr, rx) = channel::serve(addr.clone()).await?;
46+
let actor_handle: ActorHandle<T> = proc.spawn(actor_id.name(), params).await?;
47+
actor_handle.bind::<T>();
48+
49+
// Undeliverable messages encountered by the mailbox server
50+
// are to be returned to the system actor.
51+
let return_handle = actor_handle.port::<Undeliverable<MessageEnvelope>>();
52+
let _mailbox_handle = proc.clone().serve(rx, return_handle);
53+
54+
Ok((local_addr, actor_handle.bind()))
55+
}
56+
57+
/// Creates a remote client that can send message to actors in the remote addr.
58+
/// It is important to keep the client proc alive for the remote_client's lifetime.
59+
pub(crate) async fn create_remote_client(addr: ChannelAddr) -> Result<(Proc, Mailbox)> {
60+
let remote_sender = MailboxClient::new(channel::dial(addr).unwrap());
61+
let client_proc_id = id!(client).random_user_proc();
62+
let client_proc = Proc::new(
63+
client_proc_id.clone(),
64+
BoxedMailboxSender::new(remote_sender),
65+
);
66+
let remote_client = client_proc.attach("client").unwrap();
67+
Ok((client_proc, remote_client))
68+
}
69+
70+
#[cfg(test)]
71+
pub(crate) mod test_utils {
72+
use crate::object::GenericStateObject;
73+
use crate::object::LogSpec;
74+
use crate::object::LogStatus;
75+
use crate::object::StateMetadata;
76+
use crate::object::StateObject;
77+
78+
pub(crate) fn log_items(seq_low: usize, seq_high: usize) -> Vec<GenericStateObject> {
79+
let mut log_items = vec![];
80+
let metadata = StateMetadata {
81+
name: "test".to_string(),
82+
kind: "log".to_string(),
83+
};
84+
let spec = LogSpec {};
85+
for seq in seq_low..seq_high {
86+
let status = LogStatus::new(seq, format!("status {}", seq));
87+
let state_object =
88+
StateObject::<LogSpec, LogStatus>::new(metadata.clone(), spec.clone(), status);
89+
let generic_state_object = GenericStateObject::from(state_object);
90+
log_items.push(generic_state_object);
91+
}
92+
log_items
93+
}
94+
}

hyperactor_state/src/object.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use hyperactor::Named;
10+
use serde::Deserialize;
11+
use serde::Serialize;
12+
13+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
14+
pub struct StateMetadata {
15+
/// Name of the actor.
16+
pub name: String,
17+
/// Kind of the object.
18+
pub kind: String,
19+
}
20+
21+
#[derive(Debug, Serialize, Deserialize)]
22+
pub struct StateObject<S, T> {
23+
metadata: StateMetadata,
24+
spec: S,
25+
status: T,
26+
}
27+
28+
impl<S, T> StateObject<S, T> {
29+
#[allow(dead_code)]
30+
pub fn new(metadata: StateMetadata, spec: S, status: T) -> Self {
31+
Self {
32+
metadata,
33+
spec,
34+
status,
35+
}
36+
}
37+
}
38+
39+
#[derive(Clone, Debug, Serialize, Deserialize)]
40+
pub struct LogSpec;
41+
42+
#[allow(dead_code)]
43+
#[derive(Debug, Serialize, Deserialize)]
44+
pub struct LogStatus {
45+
/// A monotonically increasing sequence number.
46+
seq: usize,
47+
/// The message in the log.
48+
message: String,
49+
}
50+
51+
impl LogStatus {
52+
#[allow(dead_code)]
53+
pub fn new(seq: usize, message: String) -> Self {
54+
Self { seq, message }
55+
}
56+
}
57+
58+
/// A generic state object which is the partially serialized version of a
59+
/// [`StateObject`]. Since [`StateObject`] takes generic types, those type information
60+
/// can be retrieved from the metadata to deserialize [`GenericStateObject`] into
61+
/// a [`StateObject<S, T>`].
62+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Named)]
63+
pub struct GenericStateObject {
64+
metadata: StateMetadata,
65+
data: String,
66+
}
67+
68+
impl<S, T> From<StateObject<S, T>> for GenericStateObject
69+
where
70+
S: Spec,
71+
T: Status,
72+
{
73+
fn from(value: StateObject<S, T>) -> Self {
74+
Self {
75+
metadata: value.metadata.clone(),
76+
data: serde_json::to_string(&value).unwrap(),
77+
}
78+
}
79+
}
80+
81+
pub trait Spec: Serialize + for<'de> Deserialize<'de> {}
82+
83+
pub trait Status: Serialize + for<'de> Deserialize<'de> {}
84+
85+
impl Spec for LogSpec {}
86+
impl Status for LogStatus {}

0 commit comments

Comments
 (0)