Skip to content

Commit db11073

Browse files
kaiyuan-li authored and facebook-github-bot committed
create StateActor to store and serve logs for hyperactor (#211)
Summary: Pull Request resolved: #211. This is an initial commit for the state actor. This diff creates a circular buffer to store the latest log messages. If the circular buffer is full, the oldest messages are discarded first (FIFO). Next: 1. Register the StateActor with actors in the proc_mesh so logs can be streamed in. 2. Stream the logs out to loggregator so we can see the logs in `hyper log`. Differential Revision: D76343950
1 parent 547656a commit db11073

File tree

4 files changed

+256
-0
lines changed

4 files changed

+256
-0
lines changed

hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ mod parse;
8484
pub mod proc;
8585
pub mod reference;
8686
pub mod simnet;
87+
mod state;
8788
pub mod supervision;
8889
pub mod sync;
8990
/// Test utilities

hyperactor/src/state/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
mod object;
10+
mod state_actor;

hyperactor/src/state/object.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use serde::Deserialize;
10+
use serde::Serialize;
11+
12+
use crate as hyperactor;
13+
use crate::Named;
14+
15+
/// Identifying metadata attached to a state object: a name and a kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
16+
pub struct StateMetadata {
17+
/// Name of the actor.
18+
pub name: String,
19+
/// Kind of the object.
20+
pub kind: String,
21+
}
22+
23+
/// A typed state object made of metadata, a spec of type `S` and a status of
/// type `T`.
#[derive(Debug, Serialize, Deserialize)]
24+
pub struct StateObject<S, T> {
25+
/// Metadata attached to this object.
metadata: StateMetadata,
26+
/// Spec part of the object.
spec: S,
27+
/// Status part of the object.
status: T,
28+
}
29+
30+
impl<S, T> StateObject<S, T> {
31+
/// Builds a state object from its metadata, spec and status.
// NOTE(review): `dead_code` is presumably allowed because nothing outside
// the tests constructs a `StateObject` yet — confirm, and drop the attribute
// once a non-test caller exists.
#[allow(dead_code)]
32+
pub fn new(metadata: StateMetadata, spec: S, status: T) -> Self {
33+
Self {
34+
metadata,
35+
spec,
36+
status,
37+
}
38+
}
39+
}
40+
41+
/// Spec type for log objects. It is a unit struct and carries no data.
#[derive(Clone, Debug, Serialize, Deserialize)]
42+
pub struct LogSpec;
43+
44+
/// Status type for log objects: one log message and its sequence number.
// NOTE(review): presumably `dead_code` is allowed because the fields are
// never read directly in this module — confirm before removing.
#[allow(dead_code)]
45+
#[derive(Debug, Serialize, Deserialize)]
46+
pub struct LogStatus {
47+
/// A monotonically increasing sequence number.
48+
seq: usize,
49+
/// The message in the log.
50+
message: String,
51+
}
52+
53+
impl LogStatus {
54+
/// Creates a log status from a sequence number and the log message text.
pub fn new(seq: usize, message: String) -> Self {
55+
Self { seq, message }
56+
}
57+
}
58+
59+
/// A state object whose spec and status types have been erased.
///
/// The `From<StateObject<S, T>>` conversion fills `data` with the JSON
/// serialization of the whole source object.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Named)]
60+
pub struct GenericStateObject {
61+
/// Metadata taken from the source object.
metadata: StateMetadata,
62+
/// JSON string produced by `serde_json::to_string` on the source `StateObject`.
data: String,
63+
}
64+
65+
impl<S, T> From<StateObject<S, T>> for GenericStateObject
66+
where
67+
S: Spec,
68+
T: Status,
69+
{
70+
fn from(value: StateObject<S, T>) -> Self {
71+
Self {
72+
metadata: value.metadata.clone(),
73+
data: serde_json::to_string(&value).unwrap(),
74+
}
75+
}
76+
}
77+
78+
/// Marker trait for types usable as the spec (`S`) of a `StateObject` that is
/// converted into a `GenericStateObject`. Implementors must be serializable
/// and deserializable.
pub trait Spec: Serialize + for<'de> Deserialize<'de> {}
79+
80+
/// Marker trait for types usable as the status (`T`) of a `StateObject` that
/// is converted into a `GenericStateObject`. Implementors must be serializable
/// and deserializable.
pub trait Status: Serialize + for<'de> Deserialize<'de> {}
81+
82+
// Lets `LogSpec` satisfy the `S: Spec` bound of the `GenericStateObject` conversion.
impl Spec for LogSpec {}
83+
// Lets `LogStatus` satisfy the `T: Status` bound of the `GenericStateObject` conversion.
impl Status for LogStatus {}

hyperactor/src/state/state_actor.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::collections::VecDeque;
10+
11+
use async_trait::async_trait;
12+
use serde::Deserialize;
13+
use serde::Serialize;
14+
15+
use crate as hyperactor;
16+
use crate::Actor;
17+
use crate::HandleClient;
18+
use crate::Handler;
19+
use crate::Instance;
20+
use crate::Named;
21+
use crate::OncePortRef;
22+
use crate::RefClient;
23+
use crate::state::object::GenericStateObject;
24+
25+
/// Maximum number of logs to store in the buffer (`1 << 10`, i.e. 1024). Once the buffer is full, the oldest logs will be dropped.
26+
const LOG_CAPACITY: usize = 1 << 10;
27+
28+
/// A state actor which serves as a centralized store for state.
///
/// The only state held here is a buffer of log objects, filled by `set_logs`
/// and emptied by `get_logs`.
29+
#[derive(Debug)]
30+
#[hyperactor::export(StateMessage)]
31+
pub struct StateActor {
32+
/// Logs buffer, oldest entry at the front. `set_logs` trims it to at most
/// `LOG_CAPACITY` entries.
33+
logs: VecDeque<GenericStateObject>,
34+
}
35+
36+
/// Endpoints for the state actor.
37+
#[derive(Handler, HandleClient, RefClient, Debug, Serialize, Deserialize, Named)]
38+
pub enum StateMessage {
39+
/// Push a batch of logs to the logs buffer. `StateActor` keeps at most
/// `LOG_CAPACITY` entries and drops the oldest ones beyond that.
40+
SetLogs {
41+
/// Logs to append to the buffer, in order.
42+
logs: Vec<GenericStateObject>,
43+
},
44+
/// Fetch logs from the buffer. This will drain the fetched logs from buffer.
45+
GetLogs {
46+
// NOTE(review): presumably the port on which the handler's return value
// (the drained logs) is delivered — confirm against the `Handler` derive.
#[reply]
47+
ret: OncePortRef<Vec<GenericStateObject>>,
48+
},
49+
}
50+
51+
#[async_trait]
52+
impl Actor for StateActor {
53+
/// The actor takes no construction parameters.
type Params = ();
54+
55+
/// Creates a state actor with an empty logs buffer.
async fn new(_params: ()) -> Result<Self, anyhow::Error> {
56+
Ok(Self {
57+
logs: VecDeque::new(),
58+
})
59+
}
60+
}
61+
62+
#[async_trait]
63+
#[hyperactor::forward(StateMessage)]
64+
impl StateMessageHandler for StateActor {
65+
async fn set_logs(
66+
&mut self,
67+
_this: &Instance<Self>,
68+
logs: Vec<GenericStateObject>,
69+
) -> Result<(), anyhow::Error> {
70+
self.logs.extend(logs);
71+
while self.logs.len() > LOG_CAPACITY {
72+
self.logs.pop_front();
73+
}
74+
Ok(())
75+
}
76+
77+
async fn get_logs(
78+
&mut self,
79+
_this: &Instance<Self>,
80+
) -> Result<Vec<GenericStateObject>, anyhow::Error> {
81+
let logs = self.logs.drain(..).collect::<Vec<_>>();
82+
Ok(logs)
83+
}
84+
}
85+
86+
#[cfg(test)]
mod tests {

    use super::*;
    use crate::ActorHandle;
    use crate::proc::Proc;
    use crate::state::object::LogSpec;
    use crate::state::object::LogStatus;
    use crate::state::object::StateMetadata;
    use crate::state::object::StateObject;

    /// Builds one log object per sequence number in `seq_low..seq_high`
    /// (upper bound exclusive), all sharing the same test metadata.
    fn log_items(seq_low: usize, seq_high: usize) -> Vec<GenericStateObject> {
        let metadata = StateMetadata {
            name: "test".to_string(),
            kind: "log".to_string(),
        };
        (seq_low..seq_high)
            .map(|seq| {
                let status = LogStatus::new(seq, format!("status {}", seq));
                let state_object =
                    StateObject::<LogSpec, LogStatus>::new(metadata.clone(), LogSpec, status);
                GenericStateObject::from(state_object)
            })
            .collect()
    }

    /// Exercises set/get round trips, draining semantics and FIFO eviction
    /// once the buffer exceeds `LOG_CAPACITY`.
    #[tokio::test]
    async fn test_state_actor_set_get_logs() {
        let proc = Proc::local();
        let state_actor: ActorHandle<StateActor> = proc.spawn("state", ()).await.unwrap();
        let client = proc.attach("client").unwrap();

        // Set once and get once.
        let log_items_0_10 = log_items(0, 10);
        state_actor
            .set_logs(&client, log_items_0_10.clone())
            .await
            .unwrap();
        let fetched_log_items = state_actor.get_logs(&client).await.unwrap();
        assert_eq!(fetched_log_items, log_items_0_10);

        // Another get should return nothing, since the previous get drained the buffer.
        let fetched_log_items = state_actor.get_logs(&client).await.unwrap();
        assert!(fetched_log_items.is_empty());

        // Set twice and get should get all logs, in insertion order.
        let log_items_10_20 = log_items(10, 20);
        let log_items_20_30 = log_items(20, 30);
        let log_items_10_30 = log_items(10, 30);
        state_actor.set_logs(&client, log_items_10_20).await.unwrap();
        state_actor.set_logs(&client, log_items_20_30).await.unwrap();
        let fetched_log_items = state_actor.get_logs(&client).await.unwrap();
        assert_eq!(fetched_log_items, log_items_10_30);

        // Over-capacity logs should be dropped, oldest first: after pushing
        // LOG_CAPACITY + 10 entries, the first 10 must have been evicted.
        let log_items_0_max_capacity = log_items(0, LOG_CAPACITY);
        let log_items_max_capacity_plus_10 = log_items(LOG_CAPACITY, LOG_CAPACITY + 10);
        let log_items_10_max_capacity_plus_10 = log_items(10, LOG_CAPACITY + 10);
        state_actor
            .set_logs(&client, log_items_0_max_capacity)
            .await
            .unwrap();
        state_actor
            .set_logs(&client, log_items_max_capacity_plus_10)
            .await
            .unwrap();
        let fetched_log_items = state_actor.get_logs(&client).await.unwrap();
        assert_eq!(fetched_log_items.len(), LOG_CAPACITY);
        assert_eq!(fetched_log_items, log_items_10_max_capacity_plus_10);
    }
}

0 commit comments

Comments
 (0)