diff --git a/Cargo.lock b/Cargo.lock index 4592a8e..5c8a80d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -549,6 +549,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom 0.2.16", + "instant", + "rand 0.8.5", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -2536,6 +2547,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -3685,10 +3705,11 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "3.0.0" +version = "3.0.1" dependencies = [ "anyhow", "async-trait", + "backoff", "bincode 2.0.1", "bytemuck", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 5db1c00..6fc1626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "3.0.0" +version = "3.0.1" edition = "2024" [[bin]] @@ -9,6 +9,7 @@ path = "src/bin/agent.rs" [dependencies] anyhow = "1.0.81" +backoff = "0.4.0" ed25519-dalek = "2.1.1" serde = { version = "1.0.197", features = ["derive", "rc"] } async-trait = "0.1.79" diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index e3c479f..5dcc4b7 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -3,6 +3,11 @@ use { anyhow::{ Result, anyhow, + bail, + }, + backoff::{ + ExponentialBackoffBuilder, + backoff::Backoff, }, futures_util::{ SinkExt, @@ -20,10 +25,15 @@ use { std::{ path::PathBuf, sync::Arc, - time::Duration, + time::{ + Duration, + Instant, + }, }, tokio::{ net::TcpStream, + select, + sync::broadcast, task::JoinHandle, }, tokio_tungstenite::{ @@ -42,6 +52,8 @@ use { url::Url, }; +pub const RELAYER_CHANNEL_CAPACITY: usize = 1000; + #[derive(Clone, Debug, Deserialize)] pub struct Config { pub history_url: Url, @@ -56,21 +68,21 @@ fn default_publish_interval() -> Duration { Duration::from_millis(200) } -struct RelayerSender { - ws_senders: Vec>, TungsteniteMessage>>, +struct RelayerWsSession { + ws_sender: SplitSink>, TungsteniteMessage>, } -impl RelayerSender { - async fn send_price_update( +impl RelayerWsSession { + async fn send_transaction( &mut self, signed_lazer_transaction: &SignedLazerTransaction, ) -> Result<()> { - tracing::debug!("price_update: {:?}", signed_lazer_transaction); + tracing::debug!("signed_lazer_transaction: {:?}", signed_lazer_transaction); let buf = signed_lazer_transaction.write_to_bytes()?; - for sender in self.ws_senders.iter_mut() { - sender.send(TungsteniteMessage::from(buf.clone())).await?; - sender.flush().await?; - } + self.ws_sender + .send(TungsteniteMessage::from(buf.clone())) + .await?; + self.ws_sender.flush().await?; Ok(()) } } @@ -88,31 +100,110 @@ async fn connect_to_relayer( let headers = req.headers_mut(); headers.insert( "Authorization", - HeaderValue::from_str(&format!("Bearer {}", token))?, + HeaderValue::from_str(&format!("Bearer {token}"))?, ); let (ws_stream, _) = connect_async_with_config(req, None, true).await?; Ok(ws_stream.split()) } -async fn connect_to_relayers( - config: &Config, -) -> Result<( - RelayerSender, - Vec>>>, -)> { - let mut relayer_senders = Vec::new(); - let mut relayer_receivers = Vec::new(); - for url in config.relayer_urls.clone() { - let (relayer_sender, relayer_receiver) = - connect_to_relayer(url, &config.authorization_token).await?; - relayer_senders.push(relayer_sender); - relayer_receivers.push(relayer_receiver); +struct RelayerSessionTask { + // connection state + url: Url, + token: String, + receiver: broadcast::Receiver, +} + +impl RelayerSessionTask { + pub async fn run(&mut self) { + let initial_interval = Duration::from_millis(100); + let max_interval = Duration::from_secs(5); + let mut backoff = ExponentialBackoffBuilder::new() + .with_initial_interval(initial_interval) + .with_max_interval(max_interval) + .with_max_elapsed_time(None) + .build(); + + const FAILURE_RESET_TIME: Duration = Duration::from_secs(300); + let mut first_failure_time = Instant::now(); + let mut failure_count = 0; + + loop { + match self.run_relayer_connection().await { + Ok(()) => { + tracing::info!("relayer session graceful shutdown"); + return; + } + Err(e) => { + if first_failure_time.elapsed() > FAILURE_RESET_TIME { + failure_count = 0; + first_failure_time = Instant::now(); + backoff.reset(); + } + + failure_count += 1; + let next_backoff = backoff.next_backoff().unwrap_or(max_interval); + tracing::error!( + "relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}", + e, + failure_count, + next_backoff + ); + tokio::time::sleep(next_backoff).await; + } + } + } + } + + pub async fn run_relayer_connection(&mut self) -> Result<()> { + // Establish relayer connection + // Relayer will drop the connection if no data received in 5s + let (relayer_ws_sender, mut relayer_ws_receiver) = + connect_to_relayer(self.url.clone(), &self.token).await?; + let mut relayer_ws_session = RelayerWsSession { + ws_sender: relayer_ws_sender, + }; + + loop { + select! { + recv_result = self.receiver.recv() => { + match recv_result { + Ok(transaction) => { + if let Err(e) = relayer_ws_session.send_transaction(&transaction).await { + tracing::error!("Error publishing transaction to Lazer relayer: {e:?}"); + bail!("Failed to publish transaction to Lazer relayer: {e:?}"); + } + }, + Err(e) => { + match e { + broadcast::error::RecvError::Closed => { + tracing::error!("transaction broadcast channel closed"); + bail!("transaction broadcast channel closed"); + } + broadcast::error::RecvError::Lagged(skipped_count) => { + tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages"); + } + } + } + } + } + // Handle messages from the relayers, such as errors if we send a bad update + msg = relayer_ws_receiver.next() => { + match msg { + Some(Ok(msg)) => { + tracing::debug!("Received message from relayer: {msg:?}"); + } + Some(Err(e)) => { + tracing::error!("Error receiving message from at relayer: {e:?}"); + } + None => { + tracing::error!("relayer connection closed"); + bail!("relayer connection closed"); + } + } + } + } + } } - let sender = RelayerSender { - ws_senders: relayer_senders, - }; - tracing::info!("connected to relayers: {:?}", config.relayer_urls); - Ok((sender, relayer_receivers)) } // TODO: This is copied from history-service; move to Lazer protocol sdk. @@ -156,10 +247,26 @@ async fn fetch_symbols(history_url: &Url) -> Result> { #[instrument(skip(config, state))] pub fn lazer_exporter(config: Config, state: Arc) -> Vec> { - let handles = vec![tokio::spawn(lazer_exporter::lazer_exporter( + let mut handles = vec![]; + + // can safely drop first receiver for ease of iteration + let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); + + for url in config.relayer_urls.iter() { + let mut task = RelayerSessionTask { + url: url.clone(), + token: config.authorization_token.to_owned(), + receiver: relayer_sender.subscribe(), + }; + handles.push(tokio::spawn(async move { task.run().await })); + } + + handles.push(tokio::spawn(lazer_exporter::lazer_exporter( config.clone(), state, - ))]; + relayer_sender, + ))); + handles } @@ -170,20 +277,19 @@ mod lazer_exporter { services::lazer_exporter::{ Config, SymbolResponse, - connect_to_relayers, fetch_symbols, }, state::local::LocalStore, }, anyhow::{ Context, + Result, bail, }, ed25519_dalek::{ Signer, SigningKey, }, - futures_util::StreamExt, protobuf::{ Message, MessageField, @@ -209,44 +315,44 @@ mod lazer_exporter { std::{ collections::HashMap, sync::Arc, - time::Duration, }, - tokio_stream::StreamMap, + tokio::sync::broadcast::Sender, }; - pub async fn lazer_exporter(config: Config, state: Arc) - where - S: LocalStore, - S: Send + Sync + 'static, - { - let mut failure_count = 0; - let retry_duration = Duration::from_secs(1); - - loop { - match run(&config, state.clone()).await { - Ok(()) => { - tracing::info!("lazer_exporter graceful shutdown"); - return; - } - Err(e) => { - failure_count += 1; - tracing::error!( - "lazer_exporter failed with error: {:?}, failure_count: {}; retrying in {:?}", - e, - failure_count, - retry_duration - ); - tokio::time::sleep(retry_duration).await; - } + fn get_signing_key(config: &Config) -> Result { + // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher + let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { + Ok(k) => k, + Err(e) => { + tracing::error!( + error = ?e, + publish_keypair_path = config.publish_keypair_path.display().to_string(), + "Reading publish keypair returned an error. ", + ); + bail!("Reading publish keypair returned an error. "); } - } + }; + + SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) + .context("Failed to create signing key from keypair") } - async fn run(config: &Config, state: Arc) -> anyhow::Result<()> - where + pub async fn lazer_exporter( + config: Config, + state: Arc, + relayer_sender: Sender, + ) where S: LocalStore, S: Send + Sync + 'static, { + let signing_key = match get_signing_key(&config) { + Ok(signing_key) => signing_key, + Err(e) => { + tracing::error!("lazer_exporter signing key failure: {e:?}"); + return; + } + }; + // TODO: Re-fetch on an interval? let lazer_symbols: HashMap = match fetch_symbols(&config.history_url).await { @@ -265,33 +371,10 @@ mod lazer_exporter { .collect(), Err(e) => { tracing::error!("Failed to fetch Lazer symbols: {e:?}"); - bail!("Failed to fetch Lazer symbols: {e:?}"); + return; } }; - // Establish relayer connections - // Relayer will drop the connection if no data received in 5s - let (mut relayer_sender, relayer_receivers) = connect_to_relayers(config).await?; - let mut stream_map = StreamMap::new(); - for (i, receiver) in relayer_receivers.into_iter().enumerate() { - stream_map.insert(config.relayer_urls[i].clone(), receiver); - } - - // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher - let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { - Ok(k) => k, - Err(e) => { - tracing::error!( - error = ?e, - publish_keypair_path = config.publish_keypair_path.display().to_string(), - "Reading publish keypair returned an error. ", - ); - bail!("Reading publish keypair returned an error. "); - } - }; - - let signing_key = SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) - .context("Failed to create signing key from keypair")?; let mut publish_interval = tokio::time::interval(config.publish_interval_duration); loop { @@ -356,23 +439,10 @@ mod lazer_exporter { payload: Some(buf), special_fields: Default::default(), }; - if let Err(e) = relayer_sender.send_price_update(&signed_lazer_transaction).await { - tracing::error!("Error publishing update to Lazer relayer: {e:?}"); - bail!("Failed to publish update to Lazer relayer: {e:?}"); - } - } - // Handle messages from the relayers, such as errors if we send a bad update - mapped_msg = stream_map.next() => { - match mapped_msg { - Some((relayer_url, Ok(msg))) => { - tracing::debug!("Received message from relayer at {relayer_url}: {msg:?}"); - } - Some((relayer_url, Err(e))) => { - tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}"); - } - None => { - tracing::error!("relayer connection closed"); - bail!("relayer connection closed"); + match relayer_sender.send(signed_lazer_transaction.clone()) { + Ok(_) => (), + Err(e) => { + tracing::error!("Error sending transaction to relayer receivers: {e}"); } } }