Skip to content

feat: support bi-directional eventstream over H2 #1082

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clients/client-transcribe-streaming/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ package-lock.json

*.d.ts
*.js
!jest*.config.js
*.js.map
2 changes: 2 additions & 0 deletions clients/client-transcribe-streaming/.npmignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/coverage/
/docs/
/test/
tsconfig.test.json
*.tsbuildinfo
jest.config.js
20 changes: 17 additions & 3 deletions clients/client-transcribe-streaming/TranscribeStreamingClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import {
resolveEventStreamSerdeConfig
} from "@aws-sdk/eventstream-serde-config-resolver";
import { getContentLengthPlugin } from "@aws-sdk/middleware-content-length";
import {
EventStreamInputConfig,
EventStreamResolvedConfig,
resolveEventStreamConfig
} from "@aws-sdk/middleware-eventstream";
import {
HostHeaderInputConfig,
HostHeaderResolvedConfig,
Expand Down Expand Up @@ -52,6 +57,7 @@ import {
Credentials as __Credentials,
Decoder as __Decoder,
Encoder as __Encoder,
EventStreamPayloadHandlerProvider as __EventStreamPayloadHandlerProvider,
EventStreamSerdeProvider as __EventStreamSerdeProvider,
HashConstructor as __HashConstructor,
HttpHandlerOptions as __HttpHandlerOptions,
Expand Down Expand Up @@ -148,6 +154,11 @@ export interface ClientDefaults
*/
regionInfoProvider?: RegionInfoProvider;

/**
* The function that provides necessary utilities for handling request event stream.
*/
eventStreamPayloadHandlerProvider?: __EventStreamPayloadHandlerProvider;

/**
* The function that provides necessary utilities for generating and parsing event stream
*/
Expand All @@ -164,6 +175,7 @@ export type TranscribeStreamingClientConfig = Partial<
RetryInputConfig &
UserAgentInputConfig &
HostHeaderInputConfig &
EventStreamInputConfig &
EventStreamSerdeInputConfig;

export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfiguration<
Expand All @@ -176,6 +188,7 @@ export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfigurat
RetryResolvedConfig &
UserAgentResolvedConfig &
HostHeaderResolvedConfig &
EventStreamResolvedConfig &
EventStreamSerdeResolvedConfig;

/**
Expand All @@ -200,9 +213,10 @@ export class TranscribeStreamingClient extends __Client<
let _config_4 = resolveRetryConfig(_config_3);
let _config_5 = resolveUserAgentConfig(_config_4);
let _config_6 = resolveHostHeaderConfig(_config_5);
let _config_7 = resolveEventStreamSerdeConfig(_config_6);
super(_config_7);
this.config = _config_7;
let _config_7 = resolveEventStreamConfig(_config_6);
let _config_8 = resolveEventStreamSerdeConfig(_config_7);
super(_config_8);
this.config = _config_8;
this.middlewareStack.use(getAwsAuthPlugin(this.config));
this.middlewareStack.use(getRetryPlugin(this.config));
this.middlewareStack.use(getUserAgentPlugin(this.config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
deserializeAws_restJson1_1StartStreamTranscriptionCommand,
serializeAws_restJson1_1StartStreamTranscriptionCommand
} from "../protocols/Aws_restJson1_1";
import { getEventStreamPlugin } from "@aws-sdk/middleware-eventstream";
import { getSerdePlugin } from "@aws-sdk/middleware-serde";
import {
HttpRequest as __HttpRequest,
Expand Down Expand Up @@ -57,6 +58,7 @@ export class StartStreamTranscriptionCommand extends $Command<
this.middlewareStack.use(
getSerdePlugin(configuration, this.serialize, this.deserialize)
);
this.middlewareStack.use(getEventStreamPlugin(configuration));

const stack = clientStack.concat(this.middlewareStack);

Expand Down
13 changes: 13 additions & 0 deletions clients/client-transcribe-streaming/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const base = require("../../jest.config.base.js");

module.exports = {
...base,
// Only test cjs dist, avoid testing the package twice
testPathIgnorePatterns: ["/node_modules/", "/es/", ".*.integ.spec.js"],
coveragePathIgnorePatterns: [
"/node_modules/",
"/commands/",
"/protocols/", // protocols tested in protocol protocol_tests folder
"endpoints" // endpoint tested in tests/functional/endpoints
]
};
6 changes: 6 additions & 0 deletions clients/client-transcribe-streaming/jest.integ.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
const base = require("../../jest.config.base.js");

module.exports = {
...base,
testMatch: ["**/*.integ.spec.js"]
};
13 changes: 9 additions & 4 deletions clients/client-transcribe-streaming/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
"clean": "npm run remove-definitions && npm run remove-dist && npm run remove-js && npm run remove-maps",
"build-documentation": "npm run clean && typedoc ./",
"prepublishOnly": "yarn build",
"pretest": "tsc",
"pretest": "yarn build",
"remove-definitions": "rimraf ./types",
"remove-dist": "rimraf ./dist",
"remove-documentation": "rimraf ./docs",
"remove-js": "rimraf *.js && rimraf ./commands/*.js && rimraf ./models/*.js && rimraf ./protocols/*.js",
"remove-maps": "rimraf *.js.map && rimraf ./commands/*.js.map && rimraf ./models/*.js.map && rimraf ./protocols/*.js.map",
"test": "exit 0",
"test": "jest --coverage --passWithNoTests",
"test:integration": "jest --config jest.integ.config.js",
"build:cjs": "tsc",
"build:es": "tsc -p tsconfig.es.json",
"build": "yarn pretest && yarn build:es"
"build": "yarn build:cjs && yarn build:es",
"postbuild": "cp test/speech.wav dist/cjs/test"
},
"main": "./dist/cjs/index.js",
"types": "./types/index.d.ts",
Expand All @@ -31,13 +34,15 @@
"@aws-crypto/sha256-js": "^1.0.0-alpha.0",
"@aws-sdk/config-resolver": "1.0.0-gamma.1",
"@aws-sdk/credential-provider-node": "1.0.0-gamma.1",
"@aws-sdk/eventstream-handler-node": "1.0.0-gamma.0",
"@aws-sdk/eventstream-serde-browser": "1.0.0-gamma.1",
"@aws-sdk/eventstream-serde-config-resolver": "1.0.0-gamma.1",
"@aws-sdk/eventstream-serde-node": "1.0.0-gamma.1",
"@aws-sdk/fetch-http-handler": "1.0.0-gamma.1",
"@aws-sdk/hash-node": "1.0.0-gamma.1",
"@aws-sdk/invalid-dependency": "1.0.0-gamma.1",
"@aws-sdk/middleware-content-length": "1.0.0-gamma.1",
"@aws-sdk/middleware-eventstream": "1.0.0-gamma.0",
"@aws-sdk/middleware-host-header": "1.0.0-gamma.1",
"@aws-sdk/middleware-retry": "1.0.0-gamma.1",
"@aws-sdk/middleware-serde": "1.0.0-gamma.1",
Expand Down Expand Up @@ -77,4 +82,4 @@
"url": "https://aws.amazon.com/javascript/"
},
"license": "Apache-2.0"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ export const serializeAws_restJson1_1StartStreamTranscriptionCommand = async (
serializeAws_restJson1_1AudioStream_event(event, context)
);
}
if (body === undefined) {
body = {};
}
body = JSON.stringify(body);
const { hostname, protocol = "https", port } = await context.endpoint();
return new __HttpRequest({
protocol,
Expand Down
3 changes: 3 additions & 0 deletions clients/client-transcribe-streaming/runtimeConfig.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
bodyLengthChecker: calculateBodyLength,
credentialDefaultProvider: invalidFunction("Credential is missing") as any,
defaultUserAgent: defaultUserAgent(name, version),
eventStreamPayloadHandlerProvider: () => ({
handle: invalidFunction("event stream request is not supported in browser.")
}),
eventStreamSerdeProvider,
regionDefaultProvider: invalidFunction("Region is missing") as any,
requestHandler: new FetchHttpHandler(),
Expand Down
5 changes: 5 additions & 0 deletions clients/client-transcribe-streaming/runtimeConfig.native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
...BrowserDefaults,
runtime: "react-native",
defaultUserAgent: `aws-sdk-js-v3-react-native-${name}/${version}`,
eventStreamPayloadHandlerProvider: () => ({
handle: invalidFunction(
"event stream request is not supported in ReactNative."
)
}),
eventStreamSerdeProvider: () => ({
serialize: invalidFunction("event stream is not supported in ReactNative."),
deserialize: invalidFunction(
Expand Down
6 changes: 4 additions & 2 deletions clients/client-transcribe-streaming/runtimeConfig.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { name, version } from "./package.json";
import { defaultProvider as credentialDefaultProvider } from "@aws-sdk/credential-provider-node";
import { eventStreamPayloadHandlerProvider } from "@aws-sdk/eventstream-handler-node";
import { eventStreamSerdeProvider } from "@aws-sdk/eventstream-serde-node";
import { Hash } from "@aws-sdk/hash-node";
import { NodeHttpHandler, streamCollector } from "@aws-sdk/node-http-handler";
import { NodeHttp2Handler, streamCollector } from "@aws-sdk/node-http-handler";
import { defaultProvider as regionDefaultProvider } from "@aws-sdk/region-provider";
import { parseUrl } from "@aws-sdk/url-parser-node";
import { fromBase64, toBase64 } from "@aws-sdk/util-base64-node";
Expand All @@ -20,9 +21,10 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
bodyLengthChecker: calculateBodyLength,
credentialDefaultProvider,
defaultUserAgent: defaultUserAgent(name, version),
eventStreamPayloadHandlerProvider,
eventStreamSerdeProvider,
regionDefaultProvider,
requestHandler: new NodeHttpHandler(),
requestHandler: new NodeHttp2Handler(),
sha256: Hash.bind(null, "sha256"),
streamCollector,
urlParser: parseUrl,
Expand Down
38 changes: 38 additions & 0 deletions clients/client-transcribe-streaming/test/index.integ.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { TranscribeStreaming } from "../index";
import { createReadStream } from "fs";
import { join } from "path";
const audio = createReadStream(join(__dirname, "speech.wav"));

describe("TranscribeStream client", () => {
const client = new TranscribeStreaming({});
afterAll(() => {
client.destroy();
});

it("should stream the transcript", async () => {
const LanguageCode = "en-US";
const MediaEncoding = "pcm";
const MediaSampleRateHertz = 44100;
const result = await client.startStreamTranscription({
LanguageCode,
MediaEncoding,
MediaSampleRateHertz,
AudioStream: (async function* () {
for await (const chunk of audio) {
yield { AudioEvent: { AudioChunk: chunk } };
}
})()
});
expect(result.LanguageCode).toBe(LanguageCode);
expect(result.MediaEncoding).toBe(MediaEncoding);
expect(result.MediaSampleRateHertz).toBe(MediaSampleRateHertz);
expect(result.TranscriptResultStream).toBeDefined();
const transcripts = [];
for await (const event of result.TranscriptResultStream!) {
transcripts.push(event);
}
expect(
transcripts.filter(event => event["TranscriptEvent"]).length
).toBeGreaterThan(0);
}, 60000);
});
Binary file not shown.
3 changes: 2 additions & 1 deletion clients/client-transcribe-streaming/tsconfig.es.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
"es2015.symbol.wellknown"
],
"outDir": "dist/es"
}
},
"exclude": ["test"]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package software.amazon.smithy.aws.typescript.codegen;

import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_CONFIG;
import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_MIDDLEWARE;


import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import software.amazon.smithy.codegen.core.SymbolProvider;
import software.amazon.smithy.model.Model;
import software.amazon.smithy.model.knowledge.EventStreamIndex;
import software.amazon.smithy.model.knowledge.TopDownIndex;
import software.amazon.smithy.model.shapes.OperationShape;
import software.amazon.smithy.model.shapes.ServiceShape;
import software.amazon.smithy.typescript.codegen.LanguageTarget;
import software.amazon.smithy.typescript.codegen.TypeScriptDependency;
import software.amazon.smithy.typescript.codegen.TypeScriptSettings;
import software.amazon.smithy.typescript.codegen.TypeScriptWriter;
import software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin;
import software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration;
import software.amazon.smithy.utils.ListUtils;
import software.amazon.smithy.utils.MapUtils;

/**
* Adds runtime client plugins that handle the eventstream flow in request,
* including eventstream payload signing.
*/
public class AddEventStreamHandlingDependency implements TypeScriptIntegration {
@Override
public List<RuntimeClientPlugin> getClientPlugins() {
return ListUtils.of(
RuntimeClientPlugin.builder()
.withConventions(AwsDependency.MIDDLEWARE_EVENTSTREAM.dependency,
"EventStream", HAS_CONFIG)
.servicePredicate(AddEventStreamHandlingDependency::hasEventStreamInput)
.build(),
RuntimeClientPlugin.builder()
.withConventions(AwsDependency.MIDDLEWARE_EVENTSTREAM.dependency,
"EventStream", HAS_MIDDLEWARE)
.operationPredicate(AddEventStreamHandlingDependency::hasEventStreamInput)
.build()
);
}

@Override
public void addConfigInterfaceFields(
TypeScriptSettings settings,
Model model,
SymbolProvider symbolProvider,
TypeScriptWriter writer
) {
if (hasEventStreamInput(model, settings.getService(model))) {
writer.addImport("EventStreamPayloadHandlerProvider", "__EventStreamPayloadHandlerProvider",
TypeScriptDependency.AWS_SDK_TYPES.packageName);
writer.writeDocs("The function that provides necessary utilities for handling request event stream.");
writer.write("eventStreamPayloadHandlerProvider?: __EventStreamPayloadHandlerProvider;\n");
}
}

@Override
public Map<String, Consumer<TypeScriptWriter>> getRuntimeConfigWriters(
TypeScriptSettings settings,
Model model,
SymbolProvider symbolProvider,
LanguageTarget target
) {
ServiceShape service = settings.getService(model);
if (!hasEventStreamInput(model, service)) {
return Collections.emptyMap();
}

switch (target) {
case NODE:
return MapUtils.of("eventStreamPayloadHandlerProvider", writer -> {
writer.addDependency(AwsDependency.AWS_SDK_EVENTSTREAM_HANDLER_NODE);
writer.addImport("eventStreamPayloadHandlerProvider", "eventStreamPayloadHandlerProvider",
AwsDependency.AWS_SDK_EVENTSTREAM_HANDLER_NODE.packageName);
writer.write("eventStreamPayloadHandlerProvider,");
});
case BROWSER:
/**
* Browser doesn't support streaming requests as of March 2020.
* Each service client needs to support eventstream request in browser individually.
* Services like TranscribeStreaming support it via WebSocket.
*/
return MapUtils.of("eventStreamPayloadHandlerProvider", writer -> {
writer.addDependency(TypeScriptDependency.INVALID_DEPENDENCY);
writer.addImport("invalidFunction", "invalidFunction",
TypeScriptDependency.INVALID_DEPENDENCY.packageName);
writer.openBlock("eventStreamPayloadHandlerProvider: () => ({", "}),", () -> {
writer.write("handle: invalidFunction(\"event stream request is not supported in browser.\"),");
});
});
case REACT_NATIVE:
/**
* ReactNative doesn't support streaming requests as of March 2020.
* Here we don't supply invalidFunction. Each service client needs to support eventstream request
* in RN has to implement a customization providing its own eventStreamSignerProvider
*/
return MapUtils.of("eventStreamPayloadHandlerProvider", writer -> {
writer.addDependency(TypeScriptDependency.INVALID_DEPENDENCY);
writer.addImport("invalidFunction", "invalidFunction",
TypeScriptDependency.INVALID_DEPENDENCY.packageName);
writer.openBlock("eventStreamPayloadHandlerProvider: () => ({", "}),", () -> {
writer.write("handle: invalidFunction(\"event stream request is not supported in ReactNative.\"),");
});
});
default:
return Collections.emptyMap();
}
}

private static boolean hasEventStreamInput(Model model, ServiceShape service) {
TopDownIndex topDownIndex = model.getKnowledge(TopDownIndex.class);
Set<OperationShape> operations = topDownIndex.getContainedOperations(service);
EventStreamIndex eventStreamIndex = model.getKnowledge(EventStreamIndex.class);
for (OperationShape operation : operations) {
if (eventStreamIndex.getInputInfo(operation).isPresent()) {
return true;
}
}
return false;
}

private static boolean hasEventStreamInput(Model model, ServiceShape service, OperationShape operation) {
EventStreamIndex eventStreamIndex = model.getKnowledge(EventStreamIndex.class);
return eventStreamIndex.getInputInfo(operation).isPresent();
}
}
Loading