diff --git a/Cargo.lock b/Cargo.lock index b0e94a782e9..0db8b3c28dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -911,6 +911,7 @@ dependencies = [ "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", "serde_yaml 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)", "test-store 0.1.0", + "tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "walkdir 2.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 910eca1447f..ddcc77a1576 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,6 +14,7 @@ serde = "1.0" serde_json = "1.0" serde_yaml = "0.7" uuid = { version = "0.7.2", features = ["v4"] } +tokio-threadpool = "0.1.14" [dev-dependencies] # We're using the latest ipfs-api for the HTTPS support that was merged in diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 3343a8661a3..0480516c9e5 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -255,7 +255,17 @@ impl SubgraphInstanceManager { // creates dynamic data sources. This allows us to recreate the // block stream and include events for the new data sources going // forward; this is easier than updating the existing block stream. - tokio::spawn(loop_fn(ctx, move |ctx| run_subgraph(ctx))); + let mut subgraph_task = loop_fn(ctx, |ctx| run_subgraph(ctx)); + + // This task has many calls to the store, so mark it as `blocking`. + tokio::spawn(future::poll_fn(move || { + match tokio_threadpool::blocking(|| subgraph_task.poll()) { + Ok(Async::NotReady) | Ok(Async::Ready(Ok(Async::NotReady))) => Ok(Async::NotReady), + Ok(Async::Ready(Ok(Async::Ready(())))) => Ok(Async::Ready(())), + Ok(Async::Ready(Err(()))) => Err(()), + Err(_) => panic!("not inside a tokio thread pool"), + } + })); Ok(()) }