Skip to content

Commit e08f69c

Browse files
committed
fix(transports): fix rebase errors
1 parent f8d1ac9 commit e08f69c

File tree

1 file changed

+76
-63
lines changed

1 file changed

+76
-63
lines changed

src/transports/http.rs

Lines changed: 76 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -305,45 +305,49 @@ fn id_of_output(output: &Output) -> Result<RequestId> {
305305

306306
#[cfg(test)]
307307
mod tests {
308+
use std::{future::Future, pin::Pin};
309+
308310
use super::*;
309311
use crate::Error::Rpc;
310-
use core::pin::Pin;
311-
use futures::{lock::Mutex, Future};
312+
use futures::lock::Mutex;
313+
use http_body_util::{BodyExt, Full};
312314
use hyper::{
313-
body::HttpBody,
314-
service::{make_service_fn, service_fn},
315-
Body, Error, Method, Request, Response, Server,
315+
body::{Bytes, Incoming},
316+
server::conn::http1,
317+
service::service_fn,
318+
Method, Request, Response,
316319
};
320+
use hyper_util::rt::TokioIo;
317321
use jsonrpc_core::ErrorCode;
318-
use std::net::TcpListener;
319-
use tokio::{task::JoinHandle, time::Instant};
322+
use tokio::{net::TcpListener, task::JoinHandle, time::Instant};
320323

321-
type HyperResponse = Pin<Box<dyn Future<Output = hyper::Result<Response<Body>>> + Send>>;
324+
type HyperResponse =
325+
Pin<Box<dyn Future<Output = std::result::Result<Response<Full<Bytes>>, hyper::http::Error>> + Send + Sync>>;
322326

323-
type HyperHandler = Box<dyn Fn(Request<Body>) -> HyperResponse + Send + Sync>;
327+
type HyperHandler = Box<dyn Fn(Request<Incoming>) -> HyperResponse + Send + Sync>;
324328

325-
fn get_available_port() -> Option<u16> {
326-
Some(TcpListener::bind(("127.0.0.1", 0)).ok()?.local_addr().ok()?.port())
329+
async fn get_available_port() -> Option<u16> {
330+
Some(
331+
TcpListener::bind(("127.0.0.1", 0))
332+
.await
333+
.ok()?
334+
.local_addr()
335+
.ok()?
336+
.port(),
337+
)
327338
}
328339

329-
fn create_server(port: u16, handler: HyperHandler) -> JoinHandle<()> {
340+
async fn create_server(port: u16, handler: HyperHandler) -> JoinHandle<()> {
330341
let addr = format!("127.0.0.1:{}", port);
342+
let listener = TcpListener::bind(addr).await.unwrap();
331343
let handler = Arc::new(handler);
332-
let service = make_service_fn(move |_| {
333-
let handler = handler.clone();
334-
async move {
335-
let handler = handler.clone();
336-
Ok::<_, Error>(service_fn(move |req| {
337-
let handler = handler.clone();
338-
async move { handler(req).await }
339-
}))
344+
tokio::task::spawn(async move {
345+
loop {
346+
let (stream, _) = listener.accept().await.unwrap();
347+
let service = service_fn(handler.as_ref());
348+
let io = TokioIo::new(stream);
349+
http1::Builder::new().serve_connection(io, service).await.unwrap();
340350
}
341-
});
342-
343-
let server = Server::bind(&addr.parse().unwrap()).serve(service);
344-
tokio::spawn(async move {
345-
println!("Listening on http://{}", addr);
346-
server.await.unwrap();
347351
})
348352
}
349353

@@ -356,9 +360,9 @@ mod tests {
356360
}
357361

358362
fn return_429(retry_after_value: Option<String>) -> HyperHandler {
359-
Box::new(move |_req: Request<Body>| -> HyperResponse {
363+
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
360364
let retry_after_value = retry_after_value.clone();
361-
let response_body = Body::from(
365+
let response_body = Bytes::from(
362366
r#"{
363367
"jsonrpc": "2.0",
364368
"error": {
@@ -373,14 +377,14 @@ mod tests {
373377
response = response.header("Retry-After", value)
374378
}
375379

376-
let response = response.body(response_body).unwrap();
380+
let response = response.body(Full::new(response_body)).unwrap();
377381
Box::pin(async move { Ok(response) })
378382
})
379383
}
380384

381385
fn return_5xx(code: u16) -> HyperHandler {
382-
Box::new(move |_req: Request<Body>| -> HyperResponse {
383-
let response_body = Body::from(
386+
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
387+
let response_body = Bytes::from(
384388
r#"{
385389
"jsonrpc": "2.0",
386390
"error": {
@@ -390,12 +394,12 @@ mod tests {
390394
}"#,
391395
);
392396

393-
let response = Response::builder().status(code).body(response_body).unwrap();
397+
let response = Response::builder().status(code).body(Full::new(response_body)).unwrap();
394398
Box::pin(async move { Ok(response) })
395399
})
396400
}
397401

398-
fn check_and_return_mock_response(req: Request<Body>) -> HyperResponse {
402+
fn check_and_return_mock_response(req: Request<Incoming>) -> HyperResponse {
399403
let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":0}"#;
400404
let response = r#"{"jsonrpc":"2.0","id":0,"result":"x"}"#;
401405

@@ -405,16 +409,15 @@ mod tests {
405409
let mut body = req.into_body();
406410

407411
Box::pin(async move {
408-
while let Some(Ok(chunk)) = body.data().await {
409-
content.extend(&*chunk);
412+
while let Some(Ok(chunk)) = body.frame().await {
413+
content.extend(chunk.into_data().unwrap());
410414
}
411-
assert_eq!(std::str::from_utf8(&*content), Ok(expected));
412-
413-
Ok(Response::new(response.into()))
415+
assert_eq!(std::str::from_utf8(&content), Ok(expected));
416+
Response::builder().status(200).body(Full::new(response.into()))
414417
})
415418
}
416419

417-
fn return_error_response(_req: Request<Body>) -> HyperResponse {
420+
fn return_error_response(_req: Request<Incoming>) -> HyperResponse {
418421
let response = r#"{
419422
"jsonrpc":"2.0",
420423
"error":{
@@ -423,12 +426,16 @@ mod tests {
423426
},
424427
"id":null
425428
}"#;
426-
Box::pin(async move { Ok(Response::new(response.into())) })
429+
let response = Response::builder()
430+
.status(200)
431+
.body(Full::new(response.into()))
432+
.unwrap();
433+
Box::pin(async move { Ok(response) })
427434
}
428435

429436
fn return_sequence(handlers: Vec<HyperHandler>) -> HyperHandler {
430437
let handlers = Arc::new(Mutex::new(handlers));
431-
Box::new(move |_req: Request<Body>| -> HyperResponse {
438+
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
432439
let handlers = handlers.clone();
433440
Box::pin(async move {
434441
let mut handlers = handlers.lock().await;
@@ -441,8 +448,8 @@ mod tests {
441448
#[tokio::test]
442449
async fn should_make_a_request() {
443450
// given
444-
let port = get_available_port().unwrap();
445-
let _ = create_server(port, Box::new(check_and_return_mock_response));
451+
let port = get_available_port().await.unwrap();
452+
let _ = create_server(port, Box::new(check_and_return_mock_response)).await;
446453
let client = create_client(port, Retries::default());
447454

448455
// when
@@ -457,8 +464,8 @@ mod tests {
457464
#[tokio::test]
458465
async fn catch_generic_json_error_for_batched_request() {
459466
// given
460-
let port = get_available_port().unwrap();
461-
let _ = create_server(port, Box::new(return_error_response));
467+
let port = get_available_port().await.unwrap();
468+
let _ = create_server(port, Box::new(return_error_response)).await;
462469
let client = create_client(port, Retries::default());
463470

464471
// when
@@ -505,14 +512,15 @@ mod tests {
505512
#[tokio::test]
506513
async fn status_code_429_with_retry_after_as_seconds() {
507514
// given
508-
let port = get_available_port().unwrap();
515+
let port = get_available_port().await.unwrap();
509516
let _ = create_server(
510517
port,
511518
return_sequence(vec![
512519
return_429(Some("3".into())),
513520
Box::new(check_and_return_mock_response),
514521
]),
515-
);
522+
)
523+
.await;
516524
let client = create_client(
517525
port,
518526
Retries {
@@ -537,7 +545,7 @@ mod tests {
537545
#[tokio::test]
538546
async fn status_code_429_with_retry_after_as_date() {
539547
// given
540-
let port = get_available_port().unwrap();
548+
let port = get_available_port().await.unwrap();
541549
let started = Instant::now();
542550
let retry_after_value: DateTime<Utc> = DateTime::from(Utc::now() + Duration::from_secs(3));
543551
let _ = create_server(
@@ -546,7 +554,8 @@ mod tests {
546554
return_429(Some(retry_after_value.to_rfc2822())),
547555
Box::new(check_and_return_mock_response),
548556
]),
549-
);
557+
)
558+
.await;
550559
let client = create_client(
551560
port,
552561
Retries {
@@ -570,11 +579,12 @@ mod tests {
570579
#[tokio::test]
571580
async fn status_code_429_with_invalid_retry_after() {
572581
// given
573-
let port = get_available_port().unwrap();
582+
let port = get_available_port().await.unwrap();
574583
let _ = create_server(
575584
port,
576585
return_sequence(vec![return_429(Some("retry some time later, idc".into()))]),
577-
);
586+
)
587+
.await;
578588
let client = create_client(
579589
port,
580590
Retries {
@@ -596,8 +606,8 @@ mod tests {
596606
#[tokio::test]
597607
async fn status_code_429_without_retry_after() {
598608
// given
599-
let port = get_available_port().unwrap();
600-
let _ = create_server(port, return_sequence(vec![return_429(None)]));
609+
let port = get_available_port().await.unwrap();
610+
let _ = create_server(port, return_sequence(vec![return_429(None)])).await;
601611
let client = create_client(
602612
port,
603613
Retries {
@@ -619,8 +629,8 @@ mod tests {
619629
#[tokio::test]
620630
async fn status_code_429_retry_after_disabled() {
621631
// given
622-
let port = get_available_port().unwrap();
623-
let _ = create_server(port, return_sequence(vec![return_429(Some("3".into()))]));
632+
let port = get_available_port().await.unwrap();
633+
let _ = create_server(port, return_sequence(vec![return_429(Some("3".into()))])).await;
624634
let client = create_client(
625635
port,
626636
Retries {
@@ -642,15 +652,16 @@ mod tests {
642652
#[tokio::test]
643653
async fn status_code_429_with_retries() {
644654
// given
645-
let port = get_available_port().unwrap();
655+
let port = get_available_port().await.unwrap();
646656
let _ = create_server(
647657
port,
648658
return_sequence(vec![
649659
return_429(Some("3".into())), // sleep for 1 second as configured below
650660
return_429(Some("3".into())), // sleep for 2 seconds (2x 1sec)
651661
Box::new(check_and_return_mock_response),
652662
]),
653-
);
663+
)
664+
.await;
654665
let client = create_client(
655666
port,
656667
Retries {
@@ -675,15 +686,16 @@ mod tests {
675686
#[tokio::test]
676687
async fn status_code_5xx_with_retries() {
677688
// given
678-
let port = get_available_port().unwrap();
689+
let port = get_available_port().await.unwrap();
679690
let _ = create_server(
680691
port,
681692
return_sequence(vec![
682693
return_5xx(500), // sleep for 1 second as configured below
683694
return_5xx(502), // sleep for 2 seconds (2x 1sec)
684695
Box::new(check_and_return_mock_response),
685696
]),
686-
);
697+
)
698+
.await;
687699
let client = create_client(
688700
port,
689701
Retries {
@@ -708,7 +720,7 @@ mod tests {
708720
#[tokio::test]
709721
async fn status_code_5xx_retries_exhausted() {
710722
// given
711-
let port = get_available_port().unwrap();
723+
let port = get_available_port().await.unwrap();
712724
let _ = create_server(
713725
port,
714726
return_sequence(vec![
@@ -717,7 +729,8 @@ mod tests {
717729
return_5xx(503),
718730
Box::new(check_and_return_mock_response),
719731
]),
720-
);
732+
)
733+
.await;
721734
let client = create_client(
722735
port,
723736
Retries {
@@ -739,8 +752,8 @@ mod tests {
739752
#[tokio::test]
740753
async fn status_code_5xx_without_retries() {
741754
// given
742-
let port = get_available_port().unwrap();
743-
let _ = create_server(port, return_sequence(vec![return_5xx(500)]));
755+
let port = get_available_port().await.unwrap();
756+
let _ = create_server(port, return_sequence(vec![return_5xx(500)])).await;
744757
let client = create_client(
745758
port,
746759
Retries {

0 commit comments

Comments
 (0)