Skip to content

Commit a745cf5

Browse files
authored
Merge pull request #3 from jtgeibel/tokio-threadpool
Switch from `futures-cpupool` to `tokio-threadpool`
2 parents 56bdfc9 + 33d8334 commit a745cf5

File tree

4 files changed

+223
-54
lines changed

4 files changed

+223
-54
lines changed

Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "conduit-hyper"
3-
version = "0.1.3"
3+
version = "0.2.0-a.0"
44
authors = ["Justin Geibel <[email protected]>"]
55
license = "MIT OR Apache-2.0"
66
description = "Host a conduit based web application on a hyper server"
@@ -11,8 +11,12 @@ edition = "2018"
1111
[dependencies]
1212
conduit = "0.8"
1313
futures = "0.1"
14-
futures-cpupool = "0.1"
1514
hyper = "0.12"
1615
http = "0.1"
1716
log = "0.4"
1817
semver = "0.5" # Must match version in conduit for now
18+
tokio-threadpool = "0.1.12"
19+
20+
[dev-dependencies]
21+
conduit-router = "0.8"
22+
tokio = "0.1"

examples/server.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#![deny(warnings, clippy::all)]
2+
3+
use conduit::{Handler, Request, Response};
4+
use conduit_hyper::Server;
5+
use conduit_router::RouteBuilder;
6+
use futures::Future;
7+
use tokio::runtime;
8+
9+
use std::collections::HashMap;
10+
use std::io::{Cursor, Error};
11+
use std::thread::sleep;
12+
13+
fn main() {
14+
let app = build_conduit_handler();
15+
let addr = ([127, 0, 0, 1], 12345).into();
16+
let server = Server::bind(&addr, app).map_err(|e| {
17+
eprintln!("server error: {}", e);
18+
});
19+
20+
let mut rt = runtime::Builder::new()
21+
// Set the max number of concurrent requests (tokio defaults to 100)
22+
.blocking_threads(2)
23+
.build()
24+
.unwrap();
25+
rt.spawn(server);
26+
rt.shutdown_on_idle().wait().unwrap();
27+
}
28+
29+
fn build_conduit_handler() -> impl Handler {
30+
let mut router = RouteBuilder::new();
31+
router.get("/", endpoint);
32+
router.get("/panic", panic);
33+
router
34+
}
35+
36+
fn endpoint(_: &mut dyn Request) -> Result<Response, Error> {
37+
let body = "Hello world!";
38+
39+
sleep(std::time::Duration::from_secs(2));
40+
41+
let mut headers = HashMap::new();
42+
headers.insert(
43+
"Content-Type".to_string(),
44+
vec!["text/plain; charset=utf-8".to_string()],
45+
);
46+
headers.insert("Content-Length".to_string(), vec![body.len().to_string()]);
47+
Ok(Response {
48+
status: (200, "OK"),
49+
headers,
50+
body: Box::new(Cursor::new(body)),
51+
})
52+
}
53+
54+
fn panic(_: &mut dyn Request) -> Result<Response, Error> {
55+
// For now, connection is immediately closed
56+
panic!("message");
57+
}

src/lib.rs

Lines changed: 108 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,57 @@
11
#![deny(warnings, clippy::all, missing_debug_implementations)]
22

3+
//! A wrapper for integrating `hyper 0.12` with a `conduit 0.8` blocking application stack.
4+
//!
5+
//! A `conduit::Handler` is allowed to block so the `Server` must be spawned on the (default)
6+
//! multi-threaded `Runtime` which allows (by default) 100 concurrent blocking threads. Any excess
7+
//! requests will asynchronously wait for an available blocking thread.
8+
//!
9+
//! # Examples
10+
//!
11+
//! Try out the example with `cargo run --example server`.
12+
//!
13+
//! Typical usage:
14+
//!
15+
//! ```no_run
16+
//! use conduit::Handler;
17+
//! use conduit_hyper::Server;
18+
//! use futures::Future;
19+
//! use tokio::runtime::Runtime;
20+
//!
21+
//! fn main() {
22+
//! let app = build_conduit_handler();
23+
//! let addr = ([127, 0, 0, 1], 12345).into();
24+
//! let server = Server::bind(&addr, app).map_err(|e| {
25+
//! eprintln!("server error: {}", e);
26+
//! });
27+
//!
28+
//! let mut rt = Runtime::new().unwrap();
29+
//! rt.spawn(server);
30+
//! rt.shutdown_on_idle().wait().unwrap();
31+
//! }
32+
//!
33+
//! fn build_conduit_handler() -> impl Handler {
34+
//! // ...
35+
//! # Endpoint()
36+
//! }
37+
//! #
38+
//! # use std::{collections, error, io};
39+
//! #
40+
//! # use conduit::{Request, Response};
41+
//! #
42+
//! # struct Endpoint();
43+
//! #
44+
//! # impl Handler for Endpoint {
45+
//! # fn call(&self, _: &mut dyn Request) -> Result<Response, Box<dyn error::Error + Send>> {
46+
//! # Ok(Response {
47+
//! # status: (200, "OK"),
48+
//! # headers: collections::HashMap::new(),
49+
//! # body: Box::new(io::Cursor::new("")),
50+
//! # })
51+
//! # }
52+
//! # }
53+
//! ```
54+
355
#[cfg(test)]
456
mod tests;
557

@@ -9,13 +61,27 @@ use std::path::{Component, Path, PathBuf};
961
use std::sync::Arc;
1062

1163
use futures::{future, Future, Stream};
12-
use futures_cpupool::CpuPool;
13-
use hyper::{Body, Chunk, Method, Request, Response, Server, StatusCode, Version};
64+
use hyper::{Body, Chunk, Method, Request, Response, StatusCode, Version};
1465
use log::error;
1566

1667
// Consumers of this library need access to this particular version of `semver`
1768
pub use semver;
1869

70+
/// A builder for a `hyper::Server`
71+
#[derive(Debug)]
72+
pub struct Server;
73+
74+
impl Server {
75+
/// Bind a handler to an address
76+
pub fn bind<H: conduit::Handler>(
77+
addr: &SocketAddr,
78+
handler: H,
79+
) -> hyper::Server<hyper::server::conn::AddrIncoming, Service<H>> {
80+
let service = Service::new(handler);
81+
hyper::Server::bind(&addr).serve(service)
82+
}
83+
}
84+
1985
#[derive(Debug)]
2086
struct Parts(http::request::Parts);
2187

@@ -67,7 +133,7 @@ struct ConduitRequest {
67133
parts: Parts,
68134
path: String,
69135
body: Cursor<Chunk>,
70-
extensions: conduit::Extensions,
136+
extensions: conduit::Extensions, // makes struct non-Send
71137
}
72138

73139
impl conduit::Request for ConduitRequest {
@@ -157,8 +223,34 @@ impl conduit::Request for ConduitRequest {
157223
}
158224
}
159225

226+
/// Owned data consumed by the worker thread
227+
///
228+
/// `ConduitRequest` cannot be sent between threads, so the input data is
229+
/// captured on a core thread and taken by the worker thread.
230+
struct RequestInfo(Option<(Parts, Chunk)>);
231+
232+
impl RequestInfo {
233+
/// Save the request info that can be sent between threads
234+
fn new(parts: http::request::Parts, body: Chunk) -> Self {
235+
let tuple = (Parts(parts), body);
236+
Self(Some(tuple))
237+
}
238+
239+
/// Take back the request info
240+
///
241+
/// Call this from the worker thread to obtain ownership of the `Send` data
242+
///
243+
/// # Panics
244+
///
245+
/// Panics if called more than once on a value
246+
fn take(&mut self) -> (Parts, Chunk) {
247+
self.0.take().expect("called take multiple times")
248+
}
249+
}
250+
160251
impl ConduitRequest {
161-
fn new(parts: Parts, body: Chunk) -> ConduitRequest {
252+
fn new(info: &mut RequestInfo) -> Self {
253+
let (parts, body) = info.take();
162254
let path = parts.0.uri.path().to_string();
163255
let path = Path::new(&path);
164256
let path = path
@@ -183,7 +275,7 @@ impl ConduitRequest {
183275
.to_string_lossy()
184276
.to_string(); // non-Unicode is replaced with U+FFFD REPLACEMENT CHARACTER
185277

186-
ConduitRequest {
278+
Self {
187279
parts,
188280
path,
189281
body: Cursor::new(body),
@@ -195,15 +287,13 @@ impl ConduitRequest {
195287
/// Serve a `conduit::Handler` on a thread pool
196288
#[derive(Debug)]
197289
pub struct Service<H> {
198-
pool: CpuPool,
199290
handler: Arc<H>,
200291
}
201292

202293
// #[derive(Clone)] results in cloning a ref, and not the Service
203294
impl<H> Clone for Service<H> {
204295
fn clone(&self) -> Self {
205296
Service {
206-
pool: self.pool.clone(),
207297
handler: self.handler.clone(),
208298
}
209299
}
@@ -230,39 +320,32 @@ impl<H: conduit::Handler> hyper::service::Service for Service<H> {
230320

231321
/// Returns a future which buffers the response body and then calls the conduit handler from a thread pool
232322
fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
233-
let pool = self.pool.clone();
234323
let handler = self.handler.clone();
235324

236325
let (parts, body) = request.into_parts();
237326
let response = body.concat2().and_then(move |full_body| {
238-
pool.spawn_fn(move || {
239-
let mut request = ConduitRequest::new(Parts(parts), full_body);
240-
let response = handler
241-
.call(&mut request)
242-
.map(good_response)
243-
.unwrap_or_else(|e| error_response(e.description()));
244-
245-
Ok(response)
327+
let mut request_info = RequestInfo::new(parts, full_body);
328+
future::poll_fn(move || {
329+
tokio_threadpool::blocking(|| {
330+
let mut request = ConduitRequest::new(&mut request_info);
331+
handler
332+
.call(&mut request)
333+
.map(good_response)
334+
.unwrap_or_else(|e| error_response(e.description()))
335+
})
336+
.map_err(|_| panic!("the threadpool shut down"))
246337
})
247338
});
248339
Box::new(response)
249340
}
250341
}
251342

252343
impl<H: conduit::Handler> Service<H> {
253-
/// Create a multi-threaded `Service` from a `Handler`
254-
pub fn new(handler: H, threads: usize) -> Service<H> {
344+
fn new(handler: H) -> Self {
255345
Service {
256-
pool: CpuPool::new(threads),
257346
handler: Arc::new(handler),
258347
}
259348
}
260-
261-
/// Run the `Service` bound to a given `SocketAddr`
262-
pub fn run(&self, addr: SocketAddr) {
263-
let server = Server::bind(&addr).serve(self.clone());
264-
hyper::rt::run(server.map_err(|e| error!("Server error: {}", e)));
265-
}
266349
}
267350

268351
/// Builds a `hyper::Response` given a `conduit:Response`

0 commit comments

Comments
 (0)