Skip to content

Commit 954cf98

Browse files
committed
node, runtime: Simplify panic handling
Use `panic_handler` in the runtime builider to simplify our panic handling logic.
1 parent 551009a commit 954cf98

File tree

3 files changed

+43
-116
lines changed

3 files changed

+43
-116
lines changed

graph/src/log/mod.rs

Lines changed: 2 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
1-
use backtrace::Backtrace;
2-
use futures::sync::oneshot;
3-
use slog::{crit, debug, o, Drain, FilterLevel, Logger};
1+
use slog::{o, Drain, FilterLevel, Logger};
42
use slog_async;
53
use slog_envlogger;
64
use slog_term;
7-
use std::sync::Mutex;
8-
use std::time::Duration;
9-
use std::{env, panic, process, thread};
5+
use std::env;
106

117
pub mod codes;
128
pub mod elastic;
139
pub mod split;
1410

15-
pub const MAPPING_THREAD_PREFIX: &str = "mapping-thread";
16-
1711
pub fn logger(show_debug: bool) -> Logger {
1812
let decorator = slog_term::TermDecorator::new().build();
1913
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
@@ -36,76 +30,3 @@ pub fn logger(show_debug: bool) -> Logger {
3630
let drain = slog_async::Async::new(drain).build().fuse();
3731
Logger::root(drain, o!())
3832
}
39-
40-
pub fn guarded_logger() -> (Logger, slog_async::AsyncGuard) {
41-
let decorator = slog_term::TermDecorator::new().build();
42-
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
43-
let (drain, guard) = slog_async::Async::new(drain).build_with_guard();
44-
(Logger::root(drain.fuse(), o!()), guard)
45-
}
46-
47-
pub fn register_panic_hook(panic_logger: Logger, shutdown_sender: oneshot::Sender<()>) {
48-
let shutdown_mutex = Mutex::new(Some(shutdown_sender));
49-
panic::set_hook(Box::new(move |panic_info| {
50-
let panic_payload = panic_info
51-
.payload()
52-
.downcast_ref::<String>()
53-
.cloned()
54-
.or_else(|| {
55-
panic_info
56-
.payload()
57-
.downcast_ref::<&str>()
58-
.map(|s| s.to_string())
59-
});
60-
61-
let panic_location = if let Some(location) = panic_info.location() {
62-
format!("{}:{}", location.file(), location.line().to_string())
63-
} else {
64-
"NA".to_string()
65-
};
66-
67-
match env::var_os("RUST_BACKTRACE") {
68-
Some(ref val) if val != "0" => {
69-
crit!(
70-
panic_logger, "{}", panic_payload.unwrap();
71-
"location" => &panic_location,
72-
"backtrace" => format!("{:?}", Backtrace::new()),
73-
);
74-
}
75-
_ => {
76-
crit!(
77-
panic_logger, "{}", panic_payload.unwrap();
78-
"location" => &panic_location,
79-
);
80-
}
81-
};
82-
83-
// Don't kill the process when a mapping thread panics.
84-
if thread::current()
85-
.name()
86-
.filter(|name| name.starts_with(MAPPING_THREAD_PREFIX))
87-
.is_some()
88-
{
89-
return;
90-
}
91-
92-
// Send a shutdown signal to main which will attempt to cleanly shutdown the runtime.
93-
match shutdown_mutex.lock().unwrap().take() {
94-
Some(sender) => sender
95-
.send(())
96-
.map(|_| ())
97-
.map_err(|_| {
98-
crit!(panic_logger, "Failed to send shutdown signal");
99-
()
100-
})
101-
.unwrap_or(()),
102-
None => debug!(panic_logger, "Shutdown signal already sent"),
103-
}
104-
105-
// If shutting down the runtime takes too long, exit the process anyways.
106-
thread::spawn(|| {
107-
thread::sleep(Duration::from_millis(3000));
108-
process::exit(1);
109-
});
110-
}));
111-
}

node/src/main.rs

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ extern crate lazy_static;
1818
extern crate url;
1919

2020
use clap::{App, Arg};
21-
use futures::sync::{mpsc, oneshot};
21+
use futures::sync::mpsc;
2222
use git_testament::{git_testament, render_testament};
2323
use ipfs_api::IpfsClient;
2424
use lazy_static::lazy_static;
@@ -27,7 +27,7 @@ use std::str::FromStr;
2727
use std::time::Duration;
2828

2929
use graph::components::forward;
30-
use graph::log::{guarded_logger, logger, register_panic_hook};
30+
use graph::log::logger;
3131
use graph::prelude::{JsonRpcServer as JsonRpcServerTrait, *};
3232
use graph::tokio_executor;
3333
use graph::tokio_timer;
@@ -63,38 +63,45 @@ lazy_static! {
6363
git_testament!(TESTAMENT);
6464

6565
fn main() {
66-
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
67-
// Register guarded panic logger which ensures logs flush on shutdown
68-
let (panic_logger, _panic_guard) = guarded_logger();
69-
register_panic_hook(panic_logger, shutdown_sender);
70-
71-
// Create components for tokio context: multi-threaded runtime,
72-
// executor context on the runtime, and Timer handle.
73-
let runtime = tokio::runtime::Runtime::new().expect("Failed to create runtime");
74-
let mut executor = runtime.executor();
66+
use std::sync::Mutex;
67+
use tokio::runtime;
68+
69+
// Create components for tokio context: multi-threaded runtime, executor
70+
// context on the runtime, and Timer handle.
71+
//
72+
// Configure the runtime to shutdown after a panic.
73+
let runtime: Arc<Mutex<Option<runtime::Runtime>>> = Arc::new(Mutex::new(None));
74+
let handler_runtime = runtime.clone();
75+
*runtime.lock().unwrap() = Some(
76+
runtime::Builder::new()
77+
.panic_handler(move |_| {
78+
let runtime = handler_runtime.clone();
79+
std::thread::spawn(move || {
80+
if let Some(runtime) = runtime.lock().unwrap().take() {
81+
// Try to cleanly shutdown the runtime, but
82+
// unconditionally exit after a while.
83+
std::thread::spawn(|| {
84+
std::thread::sleep(Duration::from_millis(3000));
85+
std::process::exit(1);
86+
});
87+
runtime
88+
.shutdown_now()
89+
.wait()
90+
.expect("Failed to shutdown Tokio Runtime");
91+
println!("Runtime cleaned up and shutdown successfully");
92+
}
93+
});
94+
})
95+
.build()
96+
.unwrap(),
97+
);
98+
99+
let mut executor = runtime.lock().unwrap().as_ref().unwrap().executor();
75100
let mut enter = tokio_executor::enter()
76101
.expect("Failed to enter runtime executor, multiple executors at once");
77102
let timer = Timer::default();
78103
let timer_handle = timer.handle();
79104

80-
// Shutdown the runtime after a panic
81-
std::thread::spawn(|| {
82-
let shutdown_logger = logger(false);
83-
shutdown_receiver
84-
.wait()
85-
.map(|_| {
86-
let _ = runtime
87-
.shutdown_now()
88-
.wait()
89-
.expect("Failed to shutdown Tokio Runtime");
90-
info!(
91-
shutdown_logger,
92-
"Runtime cleaned up and shutdown successfully"
93-
);
94-
})
95-
.expect("Runtime shutdown process did not finish");
96-
});
97-
98105
// Setup runtime context with defaults and run the main application
99106
tokio_executor::with_default(&mut executor, &mut enter, |enter| {
100107
tokio_timer::with_default(&timer_handle, enter, |enter| {
@@ -592,7 +599,9 @@ fn async_main() -> impl Future<Item = (), Error = ()> + Send + 'static {
592599
break;
593600
}
594601
let mut timeout = Duration::from_millis(1);
595-
while pong_receive.recv_timeout(timeout).is_err() {
602+
while pong_receive.recv_timeout(timeout)
603+
== Err(crossbeam_channel::RecvTimeoutError::Timeout)
604+
{
596605
warn!(contention_logger, "Possible contention in tokio threadpool";
597606
"timeout_ms" => timeout.as_millis(),
598607
"code" => LogCode::TokioContention);

runtime/wasm/src/host.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use graph::components::ethereum::*;
1212
use graph::components::store::Store;
1313
use graph::data::subgraph::{DataSource, Source};
1414
use graph::ethabi::{LogParam, RawLog};
15-
use graph::log;
1615
use graph::prelude::{
1716
RuntimeHost as RuntimeHostTrait, RuntimeHostBuilder as RuntimeHostBuilderTrait, *,
1817
};
@@ -199,10 +198,8 @@ impl RuntimeHost {
199198
// dropping the `mapping_request_receiver` which ultimately causes the
200199
// subgraph to fail the next time it tries to handle an event.
201200
let conf = thread::Builder::new().name(format!(
202-
"{}-{}-{}",
203-
log::MAPPING_THREAD_PREFIX,
204-
config.subgraph_id,
205-
data_source_name
201+
"mapping-{}-{}",
202+
config.subgraph_id, data_source_name
206203
));
207204
conf.spawn(move || {
208205
debug!(module_logger, "Start WASM runtime");

0 commit comments

Comments
 (0)