Skip to content

Commit f0698ad

Browse files
committed
feat(util-stream): create checksum stream adapters
1 parent b12dc1d commit f0698ad

File tree

4 files changed

+368
-0
lines changed

4 files changed

+368
-0
lines changed

.changeset/red-cameras-repair.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@smithy/util-stream": minor
3+
---
4+
5+
create checksum stream adapter
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { Checksum, Encoder } from "@smithy/types";
2+
import { toBase64 } from "@smithy/util-base64";
3+
4+
/**
5+
* @internal
6+
*/
7+
export interface ChecksumStreamInit {
8+
/**
9+
* Base64 value of the expected checksum.
10+
*/
11+
expectedChecksum: string;
12+
/**
13+
* For error messaging, the location from which the checksum value was read.
14+
*/
15+
checksumSourceLocation: string;
16+
/**
17+
* The checksum calculator.
18+
*/
19+
checksum: Checksum;
20+
/**
21+
* The stream to be checked.
22+
*/
23+
source: ReadableStream;
24+
25+
/**
26+
* Optional base 64 encoder if calling from a request context.
27+
*/
28+
base64Encoder?: Encoder;
29+
}
30+
31+
/**
32+
* Alias prevents compiler from turning
33+
* ReadableStream into ReadableStream<any>, which is incompatible
34+
* with the NodeJS.ReadableStream global type.
35+
*/
36+
export type ReadableStreamType = ReadableStream;
37+
38+
/**
39+
* @internal
40+
*
41+
* Creates a stream adapter for throwing checksum errors for streams without
42+
* buffering the stream.
43+
*/
44+
export const createChecksumStream = ({
45+
expectedChecksum,
46+
checksum,
47+
source,
48+
checksumSourceLocation,
49+
base64Encoder,
50+
}: ChecksumStreamInit): ReadableStreamType => {
51+
if (!(source instanceof ReadableStream)) {
52+
throw new Error(
53+
`@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.`
54+
);
55+
}
56+
57+
const encoder = base64Encoder ?? toBase64;
58+
59+
if (typeof TransformStream !== "function") {
60+
throw new Error(
61+
"@smithy/util-stream: unable to instantiate ChecksumStream because API unavailable: ReadableStream/TransformStream."
62+
);
63+
}
64+
65+
const transform = new TransformStream({
66+
start() {},
67+
transform: async (chunk: any, controller: TransformStreamDefaultController) => {
68+
/**
69+
* When the upstream source finishes, perform the checksum comparison.
70+
*/
71+
if (null === chunk) {
72+
const digest: Uint8Array = await checksum.digest();
73+
const received = encoder(digest);
74+
75+
if (expectedChecksum !== received) {
76+
const error = new Error(
77+
`Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` +
78+
` in response header "${checksumSourceLocation}".`
79+
);
80+
controller.error(error);
81+
throw error;
82+
}
83+
controller.terminate();
84+
return;
85+
}
86+
/**
87+
* When the upstream source flows data to this stream,
88+
* calculate a step update of the checksum.
89+
*/
90+
checksum.update(chunk);
91+
controller.enqueue(chunk);
92+
},
93+
flush() {},
94+
});
95+
96+
source.pipeThrough(transform);
97+
return transform.readable;
98+
};
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import { Checksum } from "@smithy/types";
2+
import { toBase64 } from "@smithy/util-base64";
3+
import { toUtf8 } from "@smithy/util-utf8";
4+
import { Readable } from "stream";
5+
6+
import { headStream } from "../headStream";
7+
import { createChecksumStream } from "./createChecksumStream";
8+
9+
describe("Checksum streams", () => {
10+
/**
11+
* Hash "algorithm" that appends all data together.
12+
*/
13+
class Appender implements Checksum {
14+
public hash = "";
15+
async digest(): Promise<Uint8Array> {
16+
return Buffer.from(this.hash);
17+
}
18+
reset(): void {
19+
throw new Error("Function not implemented.");
20+
}
21+
update(chunk: Uint8Array): void {
22+
this.hash += toUtf8(chunk);
23+
}
24+
}
25+
26+
const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0)));
27+
28+
const canonicalUtf8 = toUtf8(canonicalData);
29+
const canonicalBase64 = toBase64(canonicalUtf8);
30+
31+
describe(createChecksumStream.name, () => {
32+
const makeStream = () => {
33+
return Readable.from(Buffer.from(canonicalData.buffer, 0, 26));
34+
};
35+
36+
it("should extend a Readable stream", async () => {
37+
const stream = makeStream();
38+
const checksumStream = createChecksumStream({
39+
expectedChecksum: canonicalBase64,
40+
checksum: new Appender(),
41+
checksumSourceLocation: "my-header",
42+
source: stream,
43+
});
44+
45+
expect(checksumStream.constructor.name).toEqual("ChecksumStream");
46+
47+
const collected = toUtf8(await headStream(checksumStream, Infinity));
48+
expect(collected).toEqual(canonicalUtf8);
49+
expect(stream.readableEnded).toEqual(true);
50+
expect(checksumStream.readableEnded).toEqual(true);
51+
});
52+
53+
it("should throw during stream read if the checksum does not match", async () => {
54+
const stream = makeStream();
55+
const checksumStream = createChecksumStream({
56+
expectedChecksum: "different-expected-checksum",
57+
checksum: new Appender(),
58+
checksumSourceLocation: "my-header",
59+
source: stream,
60+
});
61+
62+
try {
63+
toUtf8(await headStream(checksumStream, Infinity));
64+
throw new Error("stream was read successfully");
65+
} catch (e: unknown) {
66+
expect(String(e)).toEqual(
67+
`Error: Checksum mismatch: expected "different-expected-checksum" but` +
68+
` received "${canonicalBase64}"` +
69+
` in response header "my-header".`
70+
);
71+
}
72+
});
73+
});
74+
75+
describe(createChecksumStream.name + " webstreams API", () => {
76+
if (typeof ReadableStream !== "function") {
77+
// test not applicable to Node.js 16.
78+
return;
79+
}
80+
81+
const makeStream = () => {
82+
return new ReadableStream({
83+
start(controller) {
84+
canonicalData.forEach((byte) => {
85+
controller.enqueue(new Uint8Array([byte]));
86+
});
87+
controller.enqueue(null);
88+
controller.close();
89+
},
90+
});
91+
};
92+
93+
it("should extend a ReadableStream", async () => {
94+
const stream = makeStream();
95+
const checksumStream = createChecksumStream({
96+
expectedChecksum: canonicalBase64,
97+
checksum: new Appender(),
98+
checksumSourceLocation: "my-header",
99+
source: stream,
100+
});
101+
102+
expect(checksumStream).toBeInstanceOf(ReadableStream);
103+
104+
const collected = toUtf8(await headStream(checksumStream, Infinity));
105+
expect(collected).toEqual(canonicalUtf8);
106+
expect(stream.locked).toEqual(true);
107+
108+
// expectation is that it is resolved.
109+
expect(await checksumStream.getReader().closed);
110+
});
111+
112+
it("should throw during stream read if the checksum does not match", async () => {
113+
const stream = makeStream();
114+
const checksumStream = createChecksumStream({
115+
expectedChecksum: "different-expected-checksum",
116+
checksum: new Appender(),
117+
checksumSourceLocation: "my-header",
118+
source: stream,
119+
});
120+
121+
try {
122+
toUtf8(await headStream(checksumStream, Infinity));
123+
throw new Error("stream was read successfully");
124+
} catch (e: unknown) {
125+
expect(String(e)).toEqual(
126+
`Error: Checksum mismatch: expected "different-expected-checksum" but` +
127+
` received "${canonicalBase64}"` +
128+
` in response header "my-header".`
129+
);
130+
}
131+
});
132+
});
133+
});
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { Checksum, Encoder } from "@smithy/types";
2+
import { toBase64 } from "@smithy/util-base64";
3+
import { Duplex, Readable } from "stream";
4+
5+
import { isReadableStream } from "../stream-type-check";
6+
import { createChecksumStream as createChecksumStreamWeb, ReadableStreamType } from "./createChecksumStream.browser";
7+
8+
/**
9+
* @internal
10+
*/
11+
export interface ChecksumStreamInit<T extends Readable | ReadableStream> {
12+
/**
13+
* Base64 value of the expected checksum.
14+
*/
15+
expectedChecksum: string;
16+
/**
17+
* For error messaging, the location from which the checksum value was read.
18+
*/
19+
checksumSourceLocation: string;
20+
/**
21+
* The checksum calculator.
22+
*/
23+
checksum: Checksum;
24+
/**
25+
* The stream to be checked.
26+
*/
27+
source: T;
28+
29+
/**
30+
* Optional base 64 encoder if calling from a request context.
31+
*/
32+
base64Encoder?: Encoder;
33+
}
34+
35+
/**
36+
* @internal
37+
*
38+
* Factory function to allow browser API to mirror this.
39+
*/
40+
export function createChecksumStream(init: ChecksumStreamInit<ReadableStreamType>): ReadableStreamType;
41+
export function createChecksumStream(init: ChecksumStreamInit<Readable>): Readable;
42+
export function createChecksumStream(
43+
init: ChecksumStreamInit<Readable | ReadableStreamType>
44+
): Readable | ReadableStreamType {
45+
if (typeof ReadableStream === "function" && isReadableStream(init.source)) {
46+
return createChecksumStreamWeb(init as ChecksumStreamInit<ReadableStreamType>);
47+
}
48+
return new ChecksumStream(init as ChecksumStreamInit<Readable>);
49+
}
50+
51+
/**
52+
* @internal
53+
*
54+
* Wrapper for throwing checksum errors for streams without
55+
* buffering the stream.
56+
*
57+
*/
58+
class ChecksumStream extends Duplex {
59+
private expectedChecksum: string;
60+
private checksumSourceLocation: string;
61+
private checksum: Checksum;
62+
private source?: Readable;
63+
private base64Encoder: Encoder;
64+
65+
public constructor({
66+
expectedChecksum,
67+
checksum,
68+
source,
69+
checksumSourceLocation,
70+
base64Encoder,
71+
}: ChecksumStreamInit<Readable>) {
72+
super();
73+
if (typeof (source as Readable).pipe === "function") {
74+
this.source = source as Readable;
75+
} else {
76+
throw new Error(
77+
`@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.`
78+
);
79+
}
80+
81+
this.base64Encoder = base64Encoder ?? toBase64;
82+
this.expectedChecksum = expectedChecksum;
83+
this.checksum = checksum;
84+
this.checksumSourceLocation = checksumSourceLocation;
85+
86+
// connect this stream to the end of the source stream.
87+
this.source.pipe(this);
88+
}
89+
90+
/**
91+
* @internal do not call this directly.
92+
*/
93+
public _read(
94+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
95+
size: number
96+
): void {}
97+
98+
/**
99+
* @internal do not call this directly.
100+
*
101+
* When the upstream source flows data to this stream,
102+
* calculate a step update of the checksum.
103+
*/
104+
public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void {
105+
this.checksum.update(chunk);
106+
this.push(chunk);
107+
callback();
108+
}
109+
110+
/**
111+
* @internal do not call this directly.
112+
*
113+
* When the upstream source finishes, perform the checksum comparison.
114+
*/
115+
public async _final(callback: (err?: Error) => void) {
116+
try {
117+
const digest: Uint8Array = await this.checksum.digest();
118+
const received = this.base64Encoder(digest);
119+
if (this.expectedChecksum !== received) {
120+
callback(
121+
new Error(
122+
`Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` +
123+
` in response header "${this.checksumSourceLocation}".`
124+
)
125+
);
126+
}
127+
} catch (e: unknown) {
128+
callback(e as Error);
129+
}
130+
this.push(null);
131+
}
132+
}

0 commit comments

Comments
 (0)