Skip to content

Commit c8c99c4

Browse files
[hyperactor_mesh]: examples: sieve of eratosthenes
1 parent a2d4277 commit c8c99c4

File tree

1 file changed

+149
-0
lines changed

1 file changed

+149
-0
lines changed

hyperactor_mesh/examples/sieve.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
//! Based on the "Parallel Sieve of Eratosthenes" described by Leroy
9+
//! and Didier in ["UNIX System Programming in
10+
//! OCaml"](https://ocaml.github.io/ocamlunix/ocamlunix.pdf).
11+
//! Implements a [Sieve of
12+
//! Eratosthenes](https://en.wikipedia.org/wiki/Sieve_of_Eratosthenes)
13+
//! using dynamically spawned actors to filter candidates
14+
//! concurrently.
15+
use std::process::ExitCode;
16+
17+
use anyhow::Result;
18+
use async_trait::async_trait;
19+
use hyperactor::Actor;
20+
use hyperactor::ActorRef;
21+
use hyperactor::Handler;
22+
use hyperactor::Instance;
23+
use hyperactor::Named;
24+
use hyperactor::PortRef;
25+
use hyperactor_mesh::Mesh;
26+
use hyperactor_mesh::ProcMesh;
27+
use hyperactor_mesh::alloc::AllocSpec;
28+
use hyperactor_mesh::alloc::Allocator;
29+
use hyperactor_mesh::alloc::LocalAllocator;
30+
use hyperactor_mesh::shape;
31+
use serde::Deserialize;
32+
use serde::Serialize;
33+
34+
/// Candidate number submitted to the sieve.
35+
///
36+
/// Sent into the actor chain to test `number` for primality. If found
37+
/// to be prime, reported via `prime_collector`.
38+
#[derive(Debug, Serialize, Deserialize, Named)]
39+
pub struct NextNumber {
40+
/// Candidate number to test.
41+
pub number: u64,
42+
/// Port for reporting discovered primes.
43+
pub prime_collector: PortRef<u64>,
44+
}
45+
46+
/// Parameters for spawning a `SieveActor`.
47+
///
48+
/// Carries the prime value this actor filters.
49+
#[derive(Debug, Named, Serialize, Deserialize, Clone)]
50+
pub struct SieveParams {
51+
/// Prime number assigned to this actor.
52+
pub prime: u64,
53+
}
54+
55+
/// Actor representing one sieve filter.
56+
///
57+
/// Filters candidates divisible by `prime`. Forwards survivors to
58+
/// `next`. Spawns a new child when a new prime is discovered.
59+
#[derive(Debug)]
60+
#[hyperactor::export_spawn(NextNumber)]
61+
pub struct SieveActor {
62+
/// Prime used for filtering.
63+
prime: u64,
64+
/// Next actor in the sieve chain.
65+
next: Option<ActorRef<SieveActor>>,
66+
}
67+
68+
#[async_trait]
69+
impl Handler<NextNumber> for SieveActor {
70+
async fn handle(&mut self, this: &Instance<Self>, msg: NextNumber) -> Result<()> {
71+
// Filter divisible candidates.
72+
if msg.number % self.prime != 0 {
73+
// Forward to next actor if it exists.
74+
if let Some(next) = &self.next {
75+
next.send(
76+
this,
77+
NextNumber {
78+
number: msg.number,
79+
prime_collector: msg.prime_collector.clone(),
80+
},
81+
)?;
82+
} else {
83+
// New prime discovered.
84+
msg.prime_collector.send(this, msg.number)?;
85+
86+
// Spawn new sieve actor for the new prime.
87+
let child = SieveActor::spawn(this, SieveParams { prime: msg.number }).await?;
88+
let child_ref = child.bind();
89+
self.next = Some(child_ref);
90+
}
91+
}
92+
Ok(())
93+
}
94+
}
95+
96+
#[async_trait]
97+
impl Actor for SieveActor {
98+
type Params = SieveParams;
99+
100+
/// Creates a sieve actor for `prime`.
101+
async fn new(params: Self::Params) -> Result<Self> {
102+
Ok(Self {
103+
prime: params.prime,
104+
next: None,
105+
})
106+
}
107+
}
108+
109+
#[tokio::main]
110+
async fn main() -> Result<ExitCode> {
111+
let alloc = LocalAllocator
112+
.allocate(AllocSpec {
113+
shape: shape! { replica = 1 },
114+
constraints: Default::default(),
115+
})
116+
.await?;
117+
118+
let mesh = ProcMesh::allocate(alloc).await?;
119+
120+
let sieve_params = SieveParams { prime: 2 };
121+
let sieve_mesh = mesh.spawn::<SieveActor>("sieve", &sieve_params).await?;
122+
let sieve_head = sieve_mesh.get(0).unwrap();
123+
124+
let mut primes = vec![2];
125+
let mut candidate = 3;
126+
127+
let (prime_collector_tx, mut prime_collector_rx) = mesh.client().open_port();
128+
let prime_collector_ref = prime_collector_tx.bind();
129+
130+
while primes.len() < 100 {
131+
sieve_head.send(
132+
mesh.client(),
133+
NextNumber {
134+
number: candidate,
135+
prime_collector: prime_collector_ref.clone(),
136+
},
137+
)?;
138+
while let Ok(Some(prime)) = prime_collector_rx.try_recv() {
139+
primes.push(prime);
140+
}
141+
candidate += 1;
142+
}
143+
144+
while let Ok(Some(_)) = prime_collector_rx.try_recv() {}
145+
146+
primes.sort();
147+
println!("Primes : {:?}", primes);
148+
Ok(ExitCode::SUCCESS)
149+
}

0 commit comments

Comments
 (0)