Skip to content

Commit aa24e74

Browse files
committed
core: Move the subgraph task to the blocking thread pool
It has a lot of database interactions, this should reduce the amount of thread pool contention in indexing nodes.
1 parent 1730d3a commit aa24e74

File tree

3 files changed

+13
-1
lines changed

3 files changed

+13
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ serde = "1.0"
1414
serde_json = "1.0"
1515
serde_yaml = "0.7"
1616
uuid = { version = "0.7.2", features = ["v4"] }
17+
tokio-threadpool = "0.1.14"
1718

1819
[dev-dependencies]
1920
# We're using the latest ipfs-api for the HTTPS support that was merged in

core/src/subgraph/instance_manager.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,17 @@ impl SubgraphInstanceManager {
255255
// creates dynamic data sources. This allows us to recreate the
256256
// block stream and include events for the new data sources going
257257
// forward; this is easier than updating the existing block stream.
258-
tokio::spawn(loop_fn(ctx, move |ctx| run_subgraph(ctx)));
258+
let mut subgraph_task = loop_fn(ctx, |ctx| run_subgraph(ctx));
259+
260+
// This task has many calls to the store, so mark it as `blocking`.
261+
tokio::spawn(future::poll_fn(move || {
262+
match tokio_threadpool::blocking(|| subgraph_task.poll()) {
263+
Ok(Async::NotReady) | Ok(Async::Ready(Ok(Async::NotReady))) => Ok(Async::NotReady),
264+
Ok(Async::Ready(Ok(Async::Ready(())))) => Ok(Async::Ready(())),
265+
Ok(Async::Ready(Err(()))) => Err(()),
266+
Err(_) => panic!("not inside a tokio thread pool"),
267+
}
268+
}));
259269

260270
Ok(())
261271
}

0 commit comments

Comments
 (0)