Skip to content

Monitor contention in tokio threadpool #917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/tests/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ fn interface_non_inline_fragment() {
// Query only the fragment.
let query = "query { leggeds { ...frag } } fragment frag on Animal { name }";
let res = insert_and_query(subgraph_id, schema, vec![entity], query).unwrap();
dbg!(&res.errors);
assert_eq!(
format!("{:?}", res.data.unwrap()),
r#"Object({"leggeds": List([Object({"name": String("cow")})])})"#
Expand Down Expand Up @@ -336,7 +335,6 @@ fn interface_inline_fragment() {
let query =
"query { leggeds(orderBy: legs) { ... on Animal { name } ...on Bird { airspeed } } }";
let res = insert_and_query(subgraph_id, schema, vec![animal, bird], query).unwrap();
dbg!(&res.errors);
assert_eq!(
format!("{:?}", res.data.unwrap()),
r#"Object({"leggeds": List([Object({"airspeed": Int(Number(24))}), Object({"name": String("cow")})])})"#
Expand Down
2 changes: 2 additions & 0 deletions graph/src/util/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub enum LogCode {
BlockIngestionStatus,
GraphQlQuerySuccess,
GraphQlQueryFailure,
TokioContention,
}

impl Display for LogCode {
Expand All @@ -123,6 +124,7 @@ impl Display for LogCode {
LogCode::BlockIngestionStatus => "BlockIngestionStatus",
LogCode::GraphQlQuerySuccess => "GraphQLQuerySuccess",
LogCode::GraphQlQueryFailure => "GraphQLQueryFailure",
LogCode::TokioContention => "TokioContention",
};
write!(f, "{}", value)
}
Expand Down
28 changes: 25 additions & 3 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ extern crate lazy_static;
extern crate url;

use clap::{App, Arg};
use futures::sync::oneshot;
use futures::sync::{mpsc, oneshot};
use git_testament::{git_testament, render_testament};
use ipfs_api::IpfsClient;
use lazy_static::lazy_static;
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use graph::components::forward;
Expand Down Expand Up @@ -64,7 +63,6 @@ lazy_static! {
git_testament!(TESTAMENT);

fn main() {
println!("REORG_THRESHOLD = {:#?}", *REORG_THRESHOLD);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let (shutdown_sender, shutdown_receiver) = oneshot::channel();
// Register guarded panic logger which ensures logs flush on shutdown
let (panic_logger, _panic_guard) = guarded_logger();
Expand Down Expand Up @@ -574,6 +572,30 @@ fn async_main() -> impl Future<Item = (), Error = ()> + Send + 'static {
.expect("Failed to start GraphQL subscription server"),
);

// Periodically check for contention in the tokio threadpool. First spawn a
// task that simply responds to "ping" requests. Then spawn a separate
// thread to periodically ping it and check responsiveness.
let (ping_send, ping_receive) = mpsc::channel::<std::sync::mpsc::Sender<()>>(1);
tokio::spawn(
ping_receive
.for_each(move |pong_send| pong_send.clone().send(()).map(|_| ()).map_err(|_| ())),
);
let contention_logger = logger.clone();
std::thread::spawn(move || loop {
std::thread::sleep(Duration::from_millis(100));
let (pong_send, pong_receive) = std::sync::mpsc::channel();
ping_send.clone().send(pong_send).wait().unwrap();
let mut timeout = Duration::from_millis(1);
while pong_receive.recv_timeout(timeout).is_err() {
warn!(contention_logger, "Possible contention in tokio threadpool";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you add a LogCode here, that would help in making a Grafana panel + alert for this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a code, but we should probably alert on any warn-level log.

"timeout_ms" => timeout.as_millis(),
"code" => LogCode::TokioContention);
if timeout < Duration::from_secs(10) {
timeout *= 10;
}
}
});

future::empty()
}

Expand Down