Skip to content

Commit a27d0bc

Browse files
committed
Implement !Send signal future
1 parent 74afb5b commit a27d0bc

File tree

2 files changed

+190
-116
lines changed

2 files changed

+190
-116
lines changed

godot-core/src/tools/async_support.rs

Lines changed: 189 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1,167 +1,241 @@
11
use std::future::Future;
2-
use std::mem::ManuallyDrop;
3-
use std::ops::DerefMut;
4-
use std::pin::Pin;
5-
use std::sync::atomic::{AtomicBool, Ordering};
6-
use std::sync::{Arc, Mutex};
7-
use std::task::{Context, Poll, Wake, Waker};
8-
use std::thread::ThreadId;
9-
10-
use crate::builtin::{Callable, GString, Signal, Variant};
11-
use crate::classes::object::ConnectFlags;
12-
use crate::global::godot_error;
2+
use std::sync::Arc;
3+
use std::task::Wake;
4+
5+
use crate::builtin::{Signal, Variant};
136
use crate::meta::FromGodot;
14-
use crate::obj::EngineEnum;
157
use impl_trait_for_tuples::impl_for_tuples;
168

9+
use future::{GodotWaker, SignalFuture};
10+
use wrapper::{Local, Send, Wrapper};
11+
1712
pub fn godot_task_local(future: impl Future<Output = ()> + 'static) {
18-
godot_task_sync(LocalFuture::new(future))
13+
godot_task_send(Local::new(future))
1914
}
2015

21-
pub fn godot_task_sync(future: impl Future<Output = ()> + 'static + Send + Sync) {
22-
let waker = Arc::new(GodotWaker::new_sync(future));
16+
pub fn godot_task_send(future: impl Future<Output = ()> + 'static + std::marker::Send) {
17+
let waker = Arc::new(GodotWaker::new_send(future));
2318
waker.wake();
2419
}
2520

26-
struct GodotWaker {
27-
future: Mutex<Pin<Box<dyn Future<Output = ()> + 'static + Send + Sync>>>,
28-
again: AtomicBool,
21+
pub trait FromSignalArgs: 'static + Sized {
22+
fn from_args(args: &[&Variant]) -> Self;
2923
}
3024

31-
impl GodotWaker {
32-
fn new_sync(future: impl Future<Output = ()> + 'static + Send + Sync) -> Self {
33-
Self {
34-
future: Mutex::new(Box::pin(future)),
35-
again: AtomicBool::new(false),
36-
}
25+
#[impl_for_tuples(2)]
26+
#[tuple_types_custom_trait_bound(FromGodot + 'static)]
27+
impl FromSignalArgs for Tuple {
28+
fn from_args(args: &[&Variant]) -> Self {
29+
let mut iter = args.iter();
30+
#[allow(clippy::unused_unit)]
31+
(for_tuples!(#(iter.next().unwrap().to()),*))
3732
}
3833
}
3934

40-
impl Wake for GodotWaker {
41-
fn wake(self: Arc<Self>) {
42-
let waker: Waker = self.clone().into();
43-
let mut ctx = Context::from_waker(&waker);
35+
pub type LocalSignalFuture<R> = SignalFuture<R, Local<R>>;
36+
pub type SendSignalFuture<R> = SignalFuture<R, Send<R>>;
4437

45-
// Flag must be set before locking to avoid race condition.
46-
self.again.store(true, Ordering::SeqCst);
47-
if let Ok(mut future) = self.future.try_lock() {
48-
while self.again.swap(false, Ordering::SeqCst) {
49-
let _ = future.as_mut().poll(&mut ctx);
50-
}
51-
}
52-
}
53-
}
38+
// Signal should implement IntoFuture for convenience. Keeping ToSignalFuture around might still be desirable, though. It allows to reuse i
39+
// the same signal instance multiple times.
40+
pub trait ToSignalFuture {
41+
fn to_local_future<R: FromSignalArgs>(&self) -> LocalSignalFuture<R>;
5442

55-
struct LocalFuture<F: Future + 'static> {
56-
future: ManuallyDrop<F>,
57-
thread: ThreadId,
43+
fn to_send_future<R: FromSignalArgs + std::marker::Send>(&self) -> SendSignalFuture<R>;
5844
}
5945

60-
impl<F: Future + 'static> LocalFuture<F> {
61-
fn new(future: F) -> Self {
62-
Self {
63-
future: ManuallyDrop::new(future),
64-
thread: std::thread::current().id(),
65-
}
46+
impl ToSignalFuture for Signal {
47+
fn to_local_future<R: FromSignalArgs>(&self) -> LocalSignalFuture<R> {
48+
SignalFuture::new(format!("Signal::{}", self), self.clone())
49+
}
50+
51+
fn to_send_future<R: FromSignalArgs + std::marker::Send>(&self) -> SendSignalFuture<R> {
52+
SignalFuture::new(format!("Signal::{}", self), self.clone())
6653
}
6754
}
6855

69-
impl<F: Future + 'static> Future for LocalFuture<F> {
70-
type Output = F::Output;
56+
mod future {
57+
use std::future::Future;
58+
use std::marker::PhantomData;
59+
use std::pin::Pin;
60+
use std::sync::{Arc, Mutex};
61+
use std::task::{Context, Poll, Wake, Waker};
62+
63+
use crate::builtin::{Callable, GString, Signal, Variant};
64+
use crate::classes::object::ConnectFlags;
65+
use crate::obj::EngineEnum;
7166

72-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73-
assert_eq!(self.thread, std::thread::current().id());
74-
unsafe { self.map_unchecked_mut(|s| s.future.deref_mut()) }.poll(cx)
67+
use super::wrapper::{Send, Wrapper};
68+
use super::FromSignalArgs;
69+
70+
pub struct SignalFuture<R, W>
71+
where
72+
R: FromSignalArgs,
73+
W: Wrapper<R>,
74+
{
75+
state: Arc<Mutex<(Option<W>, Option<Waker>)>>,
76+
_phantom: PhantomData<(*mut (), R)>,
7577
}
76-
}
7778

78-
impl<F: Future + 'static> Drop for LocalFuture<F> {
79-
fn drop(&mut self) {
80-
if self.thread == std::thread::current().id() {
81-
unsafe { ManuallyDrop::drop(&mut self.future) };
82-
} else if std::thread::panicking() {
83-
godot_error!(
84-
"LocalFuture is dropped on another thread while panicking. Leaking inner Future to avoid abort."
79+
impl<R, W> SignalFuture<R, W>
80+
where
81+
R: FromSignalArgs,
82+
W: Wrapper<R>,
83+
{
84+
pub(super) fn new(name: impl Into<GString>, signal: Signal) -> Self {
85+
let state = Arc::new(Mutex::new((None, Option::<Waker>::None)));
86+
let callback_state = state.clone();
87+
88+
signal.connect(
89+
Callable::from_fn(name, move |args: &[&Variant]| {
90+
let mut lock = callback_state.lock().unwrap();
91+
let waker = lock.1.take();
92+
93+
lock.0.replace(W::new(R::from_args(args)));
94+
drop(lock);
95+
96+
if let Some(waker) = waker {
97+
waker.wake();
98+
}
99+
100+
Ok(Variant::nil())
101+
}),
102+
ConnectFlags::ONE_SHOT.ord() as i64,
85103
);
86-
} else {
87-
panic!("LocalFuture is dropped on another thread.");
104+
105+
Self {
106+
state,
107+
_phantom: PhantomData,
108+
}
88109
}
89110
}
90-
}
91111

92-
// Verified at runtime by checking the current thread id.
93-
unsafe impl<F: Future + 'static> Send for LocalFuture<F> {}
94-
unsafe impl<F: Future + 'static> Sync for LocalFuture<F> {}
112+
impl<R, W> Future for SignalFuture<R, W>
113+
where
114+
R: FromSignalArgs,
115+
W: Wrapper<R>,
116+
{
117+
type Output = R;
95118

96-
pub struct SignalFuture<R: FromSignalArgs> {
97-
state: Arc<Mutex<(Option<R>, Option<Waker>)>>,
98-
}
119+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120+
let mut lock = self.state.lock().unwrap();
99121

100-
impl<R: FromSignalArgs> SignalFuture<R> {
101-
fn new(name: impl Into<GString>, signal: Signal) -> Self {
102-
let state = Arc::new(Mutex::new((None, Option::<Waker>::None)));
103-
let callback_state = state.clone();
122+
if let Some(result) = lock.0.take() {
123+
return Poll::Ready(result.into_inner());
124+
}
104125

105-
// the callable currently requires that the return value is Sync + Send
106-
signal.connect(
107-
Callable::from_fn(name, move |args: &[&Variant]| {
108-
let mut lock = callback_state.lock().unwrap();
109-
let waker = lock.1.take();
126+
lock.1.replace(cx.waker().clone());
110127

111-
lock.0.replace(R::from_args(args));
112-
drop(lock);
128+
Poll::Pending
129+
}
130+
}
113131

114-
if let Some(waker) = waker {
115-
waker.wake();
116-
}
132+
unsafe impl<R: FromSignalArgs + std::marker::Send> std::marker::Send for SignalFuture<R, Send<R>> {}
133+
unsafe impl<R: FromSignalArgs + std::marker::Send> std::marker::Sync for SignalFuture<R, Send<R>> {}
117134

118-
Ok(Variant::nil())
119-
}),
120-
ConnectFlags::ONE_SHOT.ord() as i64,
121-
);
135+
pub struct GodotWaker {
136+
future: Mutex<Pin<Box<dyn Future<Output = ()> + 'static + std::marker::Send>>>,
137+
}
122138

123-
Self { state }
139+
impl GodotWaker {
140+
pub(super) fn new_send(
141+
future: impl Future<Output = ()> + 'static + std::marker::Send,
142+
) -> Self {
143+
Self {
144+
future: Mutex::new(Box::pin(future)),
145+
}
146+
}
147+
}
148+
149+
impl Wake for GodotWaker {
150+
fn wake(self: Arc<Self>) {
151+
let waker: Waker = self.clone().into();
152+
let mut ctx = Context::from_waker(&waker);
153+
154+
let mut future = self.future.lock().unwrap();
155+
let _ = future.as_mut().poll(&mut ctx);
156+
}
124157
}
125158
}
126159

127-
impl<R: FromSignalArgs> Future for SignalFuture<R> {
128-
type Output = R;
160+
mod wrapper {
161+
use std::future::Future;
162+
use std::mem::ManuallyDrop;
163+
use std::ops::DerefMut;
164+
use std::pin::Pin;
165+
use std::task::{Context, Poll};
166+
use std::thread::ThreadId;
167+
168+
use crate::builtin::Variant;
169+
use crate::global::godot_error;
129170

130-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131-
let mut lock = self.state.lock().unwrap();
171+
use super::FromSignalArgs;
132172

133-
if let Some(result) = lock.0.take() {
134-
return Poll::Ready(result);
173+
pub trait Wrapper<T>: Sized + std::marker::Send + 'static {
174+
fn new(inner: T) -> Self;
175+
fn into_inner(self) -> T;
176+
}
177+
pub struct Local<T> {
178+
inner: ManuallyDrop<T>,
179+
thread: ThreadId,
180+
}
181+
182+
impl<T: 'static> Wrapper<T> for Local<T> {
183+
fn new(inner: T) -> Self {
184+
Self {
185+
inner: ManuallyDrop::new(inner),
186+
thread: std::thread::current().id(),
187+
}
135188
}
136189

137-
lock.1.replace(cx.waker().clone());
190+
fn into_inner(self) -> T {
191+
let mut this = ManuallyDrop::new(self);
192+
unsafe { ManuallyDrop::take(&mut this.inner) }
193+
}
194+
}
138195

139-
Poll::Pending
196+
impl<T: FromSignalArgs + 'static> FromSignalArgs for Local<T> {
197+
fn from_args(args: &[&Variant]) -> Self {
198+
Local::new(T::from_args(args))
199+
}
140200
}
141-
}
142201

143-
pub trait FromSignalArgs: Sync + Send + 'static {
144-
fn from_args(args: &[&Variant]) -> Self;
145-
}
202+
impl<F: Future + 'static> Future for Local<F> {
203+
type Output = F::Output;
146204

147-
#[impl_for_tuples(12)]
148-
impl FromSignalArgs for Tuple {
149-
for_tuples!( where #(Tuple: FromGodot + Sync + Send + 'static),* );
150-
fn from_args(args: &[&Variant]) -> Self {
151-
let mut iter = args.iter();
152-
#[allow(clippy::unused_unit)]
153-
(for_tuples!(#(iter.next().unwrap().to()),*))
205+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
206+
assert_eq!(self.thread, std::thread::current().id());
207+
unsafe { self.map_unchecked_mut(|s| s.inner.deref_mut()) }.poll(cx)
208+
}
154209
}
155-
}
156210

157-
// Signal should implement IntoFuture for convenience. Keeping ToSignalFuture around might still be desirable, though. It allows to reuse i
158-
// the same signal instance multiple times.
159-
pub trait ToSignalFuture<R: FromSignalArgs> {
160-
fn to_future(&self) -> SignalFuture<R>;
161-
}
211+
impl<T> Drop for Local<T> {
212+
fn drop(&mut self) {
213+
if self.thread == std::thread::current().id() {
214+
unsafe { ManuallyDrop::drop(&mut self.inner) };
215+
} else if std::thread::panicking() {
216+
godot_error!(
217+
"Local is dropped on another thread while panicking. Leaking inner Future to avoid abort."
218+
);
219+
} else {
220+
panic!("Local is dropped on another thread.");
221+
}
222+
}
223+
}
162224

163-
impl<R: FromSignalArgs> ToSignalFuture<R> for Signal {
164-
fn to_future(&self) -> SignalFuture<R> {
165-
SignalFuture::new(format!("Signal::{}", self), self.clone())
225+
// Verified at runtime by checking the current thread id.
226+
unsafe impl<T> std::marker::Send for Local<T> {}
227+
228+
pub struct Send<T> {
229+
inner: T,
230+
}
231+
232+
impl<T: std::marker::Send + 'static> Wrapper<T> for Send<T> {
233+
fn new(inner: T) -> Self {
234+
Self { inner }
235+
}
236+
237+
fn into_inner(self) -> T {
238+
self.inner
239+
}
166240
}
167241
}

itest/rust/src/engine_tests/async_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::framework::itest;
88
async fn call_async_fn(signal: Signal) -> u8 {
99
let value = 5;
1010

11-
let _: () = signal.to_future().await;
11+
let _: () = signal.to_local_future().await;
1212

1313
value + 5
1414
}

0 commit comments

Comments
 (0)