@@ -231,7 +231,7 @@ impl<'a> Executor<'a> {
231
231
let mut rng = fastrand:: Rng :: new ( ) ;
232
232
233
233
// Set the local queue while we're running.
234
- LocalQueue :: set ( & runner. local , {
234
+ LocalQueue :: set ( self . state ( ) , & runner. local , {
235
235
let runner = & runner;
236
236
async move {
237
237
// A future that runs tasks forever.
@@ -262,6 +262,11 @@ impl<'a> Executor<'a> {
262
262
263
263
// Try to push into the local queue.
264
264
LocalQueue :: with ( |local_queue| {
265
+ // Make sure that we don't accidentally push to an executor that isn't ours.
266
+ if !std:: ptr:: eq ( local_queue. state , & * state) {
267
+ return ;
268
+ }
269
+
265
270
if let Err ( e) = local_queue. queue . push ( runnable. take ( ) . unwrap ( ) ) {
266
271
runnable = Some ( e. into_inner ( ) ) ;
267
272
return ;
@@ -844,6 +849,11 @@ impl Drop for Runner<'_> {
844
849
845
850
/// The state of the currently running local queue.
846
851
struct LocalQueue {
852
+ /// The pointer to the state of the executor.
853
+ ///
854
+ /// Used to make sure we don't push runnables to the wrong executor.
855
+ state : * const State ,
856
+
847
857
/// The concurrent queue.
848
858
queue : Arc < ConcurrentQueue < Runnable > > ,
849
859
@@ -861,14 +871,19 @@ impl LocalQueue {
861
871
862
872
impl LocalQueue {
863
873
/// Run a function with a set local queue.
864
- async fn set < F > ( queue : & Arc < ConcurrentQueue < Runnable > > , fut : F ) -> F :: Output
874
+ async fn set < F > (
875
+ state : & State ,
876
+ queue : & Arc < ConcurrentQueue < Runnable > > ,
877
+ fut : F ,
878
+ ) -> F :: Output
865
879
where
866
880
F : Future ,
867
881
{
868
882
// Store the local queue and the current waker.
869
883
let mut old = with_waker ( |waker| {
870
884
LOCAL_QUEUE . with ( move |slot| {
871
885
slot. borrow_mut ( ) . replace ( LocalQueue {
886
+ state : state as * const State ,
872
887
queue : queue. clone ( ) ,
873
888
waker : waker. clone ( ) ,
874
889
} )
0 commit comments