Skip to content

Commit c8eef12

Browse files
feat: support bi-directional transcribe streaming over Websocket (#1216)
* feat: add middleware-sdk-transcribe-streaming to handle websocket * fix: uuid not available in rn * feat(eventstream-serde-browser): make it based on universal es serde Now this package can fully serialize & deserailize eventstream with web APIs. In browsers, it converts from user-side iterables of input structure to internal ReadableStream of binaries. In ReactNative, it converts from user-side iterables of input structure to internal iterables of binaries. The same applies to the other converting direction. * chore: add codegen for Transcribe Streaming WebSocket dependencies * feat: generate transcribe streaming client * chore: address feedbacks: prefer FP style * chore(middleware-sdk-transcribe-streaming): use more FP Co-authored-by: Trivikram Kamat <[email protected]>
1 parent b4ef84b commit c8eef12

34 files changed

+1099
-259
lines changed

clients/client-transcribe-streaming/TranscribeStreamingClient.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ import {
3434
getRetryPlugin,
3535
resolveRetryConfig
3636
} from "@aws-sdk/middleware-retry";
37+
import {
38+
WebSocketInputConfig,
39+
WebSocketResolvedConfig,
40+
getWebSocketPlugin,
41+
resolveWebSocketConfig
42+
} from "@aws-sdk/middleware-sdk-transcribe-streaming";
3743
import {
3844
AwsAuthInputConfig,
3945
AwsAuthResolvedConfig,
@@ -176,6 +182,7 @@ export type TranscribeStreamingClientConfig = Partial<
176182
UserAgentInputConfig &
177183
HostHeaderInputConfig &
178184
EventStreamInputConfig &
185+
WebSocketInputConfig &
179186
EventStreamSerdeInputConfig;
180187

181188
export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfiguration<
@@ -189,6 +196,7 @@ export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfigurat
189196
UserAgentResolvedConfig &
190197
HostHeaderResolvedConfig &
191198
EventStreamResolvedConfig &
199+
WebSocketResolvedConfig &
192200
EventStreamSerdeResolvedConfig;
193201

194202
/**
@@ -214,14 +222,16 @@ export class TranscribeStreamingClient extends __Client<
214222
let _config_5 = resolveUserAgentConfig(_config_4);
215223
let _config_6 = resolveHostHeaderConfig(_config_5);
216224
let _config_7 = resolveEventStreamConfig(_config_6);
217-
let _config_8 = resolveEventStreamSerdeConfig(_config_7);
218-
super(_config_8);
219-
this.config = _config_8;
225+
let _config_8 = resolveWebSocketConfig(_config_7);
226+
let _config_9 = resolveEventStreamSerdeConfig(_config_8);
227+
super(_config_9);
228+
this.config = _config_9;
220229
this.middlewareStack.use(getAwsAuthPlugin(this.config));
221230
this.middlewareStack.use(getRetryPlugin(this.config));
222231
this.middlewareStack.use(getUserAgentPlugin(this.config));
223232
this.middlewareStack.use(getContentLengthPlugin(this.config));
224233
this.middlewareStack.use(getHostHeaderPlugin(this.config));
234+
this.middlewareStack.use(getWebSocketPlugin(this.config));
225235
}
226236

227237
destroy(): void {

clients/client-transcribe-streaming/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"@aws-sdk/middleware-eventstream": "1.0.0-gamma.0",
4646
"@aws-sdk/middleware-host-header": "1.0.0-gamma.1",
4747
"@aws-sdk/middleware-retry": "1.0.0-gamma.1",
48+
"@aws-sdk/middleware-sdk-transcribe-streaming": "1.0.0-gamma.0",
4849
"@aws-sdk/middleware-serde": "1.0.0-gamma.1",
4950
"@aws-sdk/middleware-signing": "1.0.0-gamma.1",
5051
"@aws-sdk/middleware-stack": "1.0.0-gamma.1",

clients/client-transcribe-streaming/runtimeConfig.browser.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import { name, version } from "./package.json";
22
import { Sha256 } from "@aws-crypto/sha256-browser";
33
import { eventStreamSerdeProvider } from "@aws-sdk/eventstream-serde-browser";
4-
import { FetchHttpHandler, streamCollector } from "@aws-sdk/fetch-http-handler";
4+
import { streamCollector } from "@aws-sdk/fetch-http-handler";
55
import { invalidFunction } from "@aws-sdk/invalid-dependency";
6+
import {
7+
WebSocketHandler,
8+
eventStreamPayloadHandler
9+
} from "@aws-sdk/middleware-sdk-transcribe-streaming";
610
import { parseUrl } from "@aws-sdk/url-parser-browser";
711
import { fromBase64, toBase64 } from "@aws-sdk/util-base64-browser";
812
import { calculateBodyLength } from "@aws-sdk/util-body-length-browser";
@@ -19,12 +23,10 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
1923
bodyLengthChecker: calculateBodyLength,
2024
credentialDefaultProvider: invalidFunction("Credential is missing") as any,
2125
defaultUserAgent: defaultUserAgent(name, version),
22-
eventStreamPayloadHandlerProvider: () => ({
23-
handle: invalidFunction("event stream request is not supported in browser.")
24-
}),
26+
eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler,
2527
eventStreamSerdeProvider,
2628
regionDefaultProvider: invalidFunction("Region is missing") as any,
27-
requestHandler: new FetchHttpHandler(),
29+
requestHandler: new WebSocketHandler(),
2830
sha256: Sha256,
2931
streamCollector,
3032
urlParser: parseUrl,
Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { name, version } from "./package.json";
22
import { Sha256 } from "@aws-crypto/sha256-js";
3-
import { invalidFunction } from "@aws-sdk/invalid-dependency";
3+
import {
4+
WebSocketHandler,
5+
eventStreamPayloadHandler
6+
} from "@aws-sdk/middleware-sdk-transcribe-streaming";
47
import { parseUrl } from "@aws-sdk/url-parser-node";
58
import { ClientDefaults } from "./TranscribeStreamingClient";
69
import { ClientDefaultValues as BrowserDefaults } from "./runtimeConfig.browser";
@@ -9,17 +12,8 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
912
...BrowserDefaults,
1013
runtime: "react-native",
1114
defaultUserAgent: `aws-sdk-js-v3-react-native-${name}/${version}`,
12-
eventStreamPayloadHandlerProvider: () => ({
13-
handle: invalidFunction(
14-
"event stream request is not supported in ReactNative."
15-
)
16-
}),
17-
eventStreamSerdeProvider: () => ({
18-
serialize: invalidFunction("event stream is not supported in ReactNative."),
19-
deserialize: invalidFunction(
20-
"event stream is not supported in ReactNative."
21-
)
22-
}),
15+
eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler,
16+
requestHandler: new WebSocketHandler(),
2317
sha256: Sha256,
2418
urlParser: parseUrl
2519
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package software.amazon.smithy.aws.typescript.codegen;
2+
3+
import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_CONFIG;
4+
import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_MIDDLEWARE;
5+
6+
7+
import java.util.Collections;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.function.Consumer;
11+
import software.amazon.smithy.aws.traits.ServiceTrait;
12+
import software.amazon.smithy.codegen.core.SymbolProvider;
13+
import software.amazon.smithy.model.Model;
14+
import software.amazon.smithy.model.shapes.ServiceShape;
15+
import software.amazon.smithy.typescript.codegen.LanguageTarget;
16+
import software.amazon.smithy.typescript.codegen.TypeScriptDependency;
17+
import software.amazon.smithy.typescript.codegen.TypeScriptSettings;
18+
import software.amazon.smithy.typescript.codegen.TypeScriptWriter;
19+
import software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin;
20+
import software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration;
21+
import software.amazon.smithy.utils.ListUtils;
22+
import software.amazon.smithy.utils.MapUtils;
23+
24+
/**
25+
* Add client plugins and configs to support WebSocket streaming for Transcribe
26+
* Streaming service
27+
* */
28+
public class AddTranscribeStreamingDependency implements TypeScriptIntegration {
29+
@Override
30+
public List<RuntimeClientPlugin> getClientPlugins() {
31+
return ListUtils.of(
32+
RuntimeClientPlugin.builder()
33+
.withConventions(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.dependency,
34+
"WebSocket")
35+
.servicePredicate((m, s) -> AddTranscribeStreamingDependency.isTranscribeStreaming(s))
36+
.build()
37+
);
38+
}
39+
40+
@Override
41+
public Map<String, Consumer<TypeScriptWriter>> getRuntimeConfigWriters(
42+
TypeScriptSettings settings,
43+
Model model,
44+
SymbolProvider symbolProvider,
45+
LanguageTarget target
46+
) {
47+
ServiceShape service = settings.getService(model);
48+
if (!isTranscribeStreaming(service)) {
49+
return Collections.emptyMap();
50+
}
51+
52+
Map<String, Consumer<TypeScriptWriter>> transcribeStreamingHandlerConfig = MapUtils.of(
53+
"eventStreamPayloadHandlerProvider", writer -> {
54+
writer.addDependency(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE);
55+
writer.addImport("eventStreamPayloadHandler", "eventStreamPayloadHandler",
56+
AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.packageName);
57+
writer.write("eventStreamPayloadHandlerProvider: () => eventStreamPayloadHandler,");
58+
},
59+
"requestHandler", writer -> {
60+
writer.addDependency(AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE);
61+
writer.addImport("WebSocketHandler", "WebSocketHandler",
62+
AwsDependency.TRANSCRIBE_STREAMING_MIDDLEWARE.packageName);
63+
writer.write("requestHandler: new WebSocketHandler(),");
64+
});
65+
66+
switch (target) {
67+
case REACT_NATIVE:
68+
case BROWSER:
69+
return transcribeStreamingHandlerConfig;
70+
default:
71+
return Collections.emptyMap();
72+
}
73+
}
74+
75+
private static boolean isTranscribeStreaming(ServiceShape service) {
76+
String serviceId = service.getTrait(ServiceTrait.class).map(ServiceTrait::getSdkId).orElse("");
77+
return serviceId.equals("Transcribe Streaming");
78+
}
79+
}
80+
81+

codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AwsDependency.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public enum AwsDependency implements SymbolDependencyContainer {
5757
UUID_GENERATOR(NORMAL_DEPENDENCY, "uuid", "^7.0.0"),
5858
UUID_GENERATOR_TYPES(DEV_DEPENDENCY, "@types/uuid", "^7.0.0"),
5959
MIDDLEWARE_EVENTSTREAM(NORMAL_DEPENDENCY, "@aws-sdk/middleware-eventstream", "^1.0.0-beta.0"),
60-
AWS_SDK_EVENTSTREAM_HANDLER_NODE(NORMAL_DEPENDENCY, "@aws-sdk/eventstream-handler-node", "^1.0.0-beta.0");
60+
AWS_SDK_EVENTSTREAM_HANDLER_NODE(NORMAL_DEPENDENCY, "@aws-sdk/eventstream-handler-node", "^1.0.0-beta.0"),
61+
TRANSCRIBE_STREAMING_MIDDLEWARE(NORMAL_DEPENDENCY, "@aws-sdk/middleware-sdk-transcribe-streaming", "^1.0.0-gamma.0");
6162

6263
public final String packageName;
6364
public final String version;

codegen/smithy-aws-typescript-codegen/src/main/resources/META-INF/services/software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ software.amazon.smithy.aws.typescript.codegen.AddBodyChecksumGeneratorDependency
1010
software.amazon.smithy.aws.typescript.codegen.AddS3Config
1111
software.amazon.smithy.aws.typescript.codegen.AddEventStreamHandlingDependency
1212
software.amazon.smithy.aws.typescript.codegen.AddHttp2Dependency
13+
software.amazon.smithy.aws.typescript.codegen.AddTranscribeStreamingDependency

packages/eventstream-serde-browser/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"license": "Apache-2.0",
1717
"dependencies": {
1818
"@aws-sdk/eventstream-marshaller": "1.0.0-gamma.1",
19+
"@aws-sdk/eventstream-serde-universal": "1.0.0-gamma.0",
1920
"@aws-sdk/types": "1.0.0-gamma.1",
2021
"tslib": "^1.8.0"
2122
},
Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1+
import { EventStreamMarshaller as UniversalEventStreamMarshaller } from "@aws-sdk/eventstream-serde-universal";
12
import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller";
23
import {
34
Encoder,
45
Decoder,
56
Message,
67
EventStreamMarshaller as IEventStreamMarshaller
78
} from "@aws-sdk/types";
8-
import { ReadableStreamtoIterable } from "./utils";
9-
import { getChunkedStream } from "./getChunkedStream";
10-
import { getEventMessageStream } from "./getEventMessageStream";
11-
import { getDeserializingStream } from "./getDeserializingStream";
9+
import { readableStreamtoIterable, iterableToReadableStream } from "./utils";
1210

1311
export interface EventStreamMarshaller extends IEventStreamMarshaller {}
1412

@@ -17,35 +15,48 @@ export interface EventStreamMarshallerOptions {
1715
utf8Decoder: Decoder;
1816
}
1917

18+
/**
19+
* Utility class used to serialize and deserialize event streams in
20+
* browsers and ReactNative.
21+
*
22+
* In browsers where ReadableStream API is available:
23+
* * deserialize from ReadableStream to an async iterable of output structure
24+
* * serialize from async iterable of input structure to ReadableStream
25+
* In ReactNative where only async iterable API is available:
26+
* * deserialize from async iterable of binaries to async iterable of output structure
27+
* * serialize from async iterable of input structure to async iterable of binaries
28+
*
29+
* We use ReadableStream API in browsers because of the consistency with other
30+
* streaming operations, where ReadableStream API is used to denote streaming data.
31+
* Whereas in ReactNative, ReadableStream API is not available, we use async iterable
32+
* for streaming data although it has lower throughput.
33+
*/
2034
export class EventStreamMarshaller {
2135
private readonly eventMarshaller: EventMarshaller;
36+
private readonly universalMarshaller: UniversalEventStreamMarshaller;
2237
constructor({ utf8Encoder, utf8Decoder }: EventStreamMarshallerOptions) {
2338
this.eventMarshaller = new EventMarshaller(utf8Encoder, utf8Decoder);
39+
this.universalMarshaller = new UniversalEventStreamMarshaller({
40+
utf8Decoder,
41+
utf8Encoder
42+
});
2443
}
2544

2645
deserialize<T>(
27-
body: ReadableStream<Uint8Array>,
46+
body: ReadableStream<Uint8Array> | AsyncIterable<Uint8Array>,
2847
deserializer: (input: { [event: string]: Message }) => Promise<T>
2948
): AsyncIterable<T> {
30-
const chunkedStream = getChunkedStream(body);
31-
const messageStream = getEventMessageStream(
32-
chunkedStream,
33-
this.eventMarshaller
34-
);
35-
const deserialingStream = getDeserializingStream(
36-
messageStream,
37-
deserializer
38-
);
39-
return ReadableStreamtoIterable(deserialingStream);
49+
const bodyIterable = isReadableStream(body)
50+
? readableStreamtoIterable(body)
51+
: body;
52+
return this.universalMarshaller.deserialize(bodyIterable, deserializer);
4053
}
4154

4255
/**
43-
* Generate a ReadableStream that serialize events
44-
* to event stream binary chunks; Use a pull stream
45-
* here to support low connection speed.
56+
* Generate a stream that serialize events into stream of binary chunks;
4657
*
47-
* This doesn't work on browser currently because
48-
* browser doesn't support upload streaming.
58+
* Caveat is that streaming request payload doesn't work on browser with native
59+
* xhr or fetch handler currently because they don't support upload streaming.
4960
* reference:
5061
* * https://bugs.chromium.org/p/chromium/issues/detail?id=688906
5162
* * https://bugzilla.mozilla.org/show_bug.cgi?id=1387483
@@ -54,8 +65,16 @@ export class EventStreamMarshaller {
5465
serialize<T>(
5566
input: AsyncIterable<T>,
5667
serializer: (event: T) => Message
57-
): ReadableStream {
58-
throw new Error(`event stream request in browser is not supported
59-
Reference: https://bugs.chromium.org/p/chromium/issues/detail?id=688906`);
68+
): ReadableStream | AsyncIterable<Uint8Array> {
69+
const serialziedIterable = this.universalMarshaller.serialize(
70+
input,
71+
serializer
72+
);
73+
return typeof ReadableStream === "function"
74+
? iterableToReadableStream(serialziedIterable)
75+
: serialziedIterable;
6076
}
6177
}
78+
79+
const isReadableStream = (body: any): body is ReadableStream =>
80+
typeof ReadableStream === "function" && body instanceof ReadableStream;

0 commit comments

Comments
 (0)