Skip to content

Stabilize config #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]

[features]
default = ["h1_client", "native-tls"]
docs = ["h1_client", "curl_client", "wasm_client", "hyper_client", "unstable-config"]
docs = ["h1_client", "curl_client", "wasm_client", "hyper_client"]

h1_client = ["async-h1", "async-std", "deadpool", "futures"]
native_client = ["curl_client", "wasm_client"]
curl_client = ["isahc", "async-std"]
wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures"]
wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures", "async-std"]
hyper_client = ["hyper", "hyper-tls", "http-types/hyperium_http", "futures-util", "tokio"]

native-tls = ["async-native-tls"]
rustls = ["async-tls", "rustls_crate"]

unstable-config = []
unstable-config = [] # deprecated

[dependencies]
async-trait = "0.1.37"
Expand Down
26 changes: 24 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,36 @@ use std::fmt::Debug;
use std::time::Duration;

/// Configuration for `HttpClient`s.
#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))]
#[non_exhaustive]
#[derive(Clone)]
pub struct Config {
/// HTTP/1.1 `keep-alive` (connection pooling).
///
/// Default: `true`.
///
/// Note: Does nothing on `wasm_client`.
pub http_keep_alive: bool,
/// TCP `NO_DELAY`.
///
/// Default: `false`.
///
/// Note: Does nothing on `wasm_client`.
pub tcp_no_delay: bool,
/// Connection timeout duration.
///
/// Default: `Some(Duration::from_secs(60))`.
pub timeout: Option<Duration>,
/// Maximum number of simultaneous connections that this client is allowed to keep open to individual hosts at one time.
///
/// Default: `50`.
/// This number is based on a few random benchmarks and see whatever gave decent perf vs resource use in Orogene.
///
/// Note: The behavior of this is different depending on the backend in use.
/// - `h1_client`: `0` is disallowed and asserts as otherwise it would cause a semaphore deadlock.
/// - `curl_client`: `0` allows for limitless connections per host.
/// - `hyper_client`: No effect. Hyper does not support such an option.
/// - `wasm_client`: No effect. Web browsers do not support such an option.
pub max_connections_per_host: usize,
/// TLS Configuration (Rustls)
#[cfg_attr(feature = "docs", doc(cfg(feature = "h1_client")))]
#[cfg(all(feature = "h1_client", feature = "rustls"))]
Expand All @@ -36,7 +50,8 @@ impl Debug for Config {
dbg_struct
.field("http_keep_alive", &self.http_keep_alive)
.field("tcp_no_delay", &self.tcp_no_delay)
.field("timeout", &self.timeout);
.field("timeout", &self.timeout)
.field("max_connections_per_host", &self.max_connections_per_host);

#[cfg(all(feature = "h1_client", feature = "rustls"))]
{
Expand All @@ -62,6 +77,7 @@ impl Config {
http_keep_alive: true,
tcp_no_delay: false,
timeout: Some(Duration::from_secs(60)),
max_connections_per_host: 50,
#[cfg(all(feature = "h1_client", any(feature = "rustls", feature = "native-tls")))]
tls_config: None,
}
Expand Down Expand Up @@ -93,6 +109,12 @@ impl Config {
self
}

/// Set the maximum number of simultaneous connections that this client is allowed to keep open to individual hosts at one time.
pub fn max_connections_per_host(mut self, max_connections_per_host: usize) -> Self {
self.max_connections_per_host = max_connections_per_host;
self
}

/// Set TLS Configuration (Rustls)
#[cfg_attr(feature = "docs", doc(cfg(feature = "h1_client")))]
#[cfg(all(feature = "h1_client", feature = "rustls"))]
Expand Down
52 changes: 22 additions & 30 deletions src/h1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! http-client implementation for async-h1, with connection pooling ("Keep-Alive").

#[cfg(feature = "unstable-config")]
use std::convert::{Infallible, TryFrom};

use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -33,19 +31,15 @@ use tcp::{TcpConnWrapper, TcpConnection};
#[cfg(any(feature = "native-tls", feature = "rustls"))]
use tls::{TlsConnWrapper, TlsConnection};

// This number is based on a few random benchmarks and see whatever gave decent perf vs resource use.
const DEFAULT_MAX_CONCURRENT_CONNECTIONS: usize = 50;

type HttpPool = DashMap<SocketAddr, Pool<TcpStream, std::io::Error>>;
#[cfg(any(feature = "native-tls", feature = "rustls"))]
type HttpsPool = DashMap<SocketAddr, Pool<TlsStream<TcpStream>, Error>>;

/// Async-h1 based HTTP Client, with connecton pooling ("Keep-Alive").
/// async-h1 based HTTP Client, with connection pooling ("Keep-Alive").
pub struct H1Client {
http_pools: HttpPool,
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: HttpsPool,
max_concurrent_connections: usize,
config: Arc<Config>,
}

Expand Down Expand Up @@ -82,10 +76,6 @@ impl Debug for H1Client {
.collect::<Vec<String>>(),
)
.field("https_pools", &https_pools)
.field(
"max_concurrent_connections",
&self.max_concurrent_connections,
)
.field("config", &self.config)
.finish()
}
Expand All @@ -104,19 +94,29 @@ impl H1Client {
http_pools: DashMap::new(),
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: DashMap::new(),
max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS,
config: Arc::new(Config::default()),
}
}

/// Create a new instance.
#[deprecated(
since = "6.5.0",
note = "This function is misnamed. Prefer `Config::max_connections_per_host` instead."
)]
pub fn with_max_connections(max: usize) -> Self {
#[cfg(features = "h1_client")]
assert!(max > 0, "max_connections_per_host with h1_client must be greater than zero or it will deadlock!");

let config = Config {
max_connections_per_host: max,
..Default::default()
};

Self {
http_pools: DashMap::new(),
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: DashMap::new(),
max_concurrent_connections: max,
config: Arc::new(Config::default()),
config: Arc::new(config),
}
}
}
Expand Down Expand Up @@ -157,7 +157,6 @@ impl HttpClient for H1Client {
for (idx, addr) in addrs.into_iter().enumerate() {
let has_another_addr = idx != max_addrs_idx;

#[cfg(feature = "unstable-config")]
if !self.config.http_keep_alive {
match scheme {
"http" => {
Expand Down Expand Up @@ -196,7 +195,7 @@ impl HttpClient for H1Client {
let manager = TcpConnection::new(addr, self.config.clone());
let pool = Pool::<TcpStream, std::io::Error>::new(
manager,
self.max_concurrent_connections,
self.config.max_connections_per_host,
);
self.http_pools.insert(addr, pool);
self.http_pools.get(&addr).unwrap()
Expand All @@ -216,14 +215,11 @@ impl HttpClient for H1Client {
req.set_local_addr(stream.local_addr().ok());

let tcp_conn = client::connect(TcpConnWrapper::new(stream), req);
#[cfg(feature = "unstable-config")]
return if let Some(timeout) = self.config.timeout {
async_std::future::timeout(timeout, tcp_conn).await?
} else {
tcp_conn.await
};
#[cfg(not(feature = "unstable-config"))]
return tcp_conn.await;
}
#[cfg(any(feature = "native-tls", feature = "rustls"))]
"https" => {
Expand All @@ -233,7 +229,7 @@ impl HttpClient for H1Client {
let manager = TlsConnection::new(host.clone(), addr, self.config.clone());
let pool = Pool::<TlsStream<TcpStream>, Error>::new(
manager,
self.max_concurrent_connections,
self.config.max_connections_per_host,
);
self.https_pools.insert(addr, pool);
self.https_pools.get(&addr).unwrap()
Expand All @@ -253,14 +249,11 @@ impl HttpClient for H1Client {
req.set_local_addr(stream.get_ref().local_addr().ok());

let tls_conn = client::connect(TlsConnWrapper::new(stream), req);
#[cfg(feature = "unstable-config")]
return if let Some(timeout) = self.config.timeout {
async_std::future::timeout(timeout, tls_conn).await?
} else {
tls_conn.await
};
#[cfg(not(feature = "unstable-config"))]
return tls_conn.await;
}
_ => unreachable!(),
}
Expand All @@ -272,36 +265,35 @@ impl HttpClient for H1Client {
))
}

#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))]
#[cfg(feature = "unstable-config")]
/// Override the existing configuration with new configuration.
///
/// Config options may not impact existing connections.
fn set_config(&mut self, config: Config) -> http_types::Result<()> {
#[cfg(features = "h1_client")]
assert!(config.max_connections_per_host > 0, "max_connections_per_host with h1_client must be greater than zero or it will deadlock!");

self.config = Arc::new(config);

Ok(())
}

#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))]
#[cfg(feature = "unstable-config")]
/// Get the current configuration.
fn config(&self) -> &Config {
&*self.config
}
}

#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))]
#[cfg(feature = "unstable-config")]
impl TryFrom<Config> for H1Client {
type Error = Infallible;

fn try_from(config: Config) -> Result<Self, Self::Error> {
#[cfg(features = "h1_client")]
assert!(config.max_connections_per_host > 0, "max_connections_per_host with h1_client must be greater than zero or it will deadlock!");

Ok(Self {
http_pools: DashMap::new(),
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: DashMap::new(),
max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS,
config: Arc::new(config),
})
}
Expand Down
2 changes: 0 additions & 2 deletions src/h1/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl Manager<TcpStream, std::io::Error> for TcpConnection {
async fn create(&self) -> Result<TcpStream, std::io::Error> {
let tcp_stream = TcpStream::connect(self.addr).await?;

#[cfg(feature = "unstable-config")]
tcp_stream.set_nodelay(self.config.tcp_no_delay)?;

Ok(tcp_stream)
Expand All @@ -75,7 +74,6 @@ impl Manager<TcpStream, std::io::Error> for TcpConnection {
let mut buf = [0; 4];
let mut cx = Context::from_waker(futures::task::noop_waker_ref());

#[cfg(feature = "unstable-config")]
conn.set_nodelay(self.config.tcp_no_delay)?;

match Pin::new(conn).poll_read(&mut cx, &mut buf) {
Expand Down
8 changes: 0 additions & 8 deletions src/h1/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl Manager<TlsStream<TcpStream>, Error> for TlsConnection {
async fn create(&self) -> Result<TlsStream<TcpStream>, Error> {
let raw_stream = async_std::net::TcpStream::connect(self.addr).await?;

#[cfg(feature = "unstable-config")]
raw_stream.set_nodelay(self.config.tcp_no_delay)?;

let tls_stream = add_tls(&self.host, raw_stream, &self.config).await?;
Expand All @@ -85,7 +84,6 @@ impl Manager<TlsStream<TcpStream>, Error> for TlsConnection {
let mut buf = [0; 4];
let mut cx = Context::from_waker(futures::task::noop_waker_ref());

#[cfg(feature = "unstable-config")]
conn.get_ref()
.set_nodelay(self.config.tcp_no_delay)
.map_err(Error::from)?;
Expand All @@ -108,14 +106,11 @@ cfg_if::cfg_if! {
if #[cfg(feature = "rustls")] {
#[allow(unused_variables)]
pub(crate) async fn add_tls(host: &str, stream: TcpStream, config: &Config) -> Result<TlsStream<TcpStream>, std::io::Error> {
#[cfg(all(feature = "h1_client", feature = "unstable-config"))]
let connector = if let Some(tls_config) = config.tls_config.as_ref().cloned() {
tls_config.into()
} else {
async_tls::TlsConnector::default()
};
#[cfg(not(feature = "unstable-config"))]
let connector = async_tls::TlsConnector::default();

connector.connect(host, stream).await
}
Expand All @@ -126,10 +121,7 @@ cfg_if::cfg_if! {
stream: TcpStream,
config: &Config,
) -> Result<TlsStream<TcpStream>, async_native_tls::Error> {
#[cfg(feature = "unstable-config")]
let connector = config.tls_config.as_ref().cloned().unwrap_or_default();
#[cfg(not(feature = "unstable-config"))]
let connector = async_native_tls::TlsConnector::new();

connector.connect(host, stream).await
}
Expand Down
14 changes: 1 addition & 13 deletions src/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! http-client implementation for reqwest

#[cfg(feature = "unstable-config")]
use std::convert::Infallible;
use std::convert::TryFrom;
use std::convert::{Infallible, TryFrom};
use std::fmt::Debug;
use std::io;
use std::str::FromStr;
Expand Down Expand Up @@ -74,7 +72,6 @@ impl HttpClient for HyperClient {
let req = HyperHttpRequest::try_from(req).await?.into_inner();

let conn_fut = self.client.dyn_request(req);
#[cfg(feature = "unstable-config")]
let response = if let Some(timeout) = self.config.timeout {
match tokio::time::timeout(timeout, conn_fut).await {
Err(_elapsed) => Err(Error::from_str(400, "Client timed out")),
Expand All @@ -85,15 +82,10 @@ impl HttpClient for HyperClient {
conn_fut.await?
};

#[cfg(not(feature = "unstable-config"))]
let response = conn_fut.await?;

let res = HttpTypesResponse::try_from(response).await?.into_inner();
Ok(res)
}

#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))]
#[cfg(feature = "unstable-config")]
/// Override the existing configuration with new configuration.
///
/// Config options may not impact existing connections.
Expand All @@ -111,16 +103,12 @@ impl HttpClient for HyperClient {
Ok(())
}

#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))]
#[cfg(feature = "unstable-config")]
/// Get the current configuration.
fn config(&self) -> &Config {
&self.config
}
}

#[cfg_attr(feature = "docs", doc(cfg(feature = "unstable-config")))]
#[cfg(feature = "unstable-config")]
impl TryFrom<Config> for HyperClient {
type Error = Infallible;

Expand Down
Loading