Commit 1730d3a

ethereum, store: Replace chain head listener channels with tokio::watch
Simplifies and improves performance of the communication between the listener and block streams.
1 parent 85b9ec7 · commit 1730d3a
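For context, a minimal sketch of the `watch` fan-out pattern this commit adopts. It is written against modern tokio 1.x, so the API differs from the tokio 0.1 `watch` used in the diff (there the sender publishes with `broadcast` and the receiver implements the futures 0.1 `Stream` trait), but the behaviour it illustrates is the same: the channel keeps only the latest value, a subscription is just a clone of the receiver, and a slow subscriber can never make updates pile up.

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One sender, any number of receivers; the channel stores only the
    // latest value, so a slow subscriber never causes backpressure.
    let (update_sender, update_receiver) = watch::channel(());

    // A subscription is just a clone of the receiver.
    let mut subscriber = update_receiver.clone();
    let task = tokio::spawn(async move {
        // `changed()` resolves once a value newer than the last one seen has
        // been sent; several rapid updates coalesce into a single wake-up.
        if subscriber.changed().await.is_ok() {
            println!("chain head changed; look up the new head in the store");
        }
    });

    // The notification carries no payload: publishing is just `send(())`.
    update_sender.send(()).expect("a receiver is still alive");
    task.await.unwrap();
}

This latest-value-only behaviour is what lets the commit drop the per-subscriber bounded channels and the `try_send` bookkeeping in the store listener below.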


4 files changed, +20 -113 lines changed


datasource/ethereum/src/block_stream.rs

Lines changed: 5 additions & 52 deletions
@@ -7,7 +7,6 @@ use std::sync::Mutex;
 use std::time::{Duration, Instant};
 
 use futures::prelude::*;
-use futures::sync::mpsc::{channel, Receiver, Sender};
 use graph::tokio::timer::Delay;
 
 use graph::data::subgraph::schema::{
@@ -152,9 +151,7 @@ impl<S, C, E> Clone for BlockStreamContext<S, C, E> {
 pub struct BlockStream<S, C, E> {
     state: Mutex<BlockStreamState>,
     consecutive_err_count: u32,
-    chain_head_update_sink: Sender<ChainHeadUpdate>,
-    chain_head_update_stream: Receiver<ChainHeadUpdate>,
-    _chain_head_update_guard: CancelGuard,
+    chain_head_update_stream: ChainHeadUpdateStream,
     ctx: BlockStreamContext<S, C, E>,
 }
 
@@ -168,7 +165,6 @@ where
         subgraph_store: Arc<S>,
         chain_store: Arc<C>,
         eth_adapter: Arc<E>,
-        chain_head_update_guard: CancelGuard,
         node_id: NodeId,
         subgraph_id: SubgraphDeploymentId,
         log_filter: Option<EthereumLogFilter>,
@@ -178,14 +174,10 @@ where
         reorg_threshold: u64,
         logger: Logger,
     ) -> Self {
-        let (chain_head_update_sink, chain_head_update_stream) = channel(100);
-
         BlockStream {
             state: Mutex::new(BlockStreamState::New),
             consecutive_err_count: 0,
-            chain_head_update_sink,
-            chain_head_update_stream,
-            _chain_head_update_guard: chain_head_update_guard,
+            chain_head_update_stream: chain_store.chain_head_updates(),
             ctx: BlockStreamContext {
                 subgraph_store,
                 chain_store,
@@ -1181,7 +1173,7 @@ where
             BlockStreamState::Idle => {
                 match self.chain_head_update_stream.poll() {
                     // Chain head was updated
-                    Ok(Async::Ready(Some(_chain_head_update))) => {
+                    Ok(Async::Ready(Some(()))) => {
                         // Start reconciliation process
                         let next_blocks_future = self.ctx.next_blocks();
                         state = BlockStreamState::Reconciliation(next_blocks_future);
@@ -1222,21 +1214,6 @@ where
     }
 }
 
-impl<S, C, E> EventConsumer<ChainHeadUpdate> for BlockStream<S, C, E>
-where
-    S: Store,
-    C: ChainStore,
-    E: EthereumAdapter,
-{
-    fn event_sink(&self) -> Box<Sink<SinkItem = ChainHeadUpdate, SinkError = ()> + Send> {
-        let logger = self.ctx.logger.clone();
-
-        Box::new(self.chain_head_update_sink.clone().sink_map_err(move |_| {
-            debug!(logger, "Terminating chain head updates; channel closed");
-        }))
-    }
-}
-
 pub struct BlockStreamBuilder<S, C, E> {
     subgraph_store: Arc<S>,
     chain_store: Arc<C>,
@@ -1300,25 +1277,12 @@ where
         let logger = logger.new(o!(
            "component" => "BlockStream",
        ));
-        let logger_for_stream = logger.clone();
-
-        // Create a chain head update stream whose lifetime is tied to the
-        // liftetime of the block stream; we do this to immediately terminate
-        // the chain head update listener when the block stream is shut down
-        let cancel_guard = CancelGuard::new();
-        let chain_head_update_stream =
-            self.chain_store
-                .chain_head_updates()
-                .cancelable(&cancel_guard, move || {
-                    debug!(logger_for_stream, "Terminating chain head updates");
-                });
 
         // Create the actual subgraph-specific block stream
-        let block_stream = BlockStream::new(
+        BlockStream::new(
            self.subgraph_store.clone(),
            self.chain_store.clone(),
            self.eth_adapter.clone(),
-            cancel_guard,
            self.node_id.clone(),
            deployment_id,
            log_filter,
@@ -1327,18 +1291,7 @@ where
            include_calls_in_blocks,
            self.reorg_threshold,
            logger,
-        );
-
-        // Forward chain head updates from the listener to the block stream;
-        // this will be canceled as soon as the block stream goes out of scope
-        tokio::spawn(
-            chain_head_update_stream
-                .forward(block_stream.event_sink())
-                .map_err(|_| ())
-                .map(|_| ()),
-        );
-
-        block_stream
+        )
     }
 }
 
graph/src/components/ethereum/listener.rs

Lines changed: 3 additions & 1 deletion
@@ -21,7 +21,9 @@ pub struct ChainHeadUpdate {
     pub head_block_number: u64,
 }
 
-pub type ChainHeadUpdateStream = Box<Stream<Item = ChainHeadUpdate, Error = ()> + Send>;
+/// The updates have no payload, receivers should call `Store::chain_head_ptr`
+/// to check what the latest block is.
+pub type ChainHeadUpdateStream = Box<Stream<Item = (), Error = ()> + Send>;
 
 pub trait ChainHeadUpdateListener {
     // Subscribe to chain head updates.
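Since the stream item is now `()`, a consumer learns only that the head changed and has to ask the store for the new head itself. A rough sketch of that contract follows, again against modern tokio 1.x and with a hypothetical `chain_head_ptr()` stand-in (the real `Store::chain_head_ptr` mentioned in the doc comment has a different signature and return type):

use tokio::sync::watch;

// Hypothetical stand-in for the store lookup the new doc comment points at.
async fn chain_head_ptr() -> Option<u64> {
    Some(1_234_567)
}

// A subscriber is never handed the new head in the notification itself; it is
// only told that "something changed" and then queries the store.
async fn follow_chain_head(mut updates: watch::Receiver<()>) {
    while updates.changed().await.is_ok() {
        if let Some(head_block_number) = chain_head_ptr().await {
            println!("new chain head: #{head_block_number}");
        }
    }
}

#[tokio::main]
async fn main() {
    let (update_sender, update_receiver) = watch::channel(());
    let follower = tokio::spawn(follow_chain_head(update_receiver));

    // Two rapid updates may be observed as a single notification; with no
    // payload to lose, coalescing them is harmless.
    update_sender.send(()).unwrap();
    update_sender.send(()).unwrap();

    drop(update_sender); // closing the sender ends the follower loop
    follower.await.unwrap();
}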

graph/src/components/ethereum/stream.rs

Lines changed: 1 addition & 3 deletions
@@ -3,9 +3,7 @@ use futures::Stream;
 
 use crate::prelude::*;
 
-pub trait BlockStream:
-    Stream<Item = EthereumBlockWithTriggers, Error = Error> + EventConsumer<ChainHeadUpdate>
-{
+pub trait BlockStream: Stream<Item = EthereumBlockWithTriggers, Error = Error> {
     fn parse_triggers(
         log_filter_opt: Option<EthereumLogFilter>,
         call_filter_opt: Option<EthereumCallFilter>,
Lines changed: 11 additions & 57 deletions
@@ -1,25 +1,18 @@
-use futures::sync::mpsc::{channel, Sender};
-use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
-use uuid::Uuid;
+use tokio::sync::watch;
 
 use graph::prelude::{ChainHeadUpdateListener as ChainHeadUpdateListenerTrait, *};
 use graph::serde_json;
 
 use crate::notification_listener::{NotificationListener, SafeChannelName};
 
-type ChainHeadUpdateSubscribers = Arc<RwLock<HashMap<String, Sender<ChainHeadUpdate>>>>;
-
 pub struct ChainHeadUpdateListener {
-    logger: Logger,
-    subscribers: ChainHeadUpdateSubscribers,
+    update_receiver: watch::Receiver<()>,
     _listener: NotificationListener,
 }
 
 impl ChainHeadUpdateListener {
     pub fn new(logger: &Logger, postgres_url: String, network_name: String) -> Self {
         let logger = logger.new(o!("component" => "ChainHeadUpdateListener"));
-        let subscribers = Arc::new(RwLock::new(HashMap::new()));
 
         // Create a Postgres notification listener for chain head updates
         let mut listener = NotificationListener::new(
@@ -28,11 +21,11 @@ impl ChainHeadUpdateListener {
             SafeChannelName::i_promise_this_is_safe("chain_head_updates"),
         );
 
-        Self::listen(&logger, &mut listener, network_name, subscribers.clone());
+        let (update_sender, update_receiver) = watch::channel(());
+        Self::listen(logger, &mut listener, network_name, update_sender);
 
         ChainHeadUpdateListener {
-            logger,
-            subscribers,
+            update_receiver,
 
             // We keep the listener around to tie its stream's lifetime to
             // that of the chain head update listener and prevent it from
@@ -42,10 +35,10 @@ impl ChainHeadUpdateListener {
     }
 
     fn listen(
-        logger: &Logger,
+        logger: Logger,
         listener: &mut NotificationListener,
         network_name: String,
-        subscribers: ChainHeadUpdateSubscribers,
+        mut update_sender: watch::Sender<()>,
     ) {
         let logger = logger.clone();
 
@@ -72,64 +65,25 @@ impl ChainHeadUpdateListener {
                    }
                })
                .for_each(move |update| {
-                    let logger = logger.clone();
-                    let senders = subscribers.read().unwrap().clone();
-                    let subscribers = subscribers.clone();
-
                     debug!(
-                        logger,
+                        logger.clone(),
                         "Received chain head update";
                         "network" => &update.network_name,
                         "head_block_hash" => format!("{}", update.head_block_hash),
                         "head_block_number" => &update.head_block_number,
                     );
 
-                    // Forward update to all susbcribers
-                    stream::iter_ok::<_, ()>(senders).for_each(move |(id, mut sender)| {
-                        let logger = logger.clone();
-                        let subscribers = subscribers.clone();
-
-                        // A subgraph that's syncing will let chain head updates
-                        // pile up in the channel. So we don't wait for room in
-                        // the channel and instead skip it, it will have the
-                        // opportunity to grab future updates.
-                        match sender.try_send(update.clone()) {
-                            // Move on to the next subscriber
-                            Ok(()) => (),
-                            Err(ref e) if e.is_full() => {
-                                // Temporary log, feel free to remove if noisy.
-                                debug!(logger, "Full chain head update channel"; "id" => &id);
-                            }
-                            Err(ref e) if e.is_disconnected() => {
-                                // Remove disconnected subscribers.
-                                debug!(logger, "Unsubscribe"; "id" => &id);
-                                subscribers.write().unwrap().remove(&id);
-                            }
-                            Err(e) => warn!(logger, "Unexpected send error"; "e" => e.to_string()),
-                        }
-                        Ok(())
-                    })
+                    update_sender.broadcast(()).map_err(|_| ())
                }),
            );
 
-        // We're ready, start listening to chain head updaates
+        // We're ready, start listening to chain head updates
         listener.start();
     }
 }
 
 impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
     fn subscribe(&self) -> ChainHeadUpdateStream {
-        // Generate a new (unique) UUID; we're looping just to be sure we avoid collisions
-        let mut id = Uuid::new_v4().to_string();
-        while self.subscribers.read().unwrap().contains_key(&id) {
-            id = Uuid::new_v4().to_string();
-        }
-
-        debug!(self.logger, "Subscribe"; "id" => &id);
-
-        // Create a subscriber and return the receiving end
-        let (sender, receiver) = channel(100);
-        self.subscribers.write().unwrap().insert(id, sender);
-        Box::new(receiver)
+        Box::new(self.update_receiver.clone().map_err(|_| ()))
     }
 }
