Skip to content

Commit f344d33

Browse files
kaiyuan-lifacebook-github-bot
authored andcommitted
create StateActor to stream logs for hyperactor (#211)
Summary: Pull Request resolved: #211 This is an initial commit for the state actor. This diff creates a circular buffer to store latest log messages. If the circular buffer is full, messages will be discarded with a FIFO rule. Next: 1. register the StateActor with actors in the proc_mesh so logs can be streamed in. 2. stream the logs out to loggregator so we can see the logs in `hyper log`. Differential Revision: D76343950
1 parent 57cf61c commit f344d33

File tree

5 files changed

+476
-0
lines changed

5 files changed

+476
-0
lines changed

hyperactor_state/Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# @generated by autocargo from //monarch/hyperactor_state:hyperactor_state
2+
3+
[package]
4+
name = "hyperactor_state"
5+
version = "0.0.0"
6+
authors = ["Meta"]
7+
edition = "2021"
8+
license = "BSD-3-Clause"
9+
10+
[dependencies]
11+
anyhow = "1.0.95"
12+
async-trait = "0.1.86"
13+
hyperactor = { version = "0.0.0", path = "../hyperactor" }
14+
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
15+
serde = { version = "1.0.185", features = ["derive", "rc"] }
16+
serde_json = { version = "1.0.140", features = ["float_roundtrip", "unbounded_depth"] }
17+
tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] }
18+
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
19+
unicode-ident = "1.0.12"

hyperactor_state/src/client.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use anyhow::Result;
10+
use async_trait::async_trait;
11+
use hyperactor::Actor;
12+
use hyperactor::Handler;
13+
use hyperactor::Instance;
14+
use hyperactor::Named;
15+
use hyperactor_macros::HandleClient;
16+
use hyperactor_macros::RefClient;
17+
use serde::Deserialize;
18+
use serde::Serialize;
19+
use tokio::sync::mpsc::Sender;
20+
21+
use crate::object::GenericStateObject;
22+
23+
/// A client to interact with the state actor.
24+
#[derive(Debug)]
25+
#[hyperactor::export(ClientMessage)]
26+
pub struct ClientActor {
27+
sender: Sender<GenericStateObject>,
28+
}
29+
30+
/// Endpoints for the client actor.
31+
#[derive(Handler, HandleClient, RefClient, Debug, Serialize, Deserialize, Named)]
32+
pub enum ClientMessage {
33+
/// Push a batch of logs to the logs buffer
34+
Logs {
35+
/// Object to set.
36+
logs: Vec<GenericStateObject>,
37+
},
38+
}
39+
40+
pub struct ClientActorParams {
41+
pub sender: Sender<GenericStateObject>,
42+
}
43+
44+
#[async_trait]
45+
impl Actor for ClientActor {
46+
type Params = ClientActorParams;
47+
48+
async fn new(ClientActorParams { sender }: ClientActorParams) -> Result<Self, anyhow::Error> {
49+
Ok(Self { sender })
50+
}
51+
}
52+
53+
#[async_trait]
54+
#[hyperactor::forward(ClientMessage)]
55+
impl ClientMessageHandler for ClientActor {
56+
async fn logs(
57+
&mut self,
58+
_this: &Instance<Self>,
59+
logs: Vec<GenericStateObject>,
60+
) -> Result<(), anyhow::Error> {
61+
for log in logs {
62+
self.sender.send(log).await?;
63+
}
64+
Ok(())
65+
}
66+
}
67+
68+
#[cfg(test)]
69+
mod tests {
70+
use std::time::Duration;
71+
72+
use hyperactor::channel;
73+
use hyperactor::channel::ChannelAddr;
74+
use hyperactor::id;
75+
76+
use super::*;
77+
use crate::create_remote_client;
78+
use crate::spawn_actor;
79+
use crate::test_utils::log_items;
80+
81+
#[tracing_test::traced_test]
82+
#[tokio::test]
83+
async fn test_client_basics() {
84+
let client_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
85+
let (sender, mut receiver) = tokio::sync::mpsc::channel::<GenericStateObject>(10);
86+
let params = ClientActorParams { sender };
87+
let (client_actor_addr, client_actor_ref) = spawn_actor::<ClientActor>(
88+
client_actor_addr.clone(),
89+
id![state_client[0].state_client],
90+
params,
91+
)
92+
.await
93+
.unwrap();
94+
95+
let (_client_proc, remote_client) = create_remote_client(client_actor_addr).await.unwrap();
96+
97+
let log_items_0_10 = log_items(0, 10);
98+
client_actor_ref
99+
.logs(&remote_client, log_items_0_10.clone())
100+
.await
101+
.unwrap();
102+
103+
// Collect received messages with timeout
104+
let mut fetched_logs = vec![];
105+
for _ in 0..10 {
106+
// Timeout prevents hanging if a message is missing
107+
let log = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
108+
.await
109+
.expect("timed out waiting for message")
110+
.expect("channel closed unexpectedly");
111+
112+
fetched_logs.push(log);
113+
}
114+
115+
// Verify we received all expected logs
116+
assert_eq!(fetched_logs.len(), 10);
117+
assert_eq!(fetched_logs, log_items_0_10);
118+
119+
// Now test that no extra message is waiting
120+
let extra = tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await;
121+
assert!(extra.is_err(), "expected no more messages");
122+
}
123+
}

hyperactor_state/src/lib.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use anyhow::Result;
10+
use hyperactor::Actor;
11+
use hyperactor::ActorHandle;
12+
use hyperactor::ActorId;
13+
use hyperactor::ActorRef;
14+
use hyperactor::Mailbox;
15+
use hyperactor::actor::Binds;
16+
use hyperactor::actor::RemoteActor;
17+
use hyperactor::channel;
18+
use hyperactor::channel::ChannelAddr;
19+
use hyperactor::id;
20+
use hyperactor::mailbox::BoxedMailboxSender;
21+
use hyperactor::mailbox::DialMailboxRouter;
22+
use hyperactor::mailbox::MailboxClient;
23+
use hyperactor::mailbox::MailboxServer;
24+
use hyperactor::mailbox::MessageEnvelope;
25+
use hyperactor::mailbox::Undeliverable;
26+
use hyperactor::proc::Proc;
27+
28+
mod client;
29+
mod object;
30+
mod state_actor;
31+
32+
/// Creates a state actor server at given address. Returns the server address and a handle to the
33+
/// state actor.
34+
#[allow(dead_code)]
35+
pub(crate) async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
36+
addr: ChannelAddr,
37+
actor_id: ActorId,
38+
params: T::Params,
39+
) -> Result<(ChannelAddr, ActorRef<T>)> {
40+
let proc_id = actor_id.proc_id();
41+
let proc = Proc::new(
42+
proc_id.clone(),
43+
BoxedMailboxSender::new(DialMailboxRouter::new()),
44+
);
45+
let (local_addr, rx) = channel::serve(addr.clone()).await?;
46+
let actor_handle: ActorHandle<T> = proc.spawn(actor_id.name(), params).await?;
47+
actor_handle.bind::<T>();
48+
49+
// Undeliverable messages encountered by the mailbox server
50+
// are to be returned to the system actor.
51+
let return_handle = actor_handle.port::<Undeliverable<MessageEnvelope>>();
52+
let _mailbox_handle = proc.clone().serve(rx, return_handle);
53+
54+
Ok((local_addr, actor_handle.bind()))
55+
}
56+
57+
/// Creates a remote client that can send message to actors in the remote addr.
58+
/// It is important to keep the client proc alive for the remote_client's lifetime.
59+
pub(crate) async fn create_remote_client(addr: ChannelAddr) -> Result<(Proc, Mailbox)> {
60+
let remote_sender = MailboxClient::new(channel::dial(addr).unwrap());
61+
let client_proc_id = id!(client).random_user_proc();
62+
let client_proc = Proc::new(
63+
client_proc_id.clone(),
64+
BoxedMailboxSender::new(remote_sender),
65+
);
66+
let remote_client = client_proc.attach("client").unwrap();
67+
Ok((client_proc, remote_client))
68+
}
69+
70+
#[cfg(test)]
71+
pub(crate) mod test_utils {
72+
use crate::object::GenericStateObject;
73+
use crate::object::LogSpec;
74+
use crate::object::LogState;
75+
use crate::object::StateMetadata;
76+
use crate::object::StateObject;
77+
78+
pub(crate) fn log_items(seq_low: usize, seq_high: usize) -> Vec<GenericStateObject> {
79+
let mut log_items = vec![];
80+
let metadata = StateMetadata {
81+
name: "test".to_string(),
82+
kind: "log".to_string(),
83+
};
84+
let spec = LogSpec {};
85+
for seq in seq_low..seq_high {
86+
let state = LogState::new(seq, format!("state {}", seq));
87+
let state_object =
88+
StateObject::<LogSpec, LogState>::new(metadata.clone(), spec.clone(), state);
89+
let generic_state_object = GenericStateObject::try_from(state_object).unwrap();
90+
log_items.push(generic_state_object);
91+
}
92+
log_items
93+
}
94+
}

hyperactor_state/src/object.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use hyperactor::Named;
10+
use serde::Deserialize;
11+
use serde::Serialize;
12+
use serde_json::Value;
13+
14+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
15+
pub struct StateMetadata {
16+
/// Name of the state object owner.
17+
pub name: String,
18+
/// Kind of the object.
19+
pub kind: String,
20+
}
21+
22+
#[derive(Debug, Serialize, Deserialize)]
23+
pub struct StateObject<S, T> {
24+
metadata: StateMetadata,
25+
spec: S,
26+
state: T,
27+
}
28+
29+
impl<S, T> StateObject<S, T> {
30+
#[allow(dead_code)]
31+
pub fn new(metadata: StateMetadata, spec: S, state: T) -> Self {
32+
Self {
33+
metadata,
34+
spec,
35+
state,
36+
}
37+
}
38+
}
39+
40+
#[derive(Clone, Debug, Serialize, Deserialize)]
41+
pub struct LogSpec;
42+
43+
#[allow(dead_code)]
44+
#[derive(Debug, Serialize, Deserialize)]
45+
pub struct LogState {
46+
/// A monotonically increasing sequence number.
47+
seq: usize,
48+
/// The message in the log.
49+
message: String,
50+
}
51+
52+
impl LogState {
53+
#[allow(dead_code)]
54+
pub fn new(seq: usize, message: String) -> Self {
55+
Self { seq, message }
56+
}
57+
}
58+
59+
/// A generic state object which is the partially serialized version of a
60+
/// [`StateObject`]. Since [`StateObject`] takes generic types, those type information
61+
/// can be retrieved from the metadata to deserialize [`GenericStateObject`] into
62+
/// a [`StateObject<S, T>`].
63+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Named)]
64+
pub struct GenericStateObject {
65+
metadata: StateMetadata,
66+
data: String,
67+
}
68+
69+
impl<S, T> TryFrom<StateObject<S, T>> for GenericStateObject
70+
where
71+
S: Spec,
72+
T: State,
73+
{
74+
type Error = serde_json::Error;
75+
76+
fn try_from(obj: StateObject<S, T>) -> Result<Self, Self::Error> {
77+
Ok(Self {
78+
metadata: obj.metadata.clone(),
79+
data: serde_json::to_string(&obj)?,
80+
})
81+
}
82+
}
83+
84+
/// Spec is the define the desired state of an object, defined by the user.
85+
pub trait Spec: Serialize + for<'de> Deserialize<'de> {}
86+
/// State is the current state of an object.
87+
pub trait State: Serialize + for<'de> Deserialize<'de> {}
88+
89+
impl Spec for LogSpec {}
90+
impl State for LogState {}

0 commit comments

Comments
 (0)