diff --git a/core/tests/interfaces.rs b/core/tests/interfaces.rs index 32cb8bf7f00..1c692d09074 100644 --- a/core/tests/interfaces.rs +++ b/core/tests/interfaces.rs @@ -293,7 +293,6 @@ fn interface_non_inline_fragment() { // Query only the fragment. let query = "query { leggeds { ...frag } } fragment frag on Animal { name }"; let res = insert_and_query(subgraph_id, schema, vec![entity], query).unwrap(); - dbg!(&res.errors); assert_eq!( format!("{:?}", res.data.unwrap()), r#"Object({"leggeds": List([Object({"name": String("cow")})])})"# @@ -336,7 +335,6 @@ fn interface_inline_fragment() { let query = "query { leggeds(orderBy: legs) { ... on Animal { name } ...on Bird { airspeed } } }"; let res = insert_and_query(subgraph_id, schema, vec![animal, bird], query).unwrap(); - dbg!(&res.errors); assert_eq!( format!("{:?}", res.data.unwrap()), r#"Object({"leggeds": List([Object({"airspeed": Int(Number(24))}), Object({"name": String("cow")})])})"# diff --git a/graph/src/util/log.rs b/graph/src/util/log.rs index 54f368c5380..b83a440b4cd 100644 --- a/graph/src/util/log.rs +++ b/graph/src/util/log.rs @@ -112,6 +112,7 @@ pub enum LogCode { BlockIngestionStatus, GraphQlQuerySuccess, GraphQlQueryFailure, + TokioContention, } impl Display for LogCode { @@ -123,6 +124,7 @@ impl Display for LogCode { LogCode::BlockIngestionStatus => "BlockIngestionStatus", LogCode::GraphQlQuerySuccess => "GraphQLQuerySuccess", LogCode::GraphQlQueryFailure => "GraphQLQueryFailure", + LogCode::TokioContention => "TokioContention", }; write!(f, "{}", value) } diff --git a/node/src/main.rs b/node/src/main.rs index 9f790e0d124..bab1481c060 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -18,13 +18,12 @@ extern crate lazy_static; extern crate url; use clap::{App, Arg}; -use futures::sync::oneshot; +use futures::sync::{mpsc, oneshot}; use git_testament::{git_testament, render_testament}; use ipfs_api::IpfsClient; use lazy_static::lazy_static; use std::env; use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; use graph::components::forward; @@ -64,7 +63,6 @@ lazy_static! { git_testament!(TESTAMENT); fn main() { - println!("REORG_THRESHOLD = {:#?}", *REORG_THRESHOLD); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); // Register guarded panic logger which ensures logs flush on shutdown let (panic_logger, _panic_guard) = guarded_logger(); @@ -574,6 +572,30 @@ fn async_main() -> impl Future + Send + 'static { .expect("Failed to start GraphQL subscription server"), ); + // Periodically check for contention in the tokio threadpool. First spawn a + // task that simply responds to "ping" requests. Then spawn a separate + // thread to periodically ping it and check responsiveness. + let (ping_send, ping_receive) = mpsc::channel::>(1); + tokio::spawn( + ping_receive + .for_each(move |pong_send| pong_send.clone().send(()).map(|_| ()).map_err(|_| ())), + ); + let contention_logger = logger.clone(); + std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_millis(100)); + let (pong_send, pong_receive) = std::sync::mpsc::channel(); + ping_send.clone().send(pong_send).wait().unwrap(); + let mut timeout = Duration::from_millis(1); + while pong_receive.recv_timeout(timeout).is_err() { + warn!(contention_logger, "Possible contention in tokio threadpool"; + "timeout_ms" => timeout.as_millis(), + "code" => LogCode::TokioContention); + if timeout < Duration::from_secs(10) { + timeout *= 10; + } + } + }); + future::empty() }