Skip to content

Commit 20f73d2

Browse files
committed
fix(transports): fix rebase errors
1 parent f8d1ac9 commit 20f73d2

File tree

1 file changed

+73
-65
lines changed

1 file changed

+73
-65
lines changed

src/transports/http.rs

Lines changed: 73 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -305,45 +305,44 @@ fn id_of_output(output: &Output) -> Result<RequestId> {
305305

306306
#[cfg(test)]
307307
mod tests {
308+
use std::{future::Future, pin::Pin};
309+
308310
use super::*;
309311
use crate::Error::Rpc;
310-
use core::pin::Pin;
311-
use futures::{lock::Mutex, Future};
312-
use hyper::{
313-
body::HttpBody,
314-
service::{make_service_fn, service_fn},
315-
Body, Error, Method, Request, Response, Server,
316-
};
312+
use futures::lock::Mutex;
313+
use http_body_util::{BodyExt, Full};
314+
use hyper::{body::Bytes, body::Incoming, server::conn::http1, service::service_fn, Method, Request, Response};
315+
use hyper_util::rt::TokioIo;
317316
use jsonrpc_core::ErrorCode;
318-
use std::net::TcpListener;
319-
use tokio::{task::JoinHandle, time::Instant};
317+
use tokio::{net::TcpListener, task::JoinHandle, time::Instant};
320318

321-
type HyperResponse = Pin<Box<dyn Future<Output = hyper::Result<Response<Body>>> + Send>>;
319+
type HyperResponse =
320+
Pin<Box<dyn Future<Output = std::result::Result<Response<Full<Bytes>>, hyper::http::Error>> + Send + Sync>>;
322321

323-
type HyperHandler = Box<dyn Fn(Request<Body>) -> HyperResponse + Send + Sync>;
322+
type HyperHandler = Box<dyn Fn(Request<Incoming>) -> HyperResponse + Send + Sync>;
324323

325-
fn get_available_port() -> Option<u16> {
326-
Some(TcpListener::bind(("127.0.0.1", 0)).ok()?.local_addr().ok()?.port())
324+
async fn get_available_port() -> Option<u16> {
325+
Some(
326+
TcpListener::bind(("127.0.0.1", 0))
327+
.await
328+
.ok()?
329+
.local_addr()
330+
.ok()?
331+
.port(),
332+
)
327333
}
328334

329-
fn create_server(port: u16, handler: HyperHandler) -> JoinHandle<()> {
335+
async fn create_server(port: u16, handler: HyperHandler) -> JoinHandle<()> {
330336
let addr = format!("127.0.0.1:{}", port);
337+
let listener = TcpListener::bind(addr).await.unwrap();
331338
let handler = Arc::new(handler);
332-
let service = make_service_fn(move |_| {
333-
let handler = handler.clone();
334-
async move {
335-
let handler = handler.clone();
336-
Ok::<_, Error>(service_fn(move |req| {
337-
let handler = handler.clone();
338-
async move { handler(req).await }
339-
}))
339+
tokio::task::spawn(async move {
340+
loop {
341+
let (stream, _) = listener.accept().await.unwrap();
342+
let service = service_fn(handler.as_ref());
343+
let io = TokioIo::new(stream);
344+
http1::Builder::new().serve_connection(io, service).await.unwrap();
340345
}
341-
});
342-
343-
let server = Server::bind(&addr.parse().unwrap()).serve(service);
344-
tokio::spawn(async move {
345-
println!("Listening on http://{}", addr);
346-
server.await.unwrap();
347346
})
348347
}
349348

@@ -356,9 +355,9 @@ mod tests {
356355
}
357356

358357
fn return_429(retry_after_value: Option<String>) -> HyperHandler {
359-
Box::new(move |_req: Request<Body>| -> HyperResponse {
358+
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
360359
let retry_after_value = retry_after_value.clone();
361-
let response_body = Body::from(
360+
let response_body = Bytes::from(
362361
r#"{
363362
"jsonrpc": "2.0",
364363
"error": {
@@ -373,14 +372,14 @@ mod tests {
373372
response = response.header("Retry-After", value)
374373
}
375374

376-
let response = response.body(response_body).unwrap();
375+
let response = response.body(Full::new(response_body)).unwrap();
377376
Box::pin(async move { Ok(response) })
378377
})
379378
}
380379

381380
fn return_5xx(code: u16) -> HyperHandler {
382-
Box::new(move |_req: Request<Body>| -> HyperResponse {
383-
let response_body = Body::from(
381+
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
382+
let response_body = Bytes::from(
384383
r#"{
385384
"jsonrpc": "2.0",
386385
"error": {
@@ -390,12 +389,12 @@ mod tests {
390389
}"#,
391390
);
392391

393-
let response = Response::builder().status(code).body(response_body).unwrap();
392+
let response = Response::builder().status(code).body(Full::new(response_body)).unwrap();
394393
Box::pin(async move { Ok(response) })
395394
})
396395
}
397396

398-
fn check_and_return_mock_response(req: Request<Body>) -> HyperResponse {
397+
fn check_and_return_mock_response(req: Request<Incoming>) -> HyperResponse {
399398
let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":0}"#;
400399
let response = r#"{"jsonrpc":"2.0","id":0,"result":"x"}"#;
401400

@@ -405,16 +404,15 @@ mod tests {
405404
let mut body = req.into_body();
406405

407406
Box::pin(async move {
408-
while let Some(Ok(chunk)) = body.data().await {
409-
content.extend(&*chunk);
407+
while let Some(Ok(chunk)) = body.frame().await {
408+
content.extend(chunk.into_data().unwrap());
410409
}
411-
assert_eq!(std::str::from_utf8(&*content), Ok(expected));
412-
413-
Ok(Response::new(response.into()))
410+
assert_eq!(std::str::from_utf8(&content), Ok(expected));
411+
Response::builder().status(200).body(Full::new(response.into()))
414412
})
415413
}
416414

417-
fn return_error_response(_req: Request<Body>) -> HyperResponse {
415+
fn return_error_response(_req: Request<Incoming>) -> HyperResponse {
418416
let response = r#"{
419417
"jsonrpc":"2.0",
420418
"error":{
@@ -423,12 +421,16 @@ mod tests {
423421
},
424422
"id":null
425423
}"#;
426-
Box::pin(async move { Ok(Response::new(response.into())) })
424+
let response = Response::builder()
425+
.status(200)
426+
.body(Full::new(response.into()))
427+
.unwrap();
428+
Box::pin(async move { Ok(response) })
427429
}
428430

429431
fn return_sequence(handlers: Vec<HyperHandler>) -> HyperHandler {
430432
let handlers = Arc::new(Mutex::new(handlers));
431-
Box::new(move |_req: Request<Body>| -> HyperResponse {
433+
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
432434
let handlers = handlers.clone();
433435
Box::pin(async move {
434436
let mut handlers = handlers.lock().await;
@@ -441,8 +443,8 @@ mod tests {
441443
#[tokio::test]
442444
async fn should_make_a_request() {
443445
// given
444-
let port = get_available_port().unwrap();
445-
let _ = create_server(port, Box::new(check_and_return_mock_response));
446+
let port = get_available_port().await.unwrap();
447+
let _ = create_server(port, Box::new(check_and_return_mock_response)).await;
446448
let client = create_client(port, Retries::default());
447449

448450
// when
@@ -457,8 +459,8 @@ mod tests {
457459
#[tokio::test]
458460
async fn catch_generic_json_error_for_batched_request() {
459461
// given
460-
let port = get_available_port().unwrap();
461-
let _ = create_server(port, Box::new(return_error_response));
462+
let port = get_available_port().await.unwrap();
463+
let _ = create_server(port, Box::new(return_error_response)).await;
462464
let client = create_client(port, Retries::default());
463465

464466
// when
@@ -505,14 +507,15 @@ mod tests {
505507
#[tokio::test]
506508
async fn status_code_429_with_retry_after_as_seconds() {
507509
// given
508-
let port = get_available_port().unwrap();
510+
let port = get_available_port().await.unwrap();
509511
let _ = create_server(
510512
port,
511513
return_sequence(vec![
512514
return_429(Some("3".into())),
513515
Box::new(check_and_return_mock_response),
514516
]),
515-
);
517+
)
518+
.await;
516519
let client = create_client(
517520
port,
518521
Retries {
@@ -537,7 +540,7 @@ mod tests {
537540
#[tokio::test]
538541
async fn status_code_429_with_retry_after_as_date() {
539542
// given
540-
let port = get_available_port().unwrap();
543+
let port = get_available_port().await.unwrap();
541544
let started = Instant::now();
542545
let retry_after_value: DateTime<Utc> = DateTime::from(Utc::now() + Duration::from_secs(3));
543546
let _ = create_server(
@@ -546,7 +549,8 @@ mod tests {
546549
return_429(Some(retry_after_value.to_rfc2822())),
547550
Box::new(check_and_return_mock_response),
548551
]),
549-
);
552+
)
553+
.await;
550554
let client = create_client(
551555
port,
552556
Retries {
@@ -570,11 +574,12 @@ mod tests {
570574
#[tokio::test]
571575
async fn status_code_429_with_invalid_retry_after() {
572576
// given
573-
let port = get_available_port().unwrap();
577+
let port = get_available_port().await.unwrap();
574578
let _ = create_server(
575579
port,
576580
return_sequence(vec![return_429(Some("retry some time later, idc".into()))]),
577-
);
581+
)
582+
.await;
578583
let client = create_client(
579584
port,
580585
Retries {
@@ -596,8 +601,8 @@ mod tests {
596601
#[tokio::test]
597602
async fn status_code_429_without_retry_after() {
598603
// given
599-
let port = get_available_port().unwrap();
600-
let _ = create_server(port, return_sequence(vec![return_429(None)]));
604+
let port = get_available_port().await.unwrap();
605+
let _ = create_server(port, return_sequence(vec![return_429(None)])).await;
601606
let client = create_client(
602607
port,
603608
Retries {
@@ -619,8 +624,8 @@ mod tests {
619624
#[tokio::test]
620625
async fn status_code_429_retry_after_disabled() {
621626
// given
622-
let port = get_available_port().unwrap();
623-
let _ = create_server(port, return_sequence(vec![return_429(Some("3".into()))]));
627+
let port = get_available_port().await.unwrap();
628+
let _ = create_server(port, return_sequence(vec![return_429(Some("3".into()))])).await;
624629
let client = create_client(
625630
port,
626631
Retries {
@@ -642,15 +647,16 @@ mod tests {
642647
#[tokio::test]
643648
async fn status_code_429_with_retries() {
644649
// given
645-
let port = get_available_port().unwrap();
650+
let port = get_available_port().await.unwrap();
646651
let _ = create_server(
647652
port,
648653
return_sequence(vec![
649654
return_429(Some("3".into())), // sleep for 1 second as configured below
650655
return_429(Some("3".into())), // sleep for 2 seconds (2x 1sec)
651656
Box::new(check_and_return_mock_response),
652657
]),
653-
);
658+
)
659+
.await;
654660
let client = create_client(
655661
port,
656662
Retries {
@@ -675,15 +681,16 @@ mod tests {
675681
#[tokio::test]
676682
async fn status_code_5xx_with_retries() {
677683
// given
678-
let port = get_available_port().unwrap();
684+
let port = get_available_port().await.unwrap();
679685
let _ = create_server(
680686
port,
681687
return_sequence(vec![
682688
return_5xx(500), // sleep for 1 second as configured below
683689
return_5xx(502), // sleep for 2 seconds (2x 1sec)
684690
Box::new(check_and_return_mock_response),
685691
]),
686-
);
692+
)
693+
.await;
687694
let client = create_client(
688695
port,
689696
Retries {
@@ -708,7 +715,7 @@ mod tests {
708715
#[tokio::test]
709716
async fn status_code_5xx_retries_exhausted() {
710717
// given
711-
let port = get_available_port().unwrap();
718+
let port = get_available_port().await.unwrap();
712719
let _ = create_server(
713720
port,
714721
return_sequence(vec![
@@ -717,7 +724,8 @@ mod tests {
717724
return_5xx(503),
718725
Box::new(check_and_return_mock_response),
719726
]),
720-
);
727+
)
728+
.await;
721729
let client = create_client(
722730
port,
723731
Retries {
@@ -739,8 +747,8 @@ mod tests {
739747
#[tokio::test]
740748
async fn status_code_5xx_without_retries() {
741749
// given
742-
let port = get_available_port().unwrap();
743-
let _ = create_server(port, return_sequence(vec![return_5xx(500)]));
750+
let port = get_available_port().await.unwrap();
751+
let _ = create_server(port, return_sequence(vec![return_5xx(500)])).await;
744752
let client = create_client(
745753
port,
746754
Retries {

0 commit comments

Comments
 (0)