Skip to content

Commit 545ddb4

Browse files
committed
feat: handle unmodeled exceptions in eventstream serde
1 parent 6c574a2 commit 545ddb4

File tree

8 files changed

+95
-64
lines changed

8 files changed

+95
-64
lines changed

packages/eventstream-serde-browser/src/EventStreamMarshaller.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ export class EventStreamMarshaller {
2424
}
2525

2626
deserialize<T>(
27-
body: ReadableStream,
28-
deserializer: (input: { [event: string]: Message }) => T
27+
body: ReadableStream<Uint8Array>,
28+
deserializer: (input: { [event: string]: Message }) => Promise<T>
2929
): AsyncIterable<T> {
3030
const chunkedStream = getChunkedStream(body);
3131
const messageStream = getEventMessageStream(

packages/eventstream-serde-node/src/EventStreamMarshaller.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export class EventStreamMarshaller {
2929

3030
deserialize<T>(
3131
body: Readable,
32-
deserializer: (input: { [event: string]: Message }) => T
32+
deserializer: (input: { [event: string]: Message }) => Promise<T>
3333
): AsyncIterable<T> {
3434
//should use stream[Symbol.asyncIterable] when the api is stable
3535
//reference: https://nodejs.org/docs/latest-v11.x/api/stream.html#stream_readable_symbol_asynciterator

packages/eventstream-serde-universal/src/EventStreamMarshaller.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import {
77
} from "@aws-sdk/types";
88
import { getChunkedStream } from "./getChunkedStream";
99
import { getUnmarshalledStream } from "./getUnmarshalledStream";
10-
import { getDeserializedStream } from "./getDeserializedStream";
1110

1211
export interface EventStreamMarshaller extends IEventStreamMarshaller {}
1312

@@ -18,22 +17,23 @@ export interface EventStreamMarshallerOptions {
1817

1918
export class EventStreamMarshaller {
2019
private readonly eventMarshaller: EventMarshaller;
20+
private readonly utfEncoder: Encoder;
2121
constructor({ utf8Encoder, utf8Decoder }: EventStreamMarshallerOptions) {
2222
this.eventMarshaller = new EventMarshaller(utf8Encoder, utf8Decoder);
23+
this.utfEncoder = utf8Encoder;
2324
}
2425

2526
deserialize<T>(
2627
body: AsyncIterable<Uint8Array>,
27-
deserializer: (input: { [event: string]: Message }) => T
28+
deserializer: (input: { [event: string]: Message }) => Promise<T>
2829
): AsyncIterable<T> {
2930
const chunkedStream = getChunkedStream(body);
3031
const unmarshalledStream = getUnmarshalledStream(chunkedStream, {
31-
eventMarshaller: this.eventMarshaller
32+
eventMarshaller: this.eventMarshaller,
33+
deserializer,
34+
toUtf8: this.utfEncoder
3235
});
33-
const deserializedStream = getDeserializedStream(unmarshalledStream, {
34-
deserializer
35-
});
36-
return deserializedStream;
36+
return unmarshalledStream;
3737
}
3838

3939
serialize<T>(

packages/eventstream-serde-universal/src/fixtures/event.fixture.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,8 @@ export const endEventMessage = Buffer.from(
1212
"AAAAOAAAACjBxoTUDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcAA0VuZM+X05I=",
1313
"base64"
1414
);
15+
16+
export const exception = Buffer.from(
17+
"AAAAtgAAAF8BcW64DTpjb250ZW50LXR5cGUHABhhcHBsaWNhdGlvbi9vY3RldC1zdHJlYW0NOm1lc3NhZ2UtdHlwZQcACWV4Y2VwdGlvbg86ZXhjZXB0aW9uLXR5cGUHAAlFeGNlcHRpb25UaGlzIGlzIGEgbW9kZWxlZCBleGNlcHRpb24gZXZlbnQgdGhhdCB3b3VsZCBiZSB0aHJvd24gaW4gZGVzZXJpYWxpemVyLj6Gc60=",
18+
"base64"
19+
);

packages/eventstream-serde-universal/src/getDeserializedStream.spec.ts

Lines changed: 0 additions & 26 deletions
This file was deleted.

packages/eventstream-serde-universal/src/getDeserializedStream.ts

Lines changed: 0 additions & 17 deletions
This file was deleted.

packages/eventstream-serde-universal/src/getUnmarshalledStream.spec.ts

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { getUnmarshalledStream } from "./getUnmarshalledStream";
44
import {
55
recordEventMessage,
66
statsEventMessage,
7-
endEventMessage
7+
endEventMessage,
8+
exception
89
} from "./fixtures/event.fixture";
910
import { Message } from "@aws-sdk/types";
1011

@@ -56,7 +57,9 @@ describe("getUnmarshalledStream", () => {
5657
yield endEventMessage;
5758
};
5859
const unmarshallerStream = getUnmarshalledStream(source(), {
59-
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8)
60+
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8),
61+
deserializer: message => Promise.resolve(message),
62+
toUtf8
6063
});
6164
const messages: Array<Message> = [];
6265
for await (const message of unmarshallerStream) {
@@ -66,4 +69,59 @@ describe("getUnmarshalledStream", () => {
6669
expect(messages[i]).toEqual(expectedMessages[i]);
6770
}
6871
});
72+
73+
it("throws when deserializer throws an error", async () => {
74+
const source = {
75+
[Symbol.asyncIterator]: async function* () {
76+
yield exception;
77+
}
78+
};
79+
const deserStream = getUnmarshalledStream(source, {
80+
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8),
81+
deserializer: message => {
82+
throw new Error("error event");
83+
},
84+
toUtf8
85+
});
86+
let error: Error | undefined = undefined;
87+
try {
88+
for await (const event of deserStream) {
89+
//pass.
90+
}
91+
} catch (e) {
92+
error = e;
93+
}
94+
expect(error).toBeDefined();
95+
expect(error!.message).toEqual("error event");
96+
});
97+
98+
it("throws on exception event if deserializer doesn't throw", async () => {
99+
//This could happened if client-side SDK is not up-to-date
100+
const source = {
101+
[Symbol.asyncIterator]: async function* () {
102+
yield exception;
103+
}
104+
};
105+
const deserStream = getUnmarshalledStream(source, {
106+
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8),
107+
deserializer: message =>
108+
Promise.resolve({
109+
$unknown: message
110+
}),
111+
toUtf8
112+
});
113+
let error: Error | undefined = undefined;
114+
try {
115+
for await (const event of deserStream) {
116+
//pass.
117+
}
118+
} catch (e) {
119+
error = e;
120+
}
121+
expect(error).toBeDefined();
122+
expect(error?.name).toEqual("Exception");
123+
expect(error?.message).toEqual(
124+
"This is a modeled exception event that would be thrown in deserializer."
125+
);
126+
});
69127
});

packages/eventstream-serde-universal/src/getUnmarshalledStream.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller";
2-
import { Message } from "@aws-sdk/types";
2+
import { Message, Encoder } from "@aws-sdk/types";
33

4-
export type UnmarshalledStreamOptions = {
4+
export type UnmarshalledStreamOptions<T> = {
55
eventMarshaller: EventMarshaller;
6+
deserializer: (input: { [name: string]: Message }) => Promise<T>;
7+
toUtf8: Encoder;
68
};
79

8-
export function getUnmarshalledStream(
10+
export function getUnmarshalledStream<T extends { [key: string]: any }>(
911
source: AsyncIterable<Uint8Array>,
10-
options: UnmarshalledStreamOptions
11-
): AsyncIterable<{ [name: string]: Message }> {
12+
options: UnmarshalledStreamOptions<T>
13+
): AsyncIterable<T> {
1214
return {
1315
[Symbol.asyncIterator]: async function* () {
1416
for await (const chunk of source) {
@@ -24,13 +26,22 @@ export function getUnmarshalledStream(
2426
throw unmodeledError;
2527
} else if (messageType === "exception") {
2628
// For modeled exception, push it to deserializer and throw after deserializing
27-
yield {
28-
[message.headers[":exception-type"].value as string]: message
29-
};
29+
const code = message.headers[":exception-type"].value as string;
30+
const exception = { [code]: message };
31+
// Get parsed exception event in key(error code) value(structured error) pair.
32+
const deserializedException = await options.deserializer(exception);
33+
if (deserializedException.$unknown) {
34+
//this is an unmodeled exception then try parsing it with best effort
35+
const error = new Error(options.toUtf8(message.body));
36+
error.name = code;
37+
throw error;
38+
}
39+
throw deserializedException[code];
3040
} else if (messageType === "event") {
31-
yield {
41+
const event = {
3242
[message.headers[":event-type"].value as string]: message
3343
};
44+
yield await options.deserializer(event);
3445
} else {
3546
throw Error(
3647
`Unrecognizable event type: ${message.headers[":event-type"].value}`

0 commit comments

Comments
 (0)