Skip to content

Commit 6508f13

Browse files
committed
feat: unstable HttpClient Config
This adds an `unstable-config` feature, with a new `Config` struct, which can be used to configure any `HttpClient` which implements support for it. Currently it supports two features - the most important and most generally supported: - `timeout` (`Duration`) - `no_delay` (`bool`) Implementations are provided for async-h1, isahc, and hyper (partial, no `no_delay` support due to the tls connector). No serious attempt has been made to add this to the wasm client at this point, since I don't understand well how to even build the wasm client or if it even works anymore with the state of rust wasm web build tools.
1 parent 6ba80b3 commit 6508f13

File tree

8 files changed

+322
-27
lines changed

8 files changed

+322
-27
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ h1_client = ["async-h1", "async-std", "deadpool", "futures"]
2727
native_client = ["curl_client", "wasm_client"]
2828
curl_client = ["isahc", "async-std"]
2929
wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures"]
30-
hyper_client = ["hyper", "hyper-tls", "http-types/hyperium_http", "futures-util"]
30+
hyper_client = ["hyper", "hyper-tls", "http-types/hyperium_http", "futures-util", "tokio"]
3131

3232
native-tls = ["async-native-tls"]
3333
rustls = ["async-tls"]
3434

35+
unstable-config = []
36+
3537
[dependencies]
3638
async-trait = "0.1.37"
3739
dashmap = "4.0.2"
@@ -53,6 +55,7 @@ async-tls = { version = "0.10.0", optional = true }
5355
hyper = { version = "0.13.6", features = ["tcp"], optional = true }
5456
hyper-tls = { version = "0.4.3", optional = true }
5557
futures-util = { version = "0.3.5", features = ["io"], optional = true }
58+
tokio = { version = "0.2", features = ["time"], optional = true }
5659

5760
# curl_client
5861
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]

src/config.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//! Configuration for `HttpClient`s.
2+
3+
use std::time::Duration;
4+
5+
/// Configuration for `HttpClient`s.
6+
#[non_exhaustive]
7+
#[derive(Clone, Debug)]
8+
pub struct Config {
9+
/// TCP `NO_DELAY`.
10+
///
11+
/// Default: `false`.
12+
pub no_delay: bool,
13+
/// Connection timeout duration.
14+
///
15+
/// Default: `Some(Duration::from_secs(60))`.
16+
pub timeout: Option<Duration>,
17+
}
18+
19+
impl Config {
20+
/// Construct new empty config.
21+
pub fn new() -> Self {
22+
Self {
23+
no_delay: false,
24+
timeout: Some(Duration::from_secs(60)),
25+
}
26+
}
27+
}
28+
29+
impl Default for Config {
30+
fn default() -> Self {
31+
Self::new()
32+
}
33+
}
34+
35+
impl Config {
36+
/// Set TCP `NO_DELAY`.
37+
pub fn set_no_delay(mut self, no_delay: bool) -> Self {
38+
self.no_delay = no_delay;
39+
self
40+
}
41+
42+
/// Set connection timeout duration.
43+
pub fn set_timeout(mut self, timeout: Option<Duration>) -> Self {
44+
self.timeout = timeout;
45+
self
46+
}
47+
}

src/h1/mod.rs

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
//! http-client implementation for async-h1, with connecton pooling ("Keep-Alive").
22
3+
#[cfg(feature = "unstable-config")]
4+
use std::convert::{Infallible, TryFrom};
5+
36
use std::fmt::Debug;
47
use std::net::SocketAddr;
58

@@ -17,6 +20,8 @@ cfg_if::cfg_if! {
1720
}
1821
}
1922

23+
use crate::Config;
24+
2025
use super::{async_trait, Error, HttpClient, Request, Response};
2126

2227
mod tcp;
@@ -40,6 +45,7 @@ pub struct H1Client {
4045
#[cfg(any(feature = "native-tls", feature = "rustls"))]
4146
https_pools: HttpsPool,
4247
max_concurrent_connections: usize,
48+
config: Config,
4349
}
4450

4551
impl Debug for H1Client {
@@ -75,6 +81,7 @@ impl Debug for H1Client {
7581
.collect::<Vec<String>>(),
7682
)
7783
.field("https_pools", &https_pools)
84+
.field("config", &self.config)
7885
.field(
7986
"max_concurrent_connections",
8087
&self.max_concurrent_connections,
@@ -97,6 +104,7 @@ impl H1Client {
97104
#[cfg(any(feature = "native-tls", feature = "rustls"))]
98105
https_pools: DashMap::new(),
99106
max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS,
107+
config: Config::default(),
100108
}
101109
}
102110

@@ -107,6 +115,7 @@ impl H1Client {
107115
#[cfg(any(feature = "native-tls", feature = "rustls"))]
108116
https_pools: DashMap::new(),
109117
max_concurrent_connections: max,
118+
config: Config::default(),
110119
}
111120
}
112121
}
@@ -152,7 +161,7 @@ impl HttpClient for H1Client {
152161
let pool_ref = if let Some(pool_ref) = self.http_pools.get(&addr) {
153162
pool_ref
154163
} else {
155-
let manager = TcpConnection::new(addr);
164+
let manager = TcpConnection::new(addr, self.config.clone());
156165
let pool = Pool::<TcpStream, std::io::Error>::new(
157166
manager,
158167
self.max_concurrent_connections,
@@ -168,19 +177,28 @@ impl HttpClient for H1Client {
168177
let stream = match pool.get().await {
169178
Ok(s) => s,
170179
Err(_) if has_another_addr => continue,
171-
Err(e) => return Err(Error::from_str(400, e.to_string()))?,
180+
Err(e) => return Err(Error::from_str(400, e.to_string())),
172181
};
173182

174183
req.set_peer_addr(stream.peer_addr().ok());
175184
req.set_local_addr(stream.local_addr().ok());
176-
return client::connect(TcpConnWrapper::new(stream), req).await;
185+
186+
let tcp_conn = client::connect(TcpConnWrapper::new(stream), req);
187+
#[cfg(feature = "unstable-config")]
188+
return if let Some(timeout) = self.config.timeout {
189+
async_std::future::timeout(timeout, tcp_conn).await?
190+
} else {
191+
tcp_conn.await
192+
};
193+
#[cfg(not(feature = "unstable-config"))]
194+
return tcp_conn.await;
177195
}
178196
#[cfg(any(feature = "native-tls", feature = "rustls"))]
179197
"https" => {
180198
let pool_ref = if let Some(pool_ref) = self.https_pools.get(&addr) {
181199
pool_ref
182200
} else {
183-
let manager = TlsConnection::new(host.clone(), addr);
201+
let manager = TlsConnection::new(host.clone(), addr, self.config.clone());
184202
let pool = Pool::<TlsStream<TcpStream>, Error>::new(
185203
manager,
186204
self.max_concurrent_connections,
@@ -196,13 +214,21 @@ impl HttpClient for H1Client {
196214
let stream = match pool.get().await {
197215
Ok(s) => s,
198216
Err(_) if has_another_addr => continue,
199-
Err(e) => return Err(Error::from_str(400, e.to_string()))?,
217+
Err(e) => return Err(Error::from_str(400, e.to_string())),
200218
};
201219

202220
req.set_peer_addr(stream.get_ref().peer_addr().ok());
203221
req.set_local_addr(stream.get_ref().local_addr().ok());
204222

205-
return client::connect(TlsConnWrapper::new(stream), req).await;
223+
let tls_conn = client::connect(TlsConnWrapper::new(stream), req);
224+
#[cfg(feature = "unstable-config")]
225+
return if let Some(timeout) = self.config.timeout {
226+
async_std::future::timeout(timeout, tls_conn).await?
227+
} else {
228+
tls_conn.await
229+
};
230+
#[cfg(not(feature = "unstable-config"))]
231+
return tls_conn.await;
206232
}
207233
_ => unreachable!(),
208234
}
@@ -213,6 +239,37 @@ impl HttpClient for H1Client {
213239
"missing valid address",
214240
))
215241
}
242+
243+
#[cfg(feature = "unstable-config")]
244+
/// Override the existing configuration with new configuration.
245+
///
246+
/// Config options may not impact existing connections.
247+
fn set_config(&mut self, config: Config) -> http_types::Result<()> {
248+
self.config = config;
249+
250+
Ok(())
251+
}
252+
253+
#[cfg(feature = "unstable-config")]
254+
/// Get the current configuration.
255+
fn config(&self) -> &Config {
256+
&self.config
257+
}
258+
}
259+
260+
#[cfg(feature = "unstable-config")]
261+
impl TryFrom<Config> for H1Client {
262+
type Error = Infallible;
263+
264+
fn try_from(config: Config) -> Result<Self, Self::Error> {
265+
Ok(Self {
266+
http_pools: DashMap::new(),
267+
#[cfg(any(feature = "native-tls", feature = "rustls"))]
268+
https_pools: DashMap::new(),
269+
max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS,
270+
config,
271+
})
272+
}
216273
}
217274

218275
#[cfg(test)]

src/h1/tcp.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@ use deadpool::managed::{Manager, Object, RecycleResult};
88
use futures::io::{AsyncRead, AsyncWrite};
99
use futures::task::{Context, Poll};
1010

11+
use crate::Config;
12+
1113
#[derive(Clone, Debug)]
1214
pub(crate) struct TcpConnection {
1315
addr: SocketAddr,
16+
config: Config,
1417
}
18+
1519
impl TcpConnection {
16-
pub(crate) fn new(addr: SocketAddr) -> Self {
17-
Self { addr }
20+
pub(crate) fn new(addr: SocketAddr, config: Config) -> Self {
21+
Self { addr, config }
1822
}
1923
}
2024

@@ -58,12 +62,21 @@ impl AsyncWrite for TcpConnWrapper {
5862
#[async_trait]
5963
impl Manager<TcpStream, std::io::Error> for TcpConnection {
6064
async fn create(&self) -> Result<TcpStream, std::io::Error> {
61-
TcpStream::connect(self.addr).await
65+
let tcp_stream = TcpStream::connect(self.addr).await?;
66+
67+
#[cfg(feature = "unstable-config")]
68+
tcp_stream.set_nodelay(self.config.no_delay)?;
69+
70+
Ok(tcp_stream)
6271
}
6372

6473
async fn recycle(&self, conn: &mut TcpStream) -> RecycleResult<std::io::Error> {
6574
let mut buf = [0; 4];
6675
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
76+
77+
#[cfg(feature = "unstable-config")]
78+
conn.set_nodelay(self.config.no_delay)?;
79+
6780
match Pin::new(conn).poll_read(&mut cx, &mut buf) {
6881
Poll::Ready(Err(error)) => Err(error),
6982
Poll::Ready(Ok(bytes)) if bytes == 0 => Err(std::io::Error::new(

src/h1/tls.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ cfg_if::cfg_if! {
1616
}
1717
}
1818

19-
use crate::Error;
19+
use crate::{Config, Error};
2020

2121
#[derive(Clone, Debug)]
2222
pub(crate) struct TlsConnection {
2323
host: String,
2424
addr: SocketAddr,
25+
config: Config,
2526
}
27+
2628
impl TlsConnection {
27-
pub(crate) fn new(host: String, addr: SocketAddr) -> Self {
28-
Self { host, addr }
29+
pub(crate) fn new(host: String, addr: SocketAddr, config: Config) -> Self {
30+
Self { host, addr, config }
2931
}
3032
}
3133

@@ -70,13 +72,23 @@ impl AsyncWrite for TlsConnWrapper {
7072
impl Manager<TlsStream<TcpStream>, Error> for TlsConnection {
7173
async fn create(&self) -> Result<TlsStream<TcpStream>, Error> {
7274
let raw_stream = async_std::net::TcpStream::connect(self.addr).await?;
75+
76+
#[cfg(feature = "unstable-config")]
77+
raw_stream.set_nodelay(self.config.no_delay)?;
78+
7379
let tls_stream = add_tls(&self.host, raw_stream).await?;
7480
Ok(tls_stream)
7581
}
7682

7783
async fn recycle(&self, conn: &mut TlsStream<TcpStream>) -> RecycleResult<Error> {
7884
let mut buf = [0; 4];
7985
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
86+
87+
#[cfg(feature = "unstable-config")]
88+
conn.get_ref()
89+
.set_nodelay(self.config.no_delay)
90+
.map_err(Error::from)?;
91+
8092
match Pin::new(conn).poll_read(&mut cx, &mut buf) {
8193
Poll::Ready(Err(error)) => Err(error),
8294
Poll::Ready(Ok(bytes)) if bytes == 0 => Err(std::io::Error::new(
@@ -86,6 +98,7 @@ impl Manager<TlsStream<TcpStream>, Error> for TlsConnection {
8698
_ => Ok(()),
8799
}
88100
.map_err(Error::from)?;
101+
89102
Ok(())
90103
}
91104
}

0 commit comments

Comments
 (0)