From 6cc14960d0db5b528774421caca63af90e66611d Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Wed, 8 May 2019 10:30:09 -0300 Subject: [PATCH 1/3] main: Monitor contention in tokio threadpool --- node/src/main.rs | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/node/src/main.rs b/node/src/main.rs index 9f790e0d124..db4155f2663 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -18,13 +18,12 @@ extern crate lazy_static; extern crate url; use clap::{App, Arg}; -use futures::sync::oneshot; +use futures::sync::{mpsc, oneshot}; use git_testament::{git_testament, render_testament}; use ipfs_api::IpfsClient; use lazy_static::lazy_static; use std::env; use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; use graph::components::forward; @@ -64,7 +63,6 @@ lazy_static! { git_testament!(TESTAMENT); fn main() { - println!("REORG_THRESHOLD = {:#?}", *REORG_THRESHOLD); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); // Register guarded panic logger which ensures logs flush on shutdown let (panic_logger, _panic_guard) = guarded_logger(); @@ -574,6 +572,29 @@ fn async_main() -> impl Future + Send + 'static { .expect("Failed to start GraphQL subscription server"), ); + // Periodically check for contention in the tokio threadpool. First spawn a + // task that simply responds to "ping" requests. Then spawn a separate + // thread to periodically ping it and check responsiveness. 
+ let (ping_send, ping_receive) = mpsc::channel::<std::sync::mpsc::Sender<()>>(1); + tokio::spawn( + ping_receive + .for_each(move |pong_send| pong_send.clone().send(()).map(|_| ()).map_err(|_| ())), + ); + let contention_logger = logger.clone(); + std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_secs(10)); + let (pong_send, pong_receive) = std::sync::mpsc::channel(); + ping_send.clone().send(pong_send).wait().unwrap(); + let mut timeout = Duration::from_millis(1); + while pong_receive.recv_timeout(timeout).is_err() { + warn!(contention_logger, "Possible contention in tokio threadpool"; + "timeout_ms" => timeout.as_millis()); + if timeout < Duration::from_secs(10) { + timeout *= 10; + } + } + }); + future::empty() } From d9b05a7dc228c61b82cc2fbc73b9915a30cde5e8 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Wed, 8 May 2019 10:30:17 -0300 Subject: [PATCH 2/3] core/tests: Remove stray `dbg!`s --- core/tests/interfaces.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/tests/interfaces.rs b/core/tests/interfaces.rs index 32cb8bf7f00..1c692d09074 100644 --- a/core/tests/interfaces.rs +++ b/core/tests/interfaces.rs @@ -293,7 +293,6 @@ fn interface_non_inline_fragment() { // Query only the fragment. let query = "query { leggeds { ...frag } } fragment frag on Animal { name }"; let res = insert_and_query(subgraph_id, schema, vec![entity], query).unwrap(); - dbg!(&res.errors); assert_eq!( format!("{:?}", res.data.unwrap()), r#"Object({"leggeds": List([Object({"name": String("cow")})])})"# @@ -336,7 +335,6 @@ fn interface_inline_fragment() { let query = "query { leggeds(orderBy: legs) { ...
on Animal { name } ...on Bird { airspeed } } }"; let res = insert_and_query(subgraph_id, schema, vec![animal, bird], query).unwrap(); - dbg!(&res.errors); assert_eq!( format!("{:?}", res.data.unwrap()), r#"Object({"leggeds": List([Object({"airspeed": Int(Number(24))}), Object({"name": String("cow")})])})"# From 63a55ef65f5afc6f9e136e29b138a73feadc5097 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Wed, 8 May 2019 16:01:16 -0300 Subject: [PATCH 3/3] graph, node: Add a LogCode to the tokio contention log --- graph/src/util/log.rs | 2 ++ node/src/main.rs | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/graph/src/util/log.rs b/graph/src/util/log.rs index 54f368c5380..b83a440b4cd 100644 --- a/graph/src/util/log.rs +++ b/graph/src/util/log.rs @@ -112,6 +112,7 @@ pub enum LogCode { BlockIngestionStatus, GraphQlQuerySuccess, GraphQlQueryFailure, + TokioContention, } impl Display for LogCode { @@ -123,6 +124,7 @@ impl Display for LogCode { LogCode::BlockIngestionStatus => "BlockIngestionStatus", LogCode::GraphQlQuerySuccess => "GraphQLQuerySuccess", LogCode::GraphQlQueryFailure => "GraphQLQueryFailure", + LogCode::TokioContention => "TokioContention", }; write!(f, "{}", value) } diff --git a/node/src/main.rs b/node/src/main.rs index db4155f2663..bab1481c060 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -582,13 +582,14 @@ fn async_main() -> impl Future + Send + 'static { ); let contention_logger = logger.clone(); std::thread::spawn(move || loop { - std::thread::sleep(Duration::from_secs(10)); + std::thread::sleep(Duration::from_millis(100)); let (pong_send, pong_receive) = std::sync::mpsc::channel(); ping_send.clone().send(pong_send).wait().unwrap(); let mut timeout = Duration::from_millis(1); while pong_receive.recv_timeout(timeout).is_err() { warn!(contention_logger, "Possible contention in tokio threadpool"; - "timeout_ms" => timeout.as_millis()); + "timeout_ms" => timeout.as_millis(), + "code" => LogCode::TokioContention); if timeout 
< Duration::from_secs(10) { timeout *= 10; }