Skip to content

feat(util-stream): create checksum stream adapters #1409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/red-cameras-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@smithy/util-stream": minor
---

create checksum stream adapter
10 changes: 9 additions & 1 deletion packages/util-stream/karma.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
module.exports = function (config) {
config.set({
frameworks: ["jasmine", "karma-typescript"],
files: ["src/getAwsChunkedEncodingStream.browser.ts", "src/getAwsChunkedEncodingStream.browser.spec.ts"],
files: [
"src/checksum/createChecksumStream.browser.spec.ts",
"src/checksum/createChecksumStream.browser.ts",
"src/checksum/ChecksumStream.browser.ts",
"src/getAwsChunkedEncodingStream.browser.spec.ts",
"src/getAwsChunkedEncodingStream.browser.ts",
"src/headStream.browser.ts",
"src/stream-type-check.ts",
],
exclude: ["**/*.d.ts"],
preprocessors: {
"**/*.ts": "karma-typescript",
Expand Down
3 changes: 3 additions & 0 deletions packages/util-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,19 @@
"dist-*/**"
],
"browser": {
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser"
},
"react-native": {
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser",
"./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser",
"./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser",
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser",
"./dist-cjs/headStream": "./dist-cjs/headStream.browser",
Expand Down
39 changes: 39 additions & 0 deletions packages/util-stream/src/checksum/ChecksumStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Checksum, Encoder } from "@smithy/types";

/**
* @internal
*/
export interface ChecksumStreamInit {
/**
* Base64 value of the expected checksum.
*/
expectedChecksum: string;
/**
* For error messaging, the location from which the checksum value was read.
*/
checksumSourceLocation: string;
/**
* The checksum calculator.
*/
checksum: Checksum;
/**
* The stream to be checked.
*/
source: ReadableStream;

/**
* Optional base 64 encoder if calling from a request context.
*/
base64Encoder?: Encoder;
}

const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {};

/**
* This stub exists so that the readable returned by createChecksumStream
* identifies as "ChecksumStream" in alignment with the Node.js
* implementation.
*
* @extends ReadableStream
*/
export class ChecksumStream extends (ReadableStreamRef as any) {}
118 changes: 118 additions & 0 deletions packages/util-stream/src/checksum/ChecksumStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { Checksum, Encoder } from "@smithy/types";
import { toBase64 } from "@smithy/util-base64";
import { Duplex, Readable } from "stream";

/**
* @internal
*/
export interface ChecksumStreamInit<T extends Readable | ReadableStream> {
/**
* Base64 value of the expected checksum.
*/
expectedChecksum: string;
/**
* For error messaging, the location from which the checksum value was read.
*/
checksumSourceLocation: string;
/**
* The checksum calculator.
*/
checksum: Checksum;
/**
* The stream to be checked.
*/
source: T;

/**
* Optional base 64 encoder if calling from a request context.
*/
base64Encoder?: Encoder;
}

/**
* @internal
*
* Wrapper for throwing checksum errors for streams without
* buffering the stream.
*
*/
export class ChecksumStream extends Duplex {
private expectedChecksum: string;
private checksumSourceLocation: string;
private checksum: Checksum;
private source?: Readable;
private base64Encoder: Encoder;

public constructor({
expectedChecksum,
checksum,
source,
checksumSourceLocation,
base64Encoder,
}: ChecksumStreamInit<Readable>) {
super();
if (typeof (source as Readable).pipe === "function") {
this.source = source as Readable;
} else {
throw new Error(
`@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.`
);
}

this.base64Encoder = base64Encoder ?? toBase64;
this.expectedChecksum = expectedChecksum;
this.checksum = checksum;
this.checksumSourceLocation = checksumSourceLocation;

// connect this stream to the end of the source stream.
this.source.pipe(this);
}

/**
* @internal do not call this directly.
*/
public _read(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
size: number
): void {}

/**
* @internal do not call this directly.
*
* When the upstream source flows data to this stream,
* calculate a step update of the checksum.
*/
public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void {
try {
this.checksum.update(chunk);
this.push(chunk);
} catch (e: unknown) {
return callback(e as Error);
}
return callback();
}

/**
* @internal do not call this directly.
*
* When the upstream source finishes, perform the checksum comparison.
*/
public async _final(callback: (err?: Error) => void): Promise<void> {
try {
const digest: Uint8Array = await this.checksum.digest();
const received = this.base64Encoder(digest);
if (this.expectedChecksum !== received) {
return callback(
new Error(
`Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` +
` in response header "${this.checksumSourceLocation}".`
)
);
}
} catch (e: unknown) {
return callback(e as Error);
}
this.push(null);
return callback();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Checksum } from "@smithy/types";
import { toBase64 } from "@smithy/util-base64";
import { toUtf8 } from "@smithy/util-utf8";

import { headStream } from "../headStream.browser";
import { ChecksumStream as ChecksumStreamWeb } from "./ChecksumStream.browser";
import { createChecksumStream } from "./createChecksumStream.browser";

describe("Checksum streams", () => {
/**
* Hash "algorithm" that appends all data together.
*/
class Appender implements Checksum {
public hash = "";
async digest(): Promise<Uint8Array> {
return Buffer.from(this.hash);
}
reset(): void {
throw new Error("Function not implemented.");
}
update(chunk: Uint8Array): void {
this.hash += toUtf8(chunk);
}
}

const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0)));

const canonicalUtf8 = toUtf8(canonicalData);
const canonicalBase64 = toBase64(canonicalUtf8);

describe(createChecksumStream.name + " webstreams API", () => {
if (typeof ReadableStream !== "function") {
// test not applicable to Node.js 16.
return;
}

const makeStream = () => {
return new ReadableStream({
start(controller) {
canonicalData.forEach((byte) => {
controller.enqueue(new Uint8Array([byte]));
});
controller.close();
},
});
};

it("should extend a ReadableStream", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: canonicalBase64,
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

expect(checksumStream).toBeInstanceOf(ReadableStream);
expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb);

const collected = toUtf8(await headStream(checksumStream, Infinity));
expect(collected).toEqual(canonicalUtf8);
expect(stream.locked).toEqual(true);

// expectation is that it is resolved.
expect(await checksumStream.getReader().closed);
});

it("should throw during stream read if the checksum does not match", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: "different-expected-checksum",
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

try {
toUtf8(await headStream(checksumStream, Infinity));
throw new Error("stream was read successfully");
} catch (e: unknown) {
expect(String(e)).toEqual(
`Error: Checksum mismatch: expected "different-expected-checksum" but` +
` received "${canonicalBase64}"` +
` in response header "my-header".`
);
}
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { toBase64 } from "@smithy/util-base64";

import { isReadableStream } from "../stream-type-check";
import { ChecksumStream, ChecksumStreamInit } from "./ChecksumStream.browser";

/**
* @internal
* Alias prevents compiler from turning
* ReadableStream into ReadableStream<any>, which is incompatible
* with the NodeJS.ReadableStream global type.
*/
export type ReadableStreamType = ReadableStream;

/**
* This is a local copy of
* https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController
* in case users do not have this type.
*/
interface TransformStreamDefaultController {
enqueue(chunk: any): void;
error(error: unknown): void;
terminate(): void;
}

/**
* @internal
*
* Creates a stream adapter for throwing checksum errors for streams without
* buffering the stream.
*/
export const createChecksumStream = ({
expectedChecksum,
checksum,
source,
checksumSourceLocation,
base64Encoder,
}: ChecksumStreamInit): ReadableStreamType => {
if (!isReadableStream(source)) {
throw new Error(
`@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.`
);
}

const encoder = base64Encoder ?? toBase64;

if (typeof TransformStream !== "function") {
throw new Error(
"@smithy/util-stream: unable to instantiate ChecksumStream because API unavailable: ReadableStream/TransformStream."
);
}

const transform = new TransformStream({
start() {},
async transform(chunk: any, controller: TransformStreamDefaultController) {
/**
* When the upstream source flows data to this stream,
* calculate a step update of the checksum.
*/
checksum.update(chunk);
controller.enqueue(chunk);
},
async flush(controller: TransformStreamDefaultController) {
const digest: Uint8Array = await checksum.digest();
const received = encoder(digest);

if (expectedChecksum !== received) {
const error = new Error(
`Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` +
` in response header "${checksumSourceLocation}".`
);
controller.error(error);
} else {
controller.terminate();
}
},
});

source.pipeThrough(transform);
const readable = transform.readable;
Object.setPrototypeOf(readable, ChecksumStream.prototype);
return readable;
};
Loading
Loading