Skip to content

Commit 1b95ac1

Browse files
committed
test lambda function end-to-end
1 parent 77c0b95 commit 1b95ac1

File tree

3 files changed

+68
-18
lines changed

3 files changed

+68
-18
lines changed

lambda/src/client.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use http::{
1010
HeaderValue, Method, Request, Response, StatusCode, Uri,
1111
};
1212
use hyper::Body;
13+
use serde_json::json;
1314
use std::{
1415
convert::{TryFrom, TryInto},
1516
future::Future,
@@ -70,9 +71,9 @@ where
7071
}
7172
}
7273

73-
pub struct NextEventSvc;
74+
pub struct EndpointSvc;
7475

75-
impl Service<Request<Body>> for NextEventSvc {
76+
impl Service<Request<Body>> for EndpointSvc {
7677
type Response = Response<Body>;
7778
type Error = crate::Err;
7879
type Future = Fut<'static, Result<Self::Response, Self::Error>>;
@@ -101,13 +102,14 @@ async fn next_event(req: Request<Body>) -> Result<Response<Body>, Err> {
101102
let path = "/2018-06-01/runtime/invocation/next";
102103
assert_eq!(req.method(), Method::GET);
103104
assert_eq!(req.uri().path_and_query().unwrap(), &PathAndQuery::from_static(path));
105+
let body = json!({"message": "hello"});
104106

105107
let rsp = NextEventResponse {
106108
request_id: "8476a536-e9f4-11e8-9739-2dfe598c3fcd",
107109
deadline: 1542409706888,
108110
arn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime",
109111
trace_id: "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419",
110-
body: vec![],
112+
body: serde_json::to_vec(&body)?,
111113
};
112114
rsp.into_rsp()
113115
}
@@ -144,7 +146,7 @@ async fn event_err(req: Request<Body>) -> Result<Response<Body>, Err> {
144146
pub struct MakeSvc;
145147

146148
impl<T> Service<T> for MakeSvc {
147-
type Response = NextEventSvc;
149+
type Response = EndpointSvc;
148150
type Error = std::io::Error;
149151
type Future = future::Ready<Result<Self::Response, Self::Error>>;
150152

@@ -153,15 +155,17 @@ impl<T> Service<T> for MakeSvc {
153155
}
154156

155157
fn call(&mut self, _: T) -> Self::Future {
156-
future::ok(NextEventSvc)
158+
future::ok(EndpointSvc)
157159
}
158160
}
159161

160162
#[cfg(test)]
161163
mod endpoint_tests {
162164
use super::{Client, MakeSvc};
163165
use crate::{
166+
context, handler_fn,
164167
requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest},
168+
run_simulated,
165169
types::Diagnostic,
166170
Err,
167171
};
@@ -254,4 +258,30 @@ mod endpoint_tests {
254258
future::try_select(server, client).await.unwrap();
255259
Ok(())
256260
}
261+
262+
#[tokio::test]
263+
async fn run_end_to_end() -> Result<(), Err> {
264+
use serde_json::Value;
265+
let (listener, addr) = setup()?;
266+
let url = format!("http://{}/", addr);
267+
268+
let server = tokio::spawn(async move {
269+
let svc = hyper::Server::from_tcp(listener)?.serve(MakeSvc);
270+
svc.await
271+
});
272+
273+
async fn handler(s: Value) -> Result<Value, Err> {
274+
let ctx = context();
275+
assert!(ctx.xray_trace_id.is_some());
276+
Ok(s)
277+
}
278+
let handler = handler_fn(handler);
279+
280+
let handler = tokio::spawn(async move {
281+
run_simulated(handler, &url).await?;
282+
Ok::<(), Err>(())
283+
});
284+
future::try_select(server, handler).await.unwrap();
285+
Ok(())
286+
}
257287
}

lambda/src/lib.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
//! }
3434
//! ```
3535
pub use crate::types::LambdaCtx;
36-
use bytes::buf::BufExt;
3736
use client::Client;
3837
use http::{Request, Response};
3938
use hyper::Body;
@@ -129,10 +128,6 @@ pub trait Handler<A, B> {
129128
/// The future response value of this handler.
130129
type Fut: Future<Output = Result<B, Self::Err>>;
131130
/// Process the incoming event and return the response asynchronously.
132-
///
133-
/// # Arguments
134-
/// * `event` - The data received in the invocation request
135-
/// * `ctx` - The context for the current invocation
136131
fn call(&mut self, event: A) -> Self::Fut;
137132
}
138133

@@ -193,7 +188,23 @@ where
193188
let config = Config::from_env()?;
194189
let client = Client::with(&config.endpoint, hyper::Client::new())?;
195190
let mut exec = Executor { client };
196-
exec.run(&mut handler).await?;
191+
exec.run(&mut handler, false).await?;
192+
193+
Ok(())
194+
}
195+
196+
/// Runs the lambda function almost entirely in-memory. This is mean for easier testing.
197+
pub async fn run_simulated<A, B, F>(handler: F, url: &str) -> Result<(), Err>
198+
where
199+
F: Handler<A, B>,
200+
<F as Handler<A, B>>::Err: fmt::Debug,
201+
A: for<'de> Deserialize<'de>,
202+
B: Serialize,
203+
{
204+
let mut handler = handler;
205+
let client = Client::with(url, hyper::Client::new())?;
206+
let mut exec = Executor { client };
207+
exec.run(&mut handler, true).await?;
197208

198209
Ok(())
199210
}
@@ -207,24 +218,24 @@ where
207218
S: Service<Request<Body>, Response = Response<Body>>,
208219
<S as Service<Request<Body>>>::Error: Into<Err> + Send + Sync + 'static + std::error::Error,
209220
{
210-
async fn run<A, B, F>(&mut self, handler: &mut F) -> Result<(), Err>
221+
async fn run<A, B, F>(&mut self, handler: &mut F, once: bool) -> Result<(), Err>
211222
where
212223
F: Handler<A, B>,
213224
<F as Handler<A, B>>::Err: fmt::Debug,
214225
A: for<'de> Deserialize<'de>,
215226
B: Serialize,
216227
{
217228
let client = &mut self.client;
229+
// todo: refactor this into a stream so that the `once` boolean can be replaced
230+
// a `.take(n).await` combinator if we want to run this once, if `n` is `1`.
218231
loop {
219232
let req = NextEventRequest.into_req()?;
220233
let event = client.call(req).await?;
221234
let (parts, body) = event.into_parts();
222235

223-
let mut ctx = LambdaCtx::try_from(parts.headers)?;
224-
ctx.env_config = Config::from_env()?;
225-
226-
let body = hyper::body::aggregate(body).await?;
227-
let body = serde_json::from_reader(body.reader())?;
236+
let ctx = LambdaCtx::try_from(parts.headers)?;
237+
let body = hyper::body::to_bytes(body).await?;
238+
let body = serde_json::from_slice(&body)?;
228239

229240
let request_id = &ctx.request_id.clone();
230241
let f = WithTaskLocal {
@@ -244,6 +255,9 @@ where
244255
.into_req()?,
245256
};
246257
client.call(req).await?;
258+
if once {
259+
break Ok(());
260+
}
247261
}
248262
}
249263
}

lambda/src/types.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,14 @@ impl TryFrom<HeaderMap> for LambdaCtx {
130130
.expect("Missing deadline"),
131131
invoked_function_arn: headers["lambda-runtime-invoked-function-arn"]
132132
.to_str()
133-
.expect("Missing arn")
133+
.expect("Missing arn; this is a bug")
134134
.to_owned(),
135+
xray_trace_id: headers.get("lambda-runtime-trace-id").map(|header| {
136+
header
137+
.to_str()
138+
.expect("Invalid XRayTraceID sent by Lambda; this is a bug")
139+
.to_owned()
140+
}),
135141
..Default::default()
136142
};
137143
Ok(ctx)

0 commit comments

Comments
 (0)