Upgrade to hyper v0.13

This commit is contained in:
Glenn Griffin 2019-12-10 13:17:46 -08:00
parent cf0c820da3
commit 6c7daac551
5 changed files with 36 additions and 33 deletions

View File

@ -7,11 +7,11 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
hyper = {version = "=0.13.0-alpha.4", features = ["unstable-stream"]} hyper = "0.13"
futures-preview = {version = "=0.3.0-alpha.19", features = ["std", "async-await"]} futures = "0.3"
tokio = "=0.2.0-alpha.6" tokio = { version = "0.2", features = ["macros"] }
crossbeam-channel = "0.4.0" crossbeam-channel = "0.4.0"
http = "0.1.18" http = "0.2"
log = "0.4.8" log = "0.4.8"
bstr = "0.2.8" bstr = "0.2.8"
regex = "1.3.1" regex = "1.3.1"

View File

@ -9,10 +9,8 @@ use std::fmt;
// import the any_of and all_of macros from crate root so they are accessible if // import the any_of and all_of macros from crate root so they are accessible if
// people glob import this module. // people glob import this module.
/// Accept a list of matchers and returns true if all matchers are true.
#[doc(inline)] #[doc(inline)]
pub use crate::all_of; pub use crate::all_of;
/// Accept a list of matchers and returns true if any matcher is true.
#[doc(inline)] #[doc(inline)]
pub use crate::any_of; pub use crate::any_of;
@ -412,7 +410,7 @@ mod tests {
("key2".to_owned(), "".to_owned()), ("key2".to_owned(), "".to_owned()),
]; ];
let mut c = request::query(url_decoded(eq(expected))); let mut c = request::query(url_decoded(eq(expected)));
let req = http::Request::get("https://example.com/path?key%201=value%201&key2") let req = hyper::Request::get("https://example.com/path?key%201=value%201&key2")
.body("") .body("")
.unwrap(); .unwrap();

View File

@ -13,7 +13,7 @@ pub use crate::cycle;
/// Respond with an HTTP response. /// Respond with an HTTP response.
pub trait Responder: Send + fmt::Debug { pub trait Responder: Send + fmt::Debug {
/// Return a future that outputs an HTTP response. /// Return a future that outputs an HTTP response.
fn respond(&mut self) -> Pin<Box<dyn Future<Output = http::Response<hyper::Body>> + Send>>; fn respond(&mut self) -> Pin<Box<dyn Future<Output = hyper::Response<hyper::Body>> + Send>>;
} }
/// respond with the provided status code. /// respond with the provided status code.
@ -24,8 +24,8 @@ pub fn status_code(code: u16) -> impl Responder {
#[derive(Debug)] #[derive(Debug)]
pub struct StatusCode(u16); pub struct StatusCode(u16);
impl Responder for StatusCode { impl Responder for StatusCode {
fn respond(&mut self) -> Pin<Box<dyn Future<Output = http::Response<hyper::Body>> + Send>> { fn respond(&mut self) -> Pin<Box<dyn Future<Output = hyper::Response<hyper::Body>> + Send>> {
async fn _respond(status_code: u16) -> http::Response<hyper::Body> { async fn _respond(status_code: u16) -> hyper::Response<hyper::Body> {
hyper::Response::builder() hyper::Response::builder()
.status(status_code) .status(status_code)
.body(hyper::Body::empty()) .body(hyper::Body::empty())
@ -49,8 +49,8 @@ where
#[derive(Debug)] #[derive(Debug)]
pub struct JsonEncoded(String); pub struct JsonEncoded(String);
impl Responder for JsonEncoded { impl Responder for JsonEncoded {
fn respond(&mut self) -> Pin<Box<dyn Future<Output = http::Response<hyper::Body>> + Send>> { fn respond(&mut self) -> Pin<Box<dyn Future<Output = hyper::Response<hyper::Body>> + Send>> {
async fn _respond(body: String) -> http::Response<hyper::Body> { async fn _respond(body: String) -> hyper::Response<hyper::Body> {
hyper::Response::builder() hyper::Response::builder()
.status(200) .status(200)
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
@ -75,8 +75,8 @@ where
#[derive(Debug)] #[derive(Debug)]
pub struct UrlEncoded(String); pub struct UrlEncoded(String);
impl Responder for UrlEncoded { impl Responder for UrlEncoded {
fn respond(&mut self) -> Pin<Box<dyn Future<Output = http::Response<hyper::Body>> + Send>> { fn respond(&mut self) -> Pin<Box<dyn Future<Output = hyper::Response<hyper::Body>> + Send>> {
async fn _respond(body: String) -> http::Response<hyper::Body> { async fn _respond(body: String) -> hyper::Response<hyper::Body> {
hyper::Response::builder() hyper::Response::builder()
.status(200) .status(200)
.header("Content-Type", "application/x-www-form-urlencoded") .header("Content-Type", "application/x-www-form-urlencoded")
@ -91,13 +91,13 @@ impl<B> Responder for hyper::Response<B>
where where
B: Clone + Into<hyper::Body> + Send + fmt::Debug, B: Clone + Into<hyper::Body> + Send + fmt::Debug,
{ {
fn respond(&mut self) -> Pin<Box<dyn Future<Output = http::Response<hyper::Body>> + Send>> { fn respond(&mut self) -> Pin<Box<dyn Future<Output = hyper::Response<hyper::Body>> + Send>> {
async fn _respond(resp: http::Response<hyper::Body>) -> http::Response<hyper::Body> { async fn _respond(resp: hyper::Response<hyper::Body>) -> hyper::Response<hyper::Body> {
resp resp
} }
// Turn &hyper::Response<Vec<u8>> into a hyper::Response<hyper::Body> // Turn &hyper::Response<Vec<u8>> into a hyper::Response<hyper::Body>
let mut builder = hyper::Response::builder(); let mut builder = hyper::Response::builder();
builder builder = builder
.status(self.status().clone()) .status(self.status().clone())
.version(self.version().clone()); .version(self.version().clone());
*builder.headers_mut().unwrap() = self.headers().clone(); *builder.headers_mut().unwrap() = self.headers().clone();
@ -121,7 +121,7 @@ pub struct Cycle {
responders: Vec<Box<dyn Responder>>, responders: Vec<Box<dyn Responder>>,
} }
impl Responder for Cycle { impl Responder for Cycle {
fn respond(&mut self) -> Pin<Box<dyn Future<Output = http::Response<hyper::Body>> + Send>> { fn respond(&mut self) -> Pin<Box<dyn Future<Output = hyper::Response<hyper::Body>> + Send>> {
let response = self.responders[self.idx].respond(); let response = self.responders[self.idx].respond();
self.idx = (self.idx + 1) % self.responders.len(); self.idx = (self.idx + 1) % self.responders.len();
response response

View File

@ -43,8 +43,7 @@ impl Server {
async move { async move {
// read the full body into memory prior to handing it to mappers. // read the full body into memory prior to handing it to mappers.
let (head, body) = req.into_parts(); let (head, body) = req.into_parts();
use futures::TryStreamExt; let full_body = hyper::body::to_bytes(body).await?;
let full_body = body.try_concat().await?;
let req = hyper::Request::from_parts(head, full_body.to_vec()); let req = hyper::Request::from_parts(head, full_body.to_vec());
log::debug!("Received Request: {:?}", req); log::debug!("Received Request: {:?}", req);
let resp = on_req(state, req).await; let resp = on_req(state, req).await;
@ -61,7 +60,11 @@ impl Server {
let addr = server.local_addr(); let addr = server.local_addr();
let (trigger_shutdown, shutdown_received) = futures::channel::oneshot::channel(); let (trigger_shutdown, shutdown_received) = futures::channel::oneshot::channel();
let join_handle = std::thread::spawn(move || { let join_handle = std::thread::spawn(move || {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
runtime.block_on(async move { runtime.block_on(async move {
futures::select! { futures::select! {
_ = server.fuse() => {}, _ = server.fuse() => {},
@ -88,11 +91,12 @@ impl Server {
/// If the server is listening on port 1234. /// If the server is listening on port 1234.
/// ///
/// `server.url("/foo?q=1") == "http://localhost:1234/foo?q=1"` /// `server.url("/foo?q=1") == "http://localhost:1234/foo?q=1"`
pub fn url<T>(&self, path_and_query: T) -> http::Uri pub fn url<T>(&self, path_and_query: T) -> hyper::Uri
where where
http::uri::PathAndQuery: http::HttpTryFrom<T>, http::uri::PathAndQuery: std::convert::TryFrom<T>,
<http::uri::PathAndQuery as std::convert::TryFrom<T>>::Error: Into<http::Error>,
{ {
http::Uri::builder() hyper::Uri::builder()
.scheme("http") .scheme("http")
.authority(format!("{}", &self.addr).as_str()) .authority(format!("{}", &self.addr).as_str())
.path_and_query(path_and_query) .path_and_query(path_and_query)
@ -156,7 +160,7 @@ impl Drop for Server {
} }
} }
async fn on_req(state: ServerState, req: FullRequest) -> http::Response<hyper::Body> { async fn on_req(state: ServerState, req: FullRequest) -> hyper::Response<hyper::Body> {
let response_future = { let response_future = {
let mut state = state.lock(); let mut state = state.lock();
let mut iter = state.expected.iter_mut(); let mut iter = state.expected.iter_mut();
@ -198,8 +202,8 @@ async fn on_req(state: ServerState, req: FullRequest) -> http::Response<hyper::B
if let Some(f) = response_future { if let Some(f) = response_future {
f.await f.await
} else { } else {
http::Response::builder() hyper::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR) .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::Body::empty()) .body(hyper::Body::empty())
.unwrap() .unwrap()
} }
@ -305,14 +309,14 @@ fn cardinality_error(
matcher: &dyn Matcher<FullRequest>, matcher: &dyn Matcher<FullRequest>,
cardinality: &Times, cardinality: &Times,
hit_count: usize, hit_count: usize,
) -> Pin<Box<dyn Future<Output = http::Response<hyper::Body>> + Send + 'static>> { ) -> Pin<Box<dyn Future<Output = hyper::Response<hyper::Body>> + Send + 'static>> {
let body = hyper::Body::from(format!( let body = hyper::Body::from(format!(
"Unexpected number of requests for matcher '{:?}'; received {}; expected {:?}", "Unexpected number of requests for matcher '{:?}'; received {}; expected {:?}",
matcher, hit_count, cardinality, matcher, hit_count, cardinality,
)); ));
Box::pin(async move { Box::pin(async move {
http::Response::builder() hyper::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR) .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.body(body) .body(body)
.unwrap() .unwrap()
}) })

View File

@ -1,9 +1,10 @@
use httptest::{mappers::*, responders::*, Expectation, Times}; use httptest::{mappers::*, responders::*, Expectation, Times};
async fn read_response_body(resp: hyper::Response<hyper::Body>) -> hyper::Response<Vec<u8>> { async fn read_response_body(
use futures::stream::TryStreamExt; resp: hyper::Response<hyper::Body>,
) -> hyper::Response<hyper::body::Bytes> {
let (head, body) = resp.into_parts(); let (head, body) = resp.into_parts();
let body = body.try_concat().await.unwrap().to_vec(); let body = hyper::body::to_bytes(body).await.unwrap();
hyper::Response::from_parts(head, body) hyper::Response::from_parts(head, body)
} }