Skip to content

Commit 0072f42

Browse files
authored
fix(eventstream-handler-node): start streaming without waiting for response (#6311)
* fix(eventstream-handler-node): start streaming without waiting for response * test(middleware-eventstream): use dummy asynciterator in integ tests
1 parent ff30a3e commit 0072f42

File tree

4 files changed

+62
-54
lines changed

4 files changed

+62
-54
lines changed

packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ jest.mock("./EventSigningStream");
99
jest.mock("@smithy/eventstream-codec");
1010

1111
describe(EventStreamPayloadHandler.name, () => {
12+
const collectData = (stream: Readable) => {
13+
const chunks: any = [];
14+
return new Promise((resolve, reject) => {
15+
stream.on("data", (chunk) => chunks.push(chunk));
16+
stream.on("error", reject);
17+
stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
18+
});
19+
};
20+
1221
const mockMessageSigner: MessageSigner = {
1322
sign: jest.fn(),
1423
signMessage: jest.fn(),
@@ -49,7 +58,7 @@ describe(EventStreamPayloadHandler.name, () => {
4958
utf8Decoder: mockUtf8Decoder,
5059
utf8Encoder: mockUtf8encoder,
5160
});
52-
const mockRequest = { body: new Readable() } as HttpRequest;
61+
const mockRequest = { body: new PassThrough() } as HttpRequest;
5362

5463
try {
5564
await handler.handle(mockNextHandler, {
@@ -126,6 +135,42 @@ describe(EventStreamPayloadHandler.name, () => {
126135
});
127136
});
128137

138+
it("should start piping regardless of whether the downstream resolves", async () => {
139+
const authorization =
140+
"AWS4-HMAC-SHA256 Credential=AKID/20200510/us-west-2/foo/aws4_request, SignedHeaders=host, Signature=1234567890";
141+
const originalPayload = new PassThrough();
142+
const mockRequest = {
143+
body: originalPayload,
144+
headers: { authorization },
145+
} as any;
146+
const handler = new EventStreamPayloadHandler({
147+
messageSigner: () => Promise.resolve(mockMessageSigner),
148+
utf8Decoder: mockUtf8Decoder,
149+
utf8Encoder: mockUtf8encoder,
150+
});
151+
152+
(mockNextHandler as any).mockImplementationOnce(async (args: FinalizeHandlerArguments<any>) => {
153+
const handledRequest = args.request as HttpRequest;
154+
155+
originalPayload.end("Some Data");
156+
const collected = await collectData(handledRequest.body);
157+
158+
// this means the stream is flowing without this downstream middleware
159+
// having resolved yet.
160+
expect(collected).toEqual("Some Data");
161+
162+
return Promise.resolve({ output: { handledRequest } });
163+
});
164+
165+
const {
166+
output: { handledRequest },
167+
} = await handler.handle(mockNextHandler, {
168+
request: mockRequest,
169+
input: {},
170+
});
171+
expect(handledRequest.body).not.toBe(originalPayload);
172+
});
173+
129174
it("should start piping to request payload through event signer if downstream middleware returns", async () => {
130175
const authorization =
131176
"AWS4-HMAC-SHA256 Credential=AKID/20200510/us-west-2/foo/aws4_request, SignedHeaders=host, Signature=1234567890";
@@ -155,14 +200,6 @@ describe(EventStreamPayloadHandler.name, () => {
155200
expect(handledRequest.body).not.toBe(originalPayload);
156201
// Expect the data from the output payload from eventstream payload handler the same as from the
157202
// stream supplied to the handler.
158-
const collectData = (stream: Readable) => {
159-
const chunks: any = [];
160-
return new Promise((resolve, reject) => {
161-
stream.on("data", (chunk) => chunks.push(chunk));
162-
stream.on("error", reject);
163-
stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
164-
});
165-
};
166203
originalPayload.end("Some Data");
167204
const collected = await collectData(handledRequest.body);
168205
expect(collected).toEqual("Some Data");

packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,9 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
6464
objectMode: true,
6565
});
6666

67-
let result: FinalizeHandlerOutput<any>;
68-
try {
69-
result = await next(args);
70-
} catch (e) {
71-
// Close the payload stream otherwise the retry would hang
72-
// because of the previous connection.
73-
request.body.end();
74-
throw e;
75-
}
76-
77-
// If response is successful, start piping the payload stream
78-
const match = (request.headers["authorization"] || "").match(/Signature=([\w]+)$/);
67+
const match = request.headers?.authorization?.match(/Signature=([\w]+)$/);
7968
// Sign the eventstream based on the signature from initial request.
80-
const priorSignature = (match || [])[1] || (query && (query["X-Amz-Signature"] as string)) || "";
69+
const priorSignature = match?.[1] ?? (query?.["X-Amz-Signature"] as string) ?? "";
8170
const signingStream = new EventSigningStream({
8271
priorSignature,
8372
eventStreamCodec: this.eventStreamCodec,
@@ -91,6 +80,16 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
9180
}
9281
});
9382

83+
let result: FinalizeHandlerOutput<any>;
84+
try {
85+
result = await next(args);
86+
} catch (e) {
87+
// Close the payload stream otherwise the retry would hang
88+
// because of the previous connection.
89+
request.body.end();
90+
throw e;
91+
}
92+
9493
return result;
9594
}
9695
}

packages/middleware-eventstream/src/middleware-eventstream.integ.spec.ts

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,7 @@ describe("middleware-eventstream", () => {
3030
botAliasId: "undefined",
3131
localeId: "undefined",
3232
sessionId: "undefined",
33-
requestEventStream: {
34-
[Symbol.asyncIterator]() {
35-
return {
36-
next() {
37-
return this as any;
38-
},
39-
};
40-
},
41-
},
33+
requestEventStream: (async function* () {})(),
4234
});
4335

4436
expect.assertions(2);
@@ -61,15 +53,7 @@ describe("middleware-eventstream", () => {
6153
VideoWidth: "undefined",
6254
VideoHeight: "undefined",
6355
ChallengeVersions: "undefined",
64-
LivenessRequestStream: {
65-
[Symbol.asyncIterator]() {
66-
return {
67-
next() {
68-
return this as any;
69-
},
70-
};
71-
},
72-
},
56+
LivenessRequestStream: (async function* () {})(),
7357
});
7458

7559
expect.assertions(2);
@@ -91,15 +75,7 @@ describe("middleware-eventstream", () => {
9175
await client.startStreamTranscription({
9276
MediaSampleRateHertz: 144,
9377
MediaEncoding: "ogg-opus",
94-
AudioStream: {
95-
[Symbol.asyncIterator]() {
96-
return {
97-
next() {
98-
return this as any;
99-
},
100-
};
101-
},
102-
},
78+
AudioStream: (async function* () {})(),
10379
});
10480

10581
expect.assertions(2);

packages/middleware-websocket/src/middleware-websocket.integ.spec.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,7 @@ describe("middleware-websocket", () => {
4747
VideoWidth: "1024",
4848
VideoHeight: "1024",
4949
ChallengeVersions: "a,b,c",
50-
LivenessRequestStream: {
51-
[Symbol.asyncIterator]() {
52-
return this as any;
53-
},
54-
},
50+
LivenessRequestStream: (async function* () {})(),
5551
});
5652
});
5753
});

0 commit comments

Comments
 (0)