diff --git a/examples/client.rs b/examples/client.rs index 774b3cd47d..dc974e3b01 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -39,7 +39,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> { let addr = format!("{}:{}", host, port); let stream = TcpStream::connect(addr).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); diff --git a/examples/client_json.rs b/examples/client_json.rs index 31337283ba..5f53a5a978 100644 --- a/examples/client_json.rs +++ b/examples/client_json.rs @@ -29,7 +29,7 @@ async fn fetch_json(url: hyper::Uri) -> Result> { let stream = TcpStream::connect(addr).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); diff --git a/examples/gateway.rs b/examples/gateway.rs index 373b36ecf8..22de700eea 100644 --- a/examples/gateway.rs +++ b/examples/gateway.rs @@ -43,7 +43,8 @@ async fn main() -> Result<(), Box> { async move { let client_stream = TcpStream::connect(addr).await.unwrap(); - let (mut sender, conn) = hyper::client::conn::handshake(client_stream).await?; + let (mut sender, conn) = + hyper::client::conn::http1::handshake(client_stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); diff --git a/examples/http_proxy.rs b/examples/http_proxy.rs index b072bbe34c..0173ccb634 100644 --- a/examples/http_proxy.rs +++ b/examples/http_proxy.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; -use hyper::client::conn::Builder; +use hyper::client::conn::http1::Builder; use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::upgrade::Upgraded; diff --git a/examples/upgrades.rs b/examples/upgrades.rs index de78eea76b..f664bcc487 100644 --- a/examples/upgrades.rs +++ b/examples/upgrades.rs @@ -95,7 +95,7 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> { .unwrap(); let stream = TcpStream::connect(addr).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { diff --git a/examples/web_api.rs b/examples/web_api.rs index 7db23681c2..13d9dbd512 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -29,7 +29,7 @@ async fn client_request_response() -> Result> { let port = req.uri().port_u16().expect("uri has no port"); let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index 7cf246c5b3..7303589180 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -4,6 +4,7 @@ use std::error::Error as StdError; use std::fmt; use std::sync::Arc; +use bytes::Bytes; use http::{Request, Response}; use httparse::ParserConfig; use tokio::io::{AsyncRead, AsyncWrite}; @@ -27,6 +28,27 @@ pub struct SendRequest { dispatch: dispatch::Sender, Response>, } +/// Deconstructed parts of a `Connection`. +/// +/// This allows taking apart a `Connection` at a later time, in order to +/// reclaim the IO object, and additional related pieces. +#[derive(Debug)] +pub struct Parts { + /// The original IO object used in the handshake. + pub io: T, + /// A buffer of bytes that have been read but not processed as HTTP. + /// + /// For instance, if the `Connection` is used for an HTTP upgrade request, + /// it is possible the server sent back the first bytes of the new protocol + /// along with the response upgrade. + /// + /// You will want to check for any existing bytes if you plan to continue + /// communicating on the IO object. + pub read_buf: Bytes, + _inner: (), +} + + /// A future that processes all HTTP state for the IO object. /// /// In most cases, this should just be spawned into an executor, so that it @@ -40,6 +62,41 @@ where inner: Option>, } +impl Connection +where + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, + B: HttpBody + 'static, + B::Error: Into>, +{ + /// Return the inner IO object, and additional information. + /// + /// Only works for HTTP/1 connections. HTTP/2 connections will panic. + pub fn into_parts(self) -> Parts { + + let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner(); + Parts { + io, + read_buf, + _inner: (), + } + } + + /// Poll the connection for completion, but without calling `shutdown` + /// on the underlying IO. + /// + /// This is useful to allow running a connection while doing an HTTP + /// upgrade. Once the upgrade is completed, the connection would be "done", + /// but it is not desired to actually shutdown the IO object. Instead you + /// would take it back using `into_parts`. + /// + /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) + /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) + /// to work with this function; or use the `without_shutdown` wrapper. + pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.inner.as_mut().expect("algready upgraded").poll_without_shutdown(cx) + } +} + /// A builder to configure an HTTP connection. /// /// After setting options, the builder is used to create a handshake future. @@ -80,6 +137,13 @@ impl SendRequest { self.dispatch.poll_ready(cx) } + /// Waits until the dispatcher is ready + /// + /// If the associated connection is closed, this returns an Error. + pub async fn ready(&mut self) -> crate::Result<()> { + futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + } + /* pub(super) async fn when_ready(self) -> crate::Result { let mut me = Some(self); @@ -125,7 +189,7 @@ where /// /// ``` /// # use http::header::HOST; - /// # use hyper::client::conn::SendRequest; + /// # use hyper::client::conn::http1::SendRequest; /// # use hyper::Body; /// use hyper::Request; /// diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 3be24ed080..1565b50d28 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -74,6 +74,13 @@ impl SendRequest { } } + /// Waits until the dispatcher is ready + /// + /// If the associated connection is closed, this returns an Error. + pub async fn ready(&mut self) -> crate::Result<()> { + futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + } + /* pub(super) async fn when_ready(self) -> crate::Result { let mut me = Some(self); @@ -119,7 +126,7 @@ where /// /// ``` /// # use http::header::HOST; - /// # use hyper::client::conn::SendRequest; + /// # use hyper::client::conn::http2::SendRequest; /// # use hyper::Body; /// use hyper::Request; /// diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index 7880c65a95..52da8a1fa3 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -22,7 +22,7 @@ //! async fn main() -> Result<(), Box> { //! let target_stream = TcpStream::connect("example.com:80").await?; //! -//! let (mut request_sender, connection) = conn::handshake(target_stream).await?; +//! let (mut request_sender, connection) = conn::http1::handshake(target_stream).await?; //! //! // spawn a task to poll the connection and drive the HTTP state //! tokio::spawn(async move { @@ -54,926 +54,9 @@ //! # } //! ``` -use std::error::Error as StdError; -use std::fmt; -#[cfg(not(all(feature = "http1", feature = "http2")))] -use std::marker::PhantomData; -use std::sync::Arc; -#[cfg(all(feature = "runtime", feature = "http2"))] -use std::time::Duration; - -use bytes::Bytes; -use futures_util::future; -use httparse::ParserConfig; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tower_service::Service; -use tracing::{debug, trace}; - -use super::dispatch; -use crate::body::HttpBody; -#[cfg(not(all(feature = "http1", feature = "http2")))] -use crate::common::Never; -use crate::common::{ - exec::{BoxSendFuture, Exec}, - task, Future, Pin, Poll, -}; -use crate::proto; -use crate::rt::Executor; -#[cfg(feature = "http1")] -use crate::upgrade::Upgraded; -use crate::{Body, Request, Response}; - #[cfg(feature = "http1")] pub mod http1; #[cfg(feature = "http2")] pub mod http2; -#[cfg(feature = "http1")] -type Http1Dispatcher = - proto::dispatch::Dispatcher, B, T, proto::h1::ClientTransaction>; - -#[cfg(not(feature = "http1"))] -type Http1Dispatcher = (Never, PhantomData<(T, Pin>)>); - -#[cfg(feature = "http2")] -type Http2ClientTask = proto::h2::ClientTask; - -#[cfg(not(feature = "http2"))] -type Http2ClientTask = (Never, PhantomData>>); - -pin_project! { - #[project = ProtoClientProj] - enum ProtoClient - where - B: HttpBody, - { - H1 { - #[pin] - h1: Http1Dispatcher, - }, - H2 { - #[pin] - h2: Http2ClientTask, - }, - } -} - -/// Returns a handshake future over some IO. -/// -/// This is a shortcut for `Builder::new().handshake(io)`. -/// See [`client::conn`](crate::client::conn) for more. -pub async fn handshake( - io: T, -) -> crate::Result<(SendRequest, Connection)> -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - Builder::new().handshake(io).await -} - -/// The sender side of an established connection. -pub struct SendRequest { - dispatch: dispatch::Sender, Response>, -} - -/// A future that processes all HTTP state for the IO object. -/// -/// In most cases, this should just be spawned into an executor, so that it -/// can process incoming and outgoing messages, notice hangups, and the like. -#[must_use = "futures do nothing unless polled"] -pub struct Connection -where - T: AsyncRead + AsyncWrite + Send + 'static, - B: HttpBody + 'static, -{ - inner: Option>, -} - -/// A builder to configure an HTTP connection. -/// -/// After setting options, the builder is used to create a handshake future. -#[derive(Clone, Debug)] -pub struct Builder { - pub(super) exec: Exec, - h09_responses: bool, - h1_parser_config: ParserConfig, - h1_writev: Option, - h1_title_case_headers: bool, - h1_preserve_header_case: bool, - #[cfg(feature = "ffi")] - h1_preserve_header_order: bool, - h1_read_buf_exact_size: Option, - h1_max_buf_size: Option, - #[cfg(feature = "ffi")] - h1_headers_raw: bool, - #[cfg(feature = "http2")] - h2_builder: proto::h2::client::Config, - version: Proto, -} - -#[derive(Clone, Debug)] -enum Proto { - #[cfg(feature = "http1")] - Http1, - #[cfg(feature = "http2")] - Http2, -} - -/// A future returned by `SendRequest::send_request`. -/// -/// Yields a `Response` if successful. -#[must_use = "futures do nothing unless polled"] -pub struct ResponseFuture { - inner: ResponseFutureState, -} - -enum ResponseFutureState { - Waiting(dispatch::Promise>), - // Option is to be able to `take()` it in `poll` - Error(Option), -} - -/// Deconstructed parts of a `Connection`. -/// -/// This allows taking apart a `Connection` at a later time, in order to -/// reclaim the IO object, and additional related pieces. -#[derive(Debug)] -pub struct Parts { - /// The original IO object used in the handshake. - pub io: T, - /// A buffer of bytes that have been read but not processed as HTTP. - /// - /// For instance, if the `Connection` is used for an HTTP upgrade request, - /// it is possible the server sent back the first bytes of the new protocol - /// along with the response upgrade. - /// - /// You will want to check for any existing bytes if you plan to continue - /// communicating on the IO object. - pub read_buf: Bytes, - _inner: (), -} - -// ===== impl SendRequest - -impl SendRequest { - /// Polls to determine whether this sender can be used yet for a request. - /// - /// If the associated connection is closed, this returns an Error. - pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.dispatch.poll_ready(cx) - } -} - -impl SendRequest -where - B: HttpBody + 'static, -{ - /// Sends a `Request` on the associated connection. - /// - /// Returns a future that if successful, yields the `Response`. - /// - /// # Note - /// - /// There are some key differences in what automatic things the `Client` - /// does for you that will not be done here: - /// - /// - `Client` requires absolute-form `Uri`s, since the scheme and - /// authority are needed to connect. They aren't required here. - /// - Since the `Client` requires absolute-form `Uri`s, it can add - /// the `Host` header based on it. You must add a `Host` header yourself - /// before calling this method. - /// - Since absolute-form `Uri`s are not required, if received, they will - /// be serialized as-is. - /// - /// # Example - /// - /// ``` - /// # use http::header::HOST; - /// # use hyper::client::conn::SendRequest; - /// # use hyper::Body; - /// use hyper::Request; - /// - /// # async fn doc(mut tx: SendRequest) -> hyper::Result<()> { - /// // build a Request - /// let req = Request::builder() - /// .uri("/foo/bar") - /// .header(HOST, "hyper.rs") - /// .body(Body::empty()) - /// .unwrap(); - /// - /// // send it and await a Response - /// let res = tx.send_request(req).await?; - /// // assert the Response - /// assert!(res.status().is_success()); - /// # Ok(()) - /// # } - /// # fn main() {} - /// ``` - pub fn send_request(&mut self, req: Request) -> ResponseFuture { - let inner = match self.dispatch.send(req) { - Ok(rx) => ResponseFutureState::Waiting(rx), - Err(_req) => { - debug!("connection was not ready"); - let err = crate::Error::new_canceled().with("connection was not ready"); - ResponseFutureState::Error(Some(err)) - } - }; - - ResponseFuture { inner } - } -} - -impl Service> for SendRequest -where - B: HttpBody + 'static, -{ - type Response = Response; - type Error = crate::Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.send_request(req) - } -} - -impl fmt::Debug for SendRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SendRequest").finish() - } -} - -// ===== impl Connection - -impl Connection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: HttpBody + Unpin + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - /// Return the inner IO object, and additional information. - /// - /// Only works for HTTP/1 connections. HTTP/2 connections will panic. - pub fn into_parts(self) -> Parts { - match self.inner.expect("already upgraded") { - #[cfg(feature = "http1")] - ProtoClient::H1 { h1 } => { - let (io, read_buf, _) = h1.into_inner(); - Parts { - io, - read_buf, - _inner: (), - } - } - ProtoClient::H2 { .. } => { - panic!("http2 cannot into_inner"); - } - - #[cfg(not(feature = "http1"))] - ProtoClient::H1 { h1 } => match h1.0 {}, - } - } - - /// Poll the connection for completion, but without calling `shutdown` - /// on the underlying IO. - /// - /// This is useful to allow running a connection while doing an HTTP - /// upgrade. Once the upgrade is completed, the connection would be "done", - /// but it is not desired to actually shutdown the IO object. Instead you - /// would take it back using `into_parts`. - /// - /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) - /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) - /// to work with this function; or use the `without_shutdown` wrapper. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { - match *self.inner.as_mut().expect("already upgraded") { - #[cfg(feature = "http1")] - ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx), - #[cfg(feature = "http2")] - ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()), - - #[cfg(not(feature = "http1"))] - ProtoClient::H1 { ref mut h1 } => match h1.0 {}, - #[cfg(not(feature = "http2"))] - ProtoClient::H2 { ref mut h2, .. } => match h2.0 {}, - } - } - - /// Prevent shutdown of the underlying IO object at the end of service the request, - /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. - pub fn without_shutdown(self) -> impl Future>> { - let mut conn = Some(self); - future::poll_fn(move |cx| -> Poll>> { - ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; - Poll::Ready(Ok(conn.take().unwrap().into_parts())) - }) - } - - /// Returns whether the [extended CONNECT protocol][1] is enabled or not. - /// - /// This setting is configured by the server peer by sending the - /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. - /// This method returns the currently acknowledged value received from the - /// remote. - /// - /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 - /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 - #[cfg(feature = "http2")] - pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool { - match self.inner.as_ref().unwrap() { - ProtoClient::H1 { .. } => false, - ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(), - } - } -} - -impl Future for Connection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: HttpBody + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - type Output = crate::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? { - proto::Dispatched::Shutdown => Poll::Ready(Ok(())), - #[cfg(feature = "http1")] - proto::Dispatched::Upgrade(pending) => match self.inner.take() { - Some(ProtoClient::H1 { h1 }) => { - let (io, buf, _) = h1.into_inner(); - pending.fulfill(Upgraded::new(io, buf)); - Poll::Ready(Ok(())) - } - _ => { - drop(pending); - unreachable!("Upgrade expects h1"); - } - }, - } - } -} - -impl fmt::Debug for Connection -where - T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, - B: HttpBody + 'static, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Connection").finish() - } -} - -// ===== impl Builder - -impl Builder { - /// Creates a new connection builder. - #[inline] - pub fn new() -> Builder { - Builder { - exec: Exec::Default, - h09_responses: false, - h1_writev: None, - h1_read_buf_exact_size: None, - h1_parser_config: Default::default(), - h1_title_case_headers: false, - h1_preserve_header_case: false, - #[cfg(feature = "ffi")] - h1_preserve_header_order: false, - h1_max_buf_size: None, - #[cfg(feature = "ffi")] - h1_headers_raw: false, - #[cfg(feature = "http2")] - h2_builder: Default::default(), - #[cfg(feature = "http1")] - version: Proto::Http1, - #[cfg(not(feature = "http1"))] - version: Proto::Http2, - } - } - - /// Provide an executor to execute background HTTP2 tasks. - pub fn executor(&mut self, exec: E) -> &mut Builder - where - E: Executor + Send + Sync + 'static, - { - self.exec = Exec::Executor(Arc::new(exec)); - self - } - - /// Set whether HTTP/0.9 responses should be tolerated. - /// - /// Default is false. - pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder { - self.h09_responses = enabled; - self - } - - /// Set whether HTTP/1 connections will accept spaces between header names - /// and the colon that follow them in responses. - /// - /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has - /// to say about it: - /// - /// > No whitespace is allowed between the header field-name and colon. In - /// > the past, differences in the handling of such whitespace have led to - /// > security vulnerabilities in request routing and response handling. A - /// > server MUST reject any received request message that contains - /// > whitespace between a header field-name and colon with a response code - /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a - /// > response message before forwarding the message downstream. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - /// - /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 - pub fn http1_allow_spaces_after_header_name_in_responses( - &mut self, - enabled: bool, - ) -> &mut Builder { - self.h1_parser_config - .allow_spaces_after_header_name_in_responses(enabled); - self - } - - /// Set whether HTTP/1 connections will accept obsolete line folding for - /// header values. - /// - /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when - /// parsing. - /// - /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has - /// to say about it: - /// - /// > A server that receives an obs-fold in a request message that is not - /// > within a message/http container MUST either reject the message by - /// > sending a 400 (Bad Request), preferably with a representation - /// > explaining that obsolete line folding is unacceptable, or replace - /// > each received obs-fold with one or more SP octets prior to - /// > interpreting the field value or forwarding the message downstream. - /// - /// > A proxy or gateway that receives an obs-fold in a response message - /// > that is not within a message/http container MUST either discard the - /// > message and replace it with a 502 (Bad Gateway) response, preferably - /// > with a representation explaining that unacceptable line folding was - /// > received, or replace each received obs-fold with one or more SP - /// > octets prior to interpreting the field value or forwarding the - /// > message downstream. - /// - /// > A user agent that receives an obs-fold in a response message that is - /// > not within a message/http container MUST replace each received - /// > obs-fold with one or more SP octets prior to interpreting the field - /// > value. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - /// - /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 - pub fn http1_allow_obsolete_multiline_headers_in_responses( - &mut self, - enabled: bool, - ) -> &mut Builder { - self.h1_parser_config - .allow_obsolete_multiline_headers_in_responses(enabled); - self - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder { - self.h1_writev = Some(enabled); - self - } - - /// Set whether HTTP/1 connections will write header names as title case at - /// the socket level. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder { - self.h1_title_case_headers = enabled; - self - } - - /// Set whether to support preserving original header cases. - /// - /// Currently, this will record the original cases received, and store them - /// in a private extension on the `Response`. It will also look for and use - /// such an extension in any provided `Request`. - /// - /// Since the relevant extension is still private, there is no way to - /// interact with the original cases. The only effect this can have now is - /// to forward the cases in a proxy-like fashion. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder { - self.h1_preserve_header_case = enabled; - self - } - - /// Set whether to support preserving original header order. - /// - /// Currently, this will record the order in which headers are received, and store this - /// ordering in a private extension on the `Response`. It will also look for and use - /// such an extension in any provided `Request`. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - #[cfg(feature = "ffi")] - pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder { - self.h1_preserve_header_order = enabled; - self - } - - /// Sets the exact size of the read buffer to *always* use. - /// - /// Note that setting this option unsets the `http1_max_buf_size` option. - /// - /// Default is an adaptive read buffer. - pub fn http1_read_buf_exact_size(&mut self, sz: Option) -> &mut Builder { - self.h1_read_buf_exact_size = sz; - self.h1_max_buf_size = None; - self - } - - /// Set the maximum buffer size for the connection. - /// - /// Default is ~400kb. - /// - /// Note that setting this option unsets the `http1_read_exact_buf_size` option. - /// - /// # Panics - /// - /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { - assert!( - max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, - "the max_buf_size cannot be smaller than the minimum that h1 specifies." - ); - - self.h1_max_buf_size = Some(max); - self.h1_read_buf_exact_size = None; - self - } - - #[cfg(feature = "ffi")] - pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self { - self.h1_headers_raw = enabled; - self - } - - /// Sets whether HTTP2 is required. - /// - /// Default is false. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_only(&mut self, enabled: bool) -> &mut Builder { - if enabled { - self.version = Proto::Http2 - } - self - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_stream_window_size = sz; - } - self - } - - /// Sets the max connection-level flow control for HTTP2 - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_connection_window_size( - &mut self, - sz: impl Into>, - ) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_conn_window_size = sz; - } - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { - use proto::h2::SPEC_WINDOW_SIZE; - - self.h2_builder.adaptive_window = enabled; - if enabled { - self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; - self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; - } - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.max_frame_size = sz; - } - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_interval( - &mut self, - interval: impl Into>, - ) -> &mut Self { - self.h2_builder.keep_alive_interval = interval.into(); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.h2_builder.keep_alive_timeout = timeout; - self - } - - /// Sets whether HTTP2 keep-alive should apply while the connection is idle. - /// - /// If disabled, keep-alive pings are only sent while there are open - /// request/responses streams. If enabled, pings are also sent when no - /// streams are active. Does nothing if `http2_keep_alive_interval` is - /// disabled. - /// - /// Default is `false`. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { - self.h2_builder.keep_alive_while_idle = enabled; - self - } - - /// Sets the maximum number of HTTP2 concurrent locally reset streams. - /// - /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more - /// details. - /// - /// The default value is determined by the `h2` crate. - /// - /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { - self.h2_builder.max_concurrent_reset_streams = Some(max); - self - } - - /// Set the maximum write buffer size for each HTTP/2 stream. - /// - /// Default is currently 1MB, but may change. - /// - /// # Panics - /// - /// The value must be no larger than `u32::MAX`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { - assert!(max <= std::u32::MAX as usize); - self.h2_builder.max_send_buffer_size = max; - self - } - - /// Constructs a connection with the configured options and IO. - /// See [`client::conn`](crate::client::conn) for more. - /// - /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will - /// do nothing. - pub fn handshake( - &self, - io: T, - ) -> impl Future, Connection)>> - where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: HttpBody + 'static, - B::Data: Send, - B::Error: Into>, - { - let opts = self.clone(); - - async move { - trace!("client handshake {:?}", opts.version); - - let (tx, rx) = dispatch::channel(); - let proto = match opts.version { - #[cfg(feature = "http1")] - Proto::Http1 => { - let mut conn = proto::Conn::new(io); - conn.set_h1_parser_config(opts.h1_parser_config); - if let Some(writev) = opts.h1_writev { - if writev { - conn.set_write_strategy_queue(); - } else { - conn.set_write_strategy_flatten(); - } - } - if opts.h1_title_case_headers { - conn.set_title_case_headers(); - } - if opts.h1_preserve_header_case { - conn.set_preserve_header_case(); - } - #[cfg(feature = "ffi")] - if opts.h1_preserve_header_order { - conn.set_preserve_header_order(); - } - if opts.h09_responses { - conn.set_h09_responses(); - } - - #[cfg(feature = "ffi")] - conn.set_raw_headers(opts.h1_headers_raw); - - if let Some(sz) = opts.h1_read_buf_exact_size { - conn.set_read_buf_exact_size(sz); - } - if let Some(max) = opts.h1_max_buf_size { - conn.set_max_buf_size(max); - } - let cd = proto::h1::dispatch::Client::new(rx); - let dispatch = proto::h1::Dispatcher::new(cd, conn); - ProtoClient::H1 { h1: dispatch } - } - #[cfg(feature = "http2")] - Proto::Http2 => { - let h2 = - proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone()) - .await?; - ProtoClient::H2 { h2 } - } - }; - - Ok(( - SendRequest { dispatch: tx }, - Connection { inner: Some(proto) }, - )) - } - } -} - -// ===== impl ResponseFuture - -impl Future for ResponseFuture { - type Output = crate::Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match self.inner { - ResponseFutureState::Waiting(ref mut rx) => { - Pin::new(rx).poll(cx).map(|res| match res { - Ok(Ok(resp)) => Ok(resp), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_canceled) => panic!("dispatch dropped without returning error"), - }) - } - ResponseFutureState::Error(ref mut err) => { - Poll::Ready(Err(err.take().expect("polled after ready"))) - } - } - } -} - -impl fmt::Debug for ResponseFuture { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ResponseFuture").finish() - } -} - -// ===== impl ProtoClient - -impl Future for ProtoClient -where - T: AsyncRead + AsyncWrite + Send + Unpin + 'static, - B: HttpBody + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - type Output = crate::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match self.project() { - #[cfg(feature = "http1")] - ProtoClientProj::H1 { h1 } => h1.poll(cx), - #[cfg(feature = "http2")] - ProtoClientProj::H2 { h2, .. } => h2.poll(cx), - - #[cfg(not(feature = "http1"))] - ProtoClientProj::H1 { h1 } => match h1.0 {}, - #[cfg(not(feature = "http2"))] - ProtoClientProj::H2 { h2, .. } => match h2.0 {}, - } - } -} - -// assert trait markers - -trait AssertSend: Send {} -trait AssertSendSync: Send + Sync {} - -#[doc(hidden)] -impl AssertSendSync for SendRequest {} - -#[doc(hidden)] -impl AssertSend for Connection -where - T: AsyncRead + AsyncWrite + Send + 'static, - B: HttpBody + 'static, - B::Data: Send, -{ -} - -#[doc(hidden)] -impl AssertSendSync for Connection -where - T: AsyncRead + AsyncWrite + Send + 'static, - B: HttpBody + 'static, - B::Data: Send + Sync + 'static, -{ -} - -#[doc(hidden)] -impl AssertSendSync for Builder {} -#[doc(hidden)] -impl AssertSend for ResponseFuture {} diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 013f6fb5a8..4d2e94378e 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -205,14 +205,14 @@ where req_rx: ClientRx, } -impl ClientTask -where - B: HttpBody + 'static, -{ - pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { - self.h2_tx.is_extended_connect_protocol_enabled() - } -} +// impl ClientTask +// where +// B: HttpBody + 'static, +// { +// pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { +// self.h2_tx.is_extended_connect_protocol_enabled() +// } +// } impl Future for ClientTask where diff --git a/tests/client.rs b/tests/client.rs index 5360b71ad5..6d40e0bca3 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -232,19 +232,17 @@ macro_rules! test { // Wrapper around hyper::client::conn::Builder with set_host field to mimic // hyper::client::Builder. struct Builder { - inner: hyper::client::conn::Builder, + inner: hyper::client::conn::http1::Builder, set_host: bool, http09_responses: bool, - http2_only: bool, } impl Builder { fn new() -> Self { Self { - inner: hyper::client::conn::Builder::new(), + inner: hyper::client::conn::http1::Builder::new(), set_host: true, http09_responses: false, - http2_only: false, } } @@ -260,17 +258,10 @@ macro_rules! test { self.inner.http09_responses(val); self } - - #[allow(unused)] - fn http2_only(&mut self, val: bool) -> &mut Self { - self.http2_only = val; - self.inner.http2_only(val); - self - } } impl std::ops::Deref for Builder { - type Target = hyper::client::conn::Builder; + type Target = hyper::client::conn::http1::Builder; fn deref(&self) -> &Self::Target { &self.inner @@ -292,7 +283,7 @@ macro_rules! test { return Err(Error::UnsupportedVersion); } - if req.version() == Version::HTTP_2 && !builder.http2_only { + if req.version() == Version::HTTP_2 { return Err(Error::UnsupportedVersion); } @@ -1371,7 +1362,7 @@ mod conn { let client = async move { let tcp = tcp_connect(&addr).await.expect("connect"); - let (mut client, conn) = conn::handshake(tcp).await.expect("handshake"); + let (mut client, conn) = conn::http1::handshake(tcp).await.expect("handshake"); tokio::task::spawn(async move { conn.await.expect("http conn"); @@ -1415,7 +1406,7 @@ mod conn { let client = async move { let tcp = tcp_connect(&addr).await.expect("connect"); - let (mut client, conn) = conn::handshake(tcp).await.expect("handshake"); + let (mut client, conn) = conn::http1::handshake(tcp).await.expect("handshake"); tokio::task::spawn(async move { conn.await.expect("http conn"); @@ -1473,7 +1464,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1519,7 +1510,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1570,7 +1561,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1615,7 +1606,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1657,7 +1648,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1727,7 +1718,7 @@ mod conn { shutdown_called: false, }; - let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); + let (mut client, mut conn) = rt.block_on(conn::http1::handshake(io)).unwrap(); { let until_upgrade = poll_fn(|ctx| conn.poll_without_shutdown(ctx)); @@ -1813,7 +1804,7 @@ mod conn { shutdown_called: false, }; - let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); + let (mut client, mut conn) = rt.block_on(conn::http1::handshake(io)).unwrap(); { let until_tunneled = poll_fn(|ctx| conn.poll_without_shutdown(ctx)); @@ -1911,8 +1902,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() - .http2_only(true) + let (mut client, conn) = conn::http2::Builder::new() .handshake::<_, Body>(io) .await .expect("http handshake"); @@ -1973,8 +1963,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (_client, conn) = conn::Builder::new() - .http2_only(true) + let (_client, conn) = conn::http2::Builder::new() .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) // enable while idle since we aren't sending requests @@ -2006,8 +1995,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() - .http2_only(true) + let (mut client, conn) = conn::http2::Builder::new() .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .handshake::<_, Body>(io) @@ -2042,8 +2030,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() - .http2_only(true) + let (mut client, conn) = conn::http2::Builder::new() .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .handshake::<_, Body>(io) @@ -2106,8 +2093,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() - .http2_only(true) + let (mut client, conn) = conn::http2::Builder::new() .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .handshake::<_, Body>(io) @@ -2165,8 +2151,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() - .http2_only(true) + let (mut client, conn) = conn::http2::Builder::new() .handshake::<_, Body>(io) .await .expect("http handshake"); @@ -2221,8 +2206,7 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() - .http2_only(true) + let (mut client, conn) = conn::http2::Builder::new() .handshake::<_, Body>(io) .await .expect("http handshake"); diff --git a/tests/server.rs b/tests/server.rs index 40e01b1d1e..086c60ab2a 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -2516,8 +2516,7 @@ async fn http2_keep_alive_with_responsive_client() { }); let tcp = connect_async(addr).await; - let (mut client, conn) = hyper::client::conn::Builder::new() - .http2_only(true) + let (mut client, conn) = hyper::client::conn::http2::Builder::new() .handshake::<_, Body>(tcp) .await .expect("http handshake"); @@ -3142,19 +3141,24 @@ impl TestClient { let host = req.uri().host().expect("uri has no host"); let port = req.uri().port_u16().expect("uri has no port"); - let mut builder = hyper::client::conn::Builder::new(); - builder.http2_only(self.http2_only); - let stream = TkTcpStream::connect(format!("{}:{}", host, port)) .await .unwrap(); - let (mut sender, conn) = builder.handshake(stream).await.unwrap(); + if self.http2_only { + let (mut sender, conn) = hyper::client::conn::http2::handshake(stream).await.unwrap(); + tokio::task::spawn(async move { + conn.await.unwrap(); + }); - tokio::task::spawn(async move { - conn.await.unwrap(); - }); + sender.send_request(req).await + } else { + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await.unwrap(); + tokio::task::spawn(async move { + conn.await.unwrap(); + }); - sender.send_request(req).await + sender.send_request(req).await + } } } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 781e71baee..dde7c5ff57 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -6,7 +6,6 @@ use std::sync::{ Arc, Mutex, }; -use hyper::client::conn::Builder; use hyper::server::conn::Http; use tokio::net::{TcpListener, TcpStream}; @@ -415,19 +414,29 @@ async fn async_test(cfg: __TestConfig) { async move { let stream = TcpStream::connect(addr).await.unwrap(); - let (mut sender, conn) = hyper::client::conn::Builder::new() - .http2_only(http2_only) - .handshake::(stream) - .await - .unwrap(); + let res = if http2_only { + let (mut sender, conn) = hyper::client::conn::http2::handshake::(stream) + .await + .unwrap(); - tokio::task::spawn(async move { - if let Err(err) = conn.await { - panic!("{:?}", err); - } - }); + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + sender.send_request(req).await.unwrap() + } else { + let (mut sender, conn) = hyper::client::conn::http1::handshake::(stream) + .await + .unwrap(); - let res = sender.send_request(req).await.unwrap(); + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + sender.send_request(req).await.unwrap() + }; assert_eq!(res.status(), cstatus, "server status"); assert_eq!(res.version(), version, "server version"); @@ -501,17 +510,29 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) .await .unwrap(); - let mut builder = Builder::new(); - builder.http2_only(http2_only); - let (mut sender, conn) = builder.handshake(stream).await.unwrap(); - - tokio::task::spawn(async move { - if let Err(err) = conn.await { - panic!("{:?}", err); - } - }); - - let resp = sender.send_request(req).await?; + let resp = if http2_only { + let builder = hyper::client::conn::http2::Builder::new(); + let (mut sender, conn) = builder.handshake(stream).await.unwrap(); + + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + + sender.send_request(req).await? + } else { + let builder = hyper::client::conn::http1::Builder::new(); + let (mut sender, conn) = builder.handshake(stream).await.unwrap(); + + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + + sender.send_request(req).await? + }; let (mut parts, body) = resp.into_parts();