diff --git a/Cargo.toml b/Cargo.toml index 4931c39..96fe65c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,18 +21,18 @@ rustdoc-args = ["--cfg", "feature=\"docs\""] [features] default = ["h1_client", "native-tls"] -docs = ["h1_client", "curl_client", "wasm_client", "hyper_client", "unstable-config"] +docs = ["h1_client", "curl_client", "wasm_client", "hyper_client"] h1_client = ["async-h1", "async-std", "deadpool", "futures"] native_client = ["curl_client", "wasm_client"] curl_client = ["isahc", "async-std"] -wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures"] +wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures", "async-std"] hyper_client = ["hyper", "hyper-tls", "http-types/hyperium_http", "futures-util", "tokio"] native-tls = ["async-native-tls"] rustls = ["async-tls", "rustls_crate"] -unstable-config = [] +unstable-config = [] # deprecated [dependencies] async-trait = "0.1.37" diff --git a/src/config.rs b/src/config.rs index 691bcda..09ce6e8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,22 +4,36 @@ use std::fmt::Debug; use std::time::Duration; /// Configuration for `HttpClient`s. -#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] #[non_exhaustive] #[derive(Clone)] pub struct Config { /// HTTP/1.1 `keep-alive` (connection pooling). /// /// Default: `true`. + /// + /// Note: Does nothing on `wasm_client`. pub http_keep_alive: bool, /// TCP `NO_DELAY`. /// /// Default: `false`. + /// + /// Note: Does nothing on `wasm_client`. pub tcp_no_delay: bool, /// Connection timeout duration. /// /// Default: `Some(Duration::from_secs(60))`. pub timeout: Option, + /// Maximum number of simultaneous connections that this client is allowed to keep open to individual hosts at one time. + /// + /// Default: `50`. + /// This number is based on a few random benchmarks and see whatever gave decent perf vs resource use in Orogene. + /// + /// Note: The behavior of this is different depending on the backend in use. + /// - `h1_client`: `0` is disallowed and asserts as otherwise it would cause a semaphore deadlock. + /// - `curl_client`: `0` allows for limitless connections per host. + /// - `hyper_client`: No effect. Hyper does not support such an option. + /// - `wasm_client`: No effect. Web browsers do not support such an option. + pub max_connections_per_host: usize, /// TLS Configuration (Rustls) #[cfg_attr(feature = "docs", doc(cfg(feature = "h1_client")))] #[cfg(all(feature = "h1_client", feature = "rustls"))] @@ -36,7 +50,8 @@ impl Debug for Config { dbg_struct .field("http_keep_alive", &self.http_keep_alive) .field("tcp_no_delay", &self.tcp_no_delay) - .field("timeout", &self.timeout); + .field("timeout", &self.timeout) + .field("max_connections_per_host", &self.max_connections_per_host); #[cfg(all(feature = "h1_client", feature = "rustls"))] { @@ -62,6 +77,7 @@ impl Config { http_keep_alive: true, tcp_no_delay: false, timeout: Some(Duration::from_secs(60)), + max_connections_per_host: 50, #[cfg(all(feature = "h1_client", any(feature = "rustls", feature = "native-tls")))] tls_config: None, } @@ -93,6 +109,12 @@ impl Config { self } + /// Set the maximum number of simultaneous connections that this client is allowed to keep open to individual hosts at one time. + pub fn max_connections_per_host(mut self, max_connections_per_host: usize) -> Self { + self.max_connections_per_host = max_connections_per_host; + self + } + /// Set TLS Configuration (Rustls) #[cfg_attr(feature = "docs", doc(cfg(feature = "h1_client")))] #[cfg(all(feature = "h1_client", feature = "rustls"))] diff --git a/src/h1/mod.rs b/src/h1/mod.rs index 6577732..e44e765 100644 --- a/src/h1/mod.rs +++ b/src/h1/mod.rs @@ -1,8 +1,6 @@ //! http-client implementation for async-h1, with connection pooling ("Keep-Alive"). -#[cfg(feature = "unstable-config")] use std::convert::{Infallible, TryFrom}; - use std::fmt::Debug; use std::net::SocketAddr; use std::sync::Arc; @@ -33,19 +31,15 @@ use tcp::{TcpConnWrapper, TcpConnection}; #[cfg(any(feature = "native-tls", feature = "rustls"))] use tls::{TlsConnWrapper, TlsConnection}; -// This number is based on a few random benchmarks and see whatever gave decent perf vs resource use. -const DEFAULT_MAX_CONCURRENT_CONNECTIONS: usize = 50; - type HttpPool = DashMap>; #[cfg(any(feature = "native-tls", feature = "rustls"))] type HttpsPool = DashMap, Error>>; -/// Async-h1 based HTTP Client, with connecton pooling ("Keep-Alive"). +/// async-h1 based HTTP Client, with connection pooling ("Keep-Alive"). pub struct H1Client { http_pools: HttpPool, #[cfg(any(feature = "native-tls", feature = "rustls"))] https_pools: HttpsPool, - max_concurrent_connections: usize, config: Arc, } @@ -82,10 +76,6 @@ impl Debug for H1Client { .collect::>(), ) .field("https_pools", &https_pools) - .field( - "max_concurrent_connections", - &self.max_concurrent_connections, - ) .field("config", &self.config) .finish() } @@ -104,19 +94,29 @@ impl H1Client { http_pools: DashMap::new(), #[cfg(any(feature = "native-tls", feature = "rustls"))] https_pools: DashMap::new(), - max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS, config: Arc::new(Config::default()), } } /// Create a new instance. + #[deprecated( + since = "6.5.0", + note = "This function is misnamed. Prefer `Config::max_connections_per_host` instead." + )] pub fn with_max_connections(max: usize) -> Self { + #[cfg(features = "h1_client")] + assert!(max > 0, "max_connections_per_host with h1_client must be greater than zero or it will deadlock!"); + + let config = Config { + max_connections_per_host: max, + ..Default::default() + }; + Self { http_pools: DashMap::new(), #[cfg(any(feature = "native-tls", feature = "rustls"))] https_pools: DashMap::new(), - max_concurrent_connections: max, - config: Arc::new(Config::default()), + config: Arc::new(config), } } } @@ -157,7 +157,6 @@ impl HttpClient for H1Client { for (idx, addr) in addrs.into_iter().enumerate() { let has_another_addr = idx != max_addrs_idx; - #[cfg(feature = "unstable-config")] if !self.config.http_keep_alive { match scheme { "http" => { @@ -196,7 +195,7 @@ impl HttpClient for H1Client { let manager = TcpConnection::new(addr, self.config.clone()); let pool = Pool::::new( manager, - self.max_concurrent_connections, + self.config.max_connections_per_host, ); self.http_pools.insert(addr, pool); self.http_pools.get(&addr).unwrap() @@ -216,14 +215,11 @@ impl HttpClient for H1Client { req.set_local_addr(stream.local_addr().ok()); let tcp_conn = client::connect(TcpConnWrapper::new(stream), req); - #[cfg(feature = "unstable-config")] return if let Some(timeout) = self.config.timeout { async_std::future::timeout(timeout, tcp_conn).await? } else { tcp_conn.await }; - #[cfg(not(feature = "unstable-config"))] - return tcp_conn.await; } #[cfg(any(feature = "native-tls", feature = "rustls"))] "https" => { @@ -233,7 +229,7 @@ impl HttpClient for H1Client { let manager = TlsConnection::new(host.clone(), addr, self.config.clone()); let pool = Pool::, Error>::new( manager, - self.max_concurrent_connections, + self.config.max_connections_per_host, ); self.https_pools.insert(addr, pool); self.https_pools.get(&addr).unwrap() @@ -253,14 +249,11 @@ impl HttpClient for H1Client { req.set_local_addr(stream.get_ref().local_addr().ok()); let tls_conn = client::connect(TlsConnWrapper::new(stream), req); - #[cfg(feature = "unstable-config")] return if let Some(timeout) = self.config.timeout { async_std::future::timeout(timeout, tls_conn).await? } else { tls_conn.await }; - #[cfg(not(feature = "unstable-config"))] - return tls_conn.await; } _ => unreachable!(), } @@ -272,36 +265,35 @@ impl HttpClient for H1Client { )) } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] /// Override the existing configuration with new configuration. /// /// Config options may not impact existing connections. fn set_config(&mut self, config: Config) -> http_types::Result<()> { + #[cfg(features = "h1_client")] + assert!(config.max_connections_per_host > 0, "max_connections_per_host with h1_client must be greater than zero or it will deadlock!"); + self.config = Arc::new(config); Ok(()) } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] /// Get the current configuration. fn config(&self) -> &Config { &*self.config } } -#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] -#[cfg(feature = "unstable-config")] impl TryFrom for H1Client { type Error = Infallible; fn try_from(config: Config) -> Result { + #[cfg(features = "h1_client")] + assert!(config.max_connections_per_host > 0, "max_connections_per_host with h1_client must be greater than zero or it will deadlock!"); + Ok(Self { http_pools: DashMap::new(), #[cfg(any(feature = "native-tls", feature = "rustls"))] https_pools: DashMap::new(), - max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS, config: Arc::new(config), }) } diff --git a/src/h1/tcp.rs b/src/h1/tcp.rs index 6b855fd..2887bc0 100644 --- a/src/h1/tcp.rs +++ b/src/h1/tcp.rs @@ -65,7 +65,6 @@ impl Manager for TcpConnection { async fn create(&self) -> Result { let tcp_stream = TcpStream::connect(self.addr).await?; - #[cfg(feature = "unstable-config")] tcp_stream.set_nodelay(self.config.tcp_no_delay)?; Ok(tcp_stream) @@ -75,7 +74,6 @@ impl Manager for TcpConnection { let mut buf = [0; 4]; let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - #[cfg(feature = "unstable-config")] conn.set_nodelay(self.config.tcp_no_delay)?; match Pin::new(conn).poll_read(&mut cx, &mut buf) { diff --git a/src/h1/tls.rs b/src/h1/tls.rs index 796936c..7d3ecf4 100644 --- a/src/h1/tls.rs +++ b/src/h1/tls.rs @@ -74,7 +74,6 @@ impl Manager, Error> for TlsConnection { async fn create(&self) -> Result, Error> { let raw_stream = async_std::net::TcpStream::connect(self.addr).await?; - #[cfg(feature = "unstable-config")] raw_stream.set_nodelay(self.config.tcp_no_delay)?; let tls_stream = add_tls(&self.host, raw_stream, &self.config).await?; @@ -85,7 +84,6 @@ impl Manager, Error> for TlsConnection { let mut buf = [0; 4]; let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - #[cfg(feature = "unstable-config")] conn.get_ref() .set_nodelay(self.config.tcp_no_delay) .map_err(Error::from)?; @@ -108,14 +106,11 @@ cfg_if::cfg_if! { if #[cfg(feature = "rustls")] { #[allow(unused_variables)] pub(crate) async fn add_tls(host: &str, stream: TcpStream, config: &Config) -> Result, std::io::Error> { - #[cfg(all(feature = "h1_client", feature = "unstable-config"))] let connector = if let Some(tls_config) = config.tls_config.as_ref().cloned() { tls_config.into() } else { async_tls::TlsConnector::default() }; - #[cfg(not(feature = "unstable-config"))] - let connector = async_tls::TlsConnector::default(); connector.connect(host, stream).await } @@ -126,10 +121,7 @@ cfg_if::cfg_if! { stream: TcpStream, config: &Config, ) -> Result, async_native_tls::Error> { - #[cfg(feature = "unstable-config")] let connector = config.tls_config.as_ref().cloned().unwrap_or_default(); - #[cfg(not(feature = "unstable-config"))] - let connector = async_native_tls::TlsConnector::new(); connector.connect(host, stream).await } diff --git a/src/hyper.rs b/src/hyper.rs index c1404fc..05c329b 100644 --- a/src/hyper.rs +++ b/src/hyper.rs @@ -1,8 +1,6 @@ //! http-client implementation for reqwest -#[cfg(feature = "unstable-config")] -use std::convert::Infallible; -use std::convert::TryFrom; +use std::convert::{Infallible, TryFrom}; use std::fmt::Debug; use std::io; use std::str::FromStr; @@ -74,7 +72,6 @@ impl HttpClient for HyperClient { let req = HyperHttpRequest::try_from(req).await?.into_inner(); let conn_fut = self.client.dyn_request(req); - #[cfg(feature = "unstable-config")] let response = if let Some(timeout) = self.config.timeout { match tokio::time::timeout(timeout, conn_fut).await { Err(_elapsed) => Err(Error::from_str(400, "Client timed out")), @@ -85,15 +82,10 @@ impl HttpClient for HyperClient { conn_fut.await? }; - #[cfg(not(feature = "unstable-config"))] - let response = conn_fut.await?; - let res = HttpTypesResponse::try_from(response).await?.into_inner(); Ok(res) } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] /// Override the existing configuration with new configuration. /// /// Config options may not impact existing connections. @@ -111,16 +103,12 @@ impl HttpClient for HyperClient { Ok(()) } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] /// Get the current configuration. fn config(&self) -> &Config { &self.config } } -#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] -#[cfg(feature = "unstable-config")] impl TryFrom for HyperClient { type Error = Infallible; diff --git a/src/isahc.rs b/src/isahc.rs index 30afdc1..92b0f1a 100644 --- a/src/isahc.rs +++ b/src/isahc.rs @@ -1,10 +1,8 @@ //! http-client implementation for isahc -#[cfg(feature = "unstable-config")] use std::convert::TryFrom; use async_std::io::BufReader; -#[cfg(feature = "unstable-config")] use isahc::config::Configurable; use isahc::{http, ResponseExt}; @@ -75,13 +73,12 @@ impl HttpClient for IsahcClient { Ok(response) } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] /// Override the existing configuration with new configuration. /// /// Config options may not impact existing connections. fn set_config(&mut self, config: Config) -> http_types::Result<()> { - let mut builder = isahc::HttpClient::builder(); + let mut builder = + isahc::HttpClient::builder().max_connections_per_host(config.max_connections_per_host); if !config.http_keep_alive { builder = builder.connection_cache_size(0); @@ -99,16 +96,12 @@ impl HttpClient for IsahcClient { Ok(()) } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] /// Get the current configuration. fn config(&self) -> &Config { &self.config } } -#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] -#[cfg(feature = "unstable-config")] impl TryFrom for IsahcClient { type Error = isahc::Error; diff --git a/src/lib.rs b/src/lib.rs index 91ccfd5..ba4102f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,12 +14,8 @@ forbid(unsafe_code) )] -#[cfg(feature = "unstable-config")] mod config; -#[cfg(feature = "unstable-config")] pub use config::Config; -#[cfg(not(feature = "unstable-config"))] -type Config = (); #[cfg_attr(feature = "docs", doc(cfg(feature = "curl_client")))] #[cfg(all(feature = "curl_client", not(target_arch = "wasm32")))] @@ -68,7 +64,6 @@ pub trait HttpClient: std::fmt::Debug + Unpin + Send + Sync + 'static { /// Perform a request. async fn send(&self, req: Request) -> Result; - #[cfg(feature = "unstable-config")] /// Override the existing configuration with new configuration. /// /// Config options may not impact existing connections. @@ -79,7 +74,6 @@ pub trait HttpClient: std::fmt::Debug + Unpin + Send + Sync + 'static { ) } - #[cfg(feature = "unstable-config")] /// Get the current configuration. fn config(&self) -> &Config { unimplemented!( @@ -89,7 +83,6 @@ pub trait HttpClient: std::fmt::Debug + Unpin + Send + Sync + 'static { } } -#[cfg(feature = "unstable-config")] fn type_name_of(_val: &T) -> &'static str { std::any::type_name::() } @@ -106,14 +99,10 @@ impl HttpClient for Box { self.as_ref().send(req).await } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] fn set_config(&mut self, config: Config) -> http_types::Result<()> { self.as_mut().set_config(config) } - #[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))] - #[cfg(feature = "unstable-config")] fn config(&self) -> &Config { self.as_ref().config() } diff --git a/src/wasm.rs b/src/wasm.rs index cec9103..6ef78b2 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -1,23 +1,33 @@ //! http-client implementation for fetch -use super::{http_types::Headers, Body, Error, HttpClient, Request, Response}; +use std::convert::{Infallible, TryFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; use futures::prelude::*; -use std::convert::TryFrom; -use std::pin::Pin; -use std::task::{Context, Poll}; +use crate::Config; + +use super::{http_types::Headers, Body, Error, HttpClient, Request, Response}; /// WebAssembly HTTP Client. #[derive(Debug)] pub struct WasmClient { - _priv: (), + config: Config, } impl WasmClient { /// Create a new instance. pub fn new() -> Self { - Self { _priv: () } + Self { + config: Config::default(), + } + } +} + +impl Default for WasmClient { + fn default() -> Self { + Self::new() } } @@ -30,9 +40,16 @@ impl HttpClient for WasmClient { 'a: 'async_trait, Self: 'async_trait, { + let config = self.config.clone(); + InnerFuture::new(async move { let req: fetch::Request = fetch::Request::new(req).await?; - let mut res = req.send().await?; + let conn = req.send(); + let mut res = if let Some(timeout) = config.timeout { + async_std::future::timeout(timeout, conn).await?? + } else { + conn.await? + }; let body = res.body_bytes(); let mut response = @@ -46,6 +63,28 @@ impl HttpClient for WasmClient { Ok(response) }) } + + /// Override the existing configuration with new configuration. + /// + /// Config options may not impact existing connections. + fn set_config(&mut self, config: Config) -> http_types::Result<()> { + self.config = config; + + Ok(()) + } + + /// Get the current configuration. + fn config(&self) -> &Config { + &self.config + } +} + +impl TryFrom for WasmClient { + type Error = Infallible; + + fn try_from(config: Config) -> Result { + Ok(Self { config }) + } } struct InnerFuture {