Skip to content

Commit 943dab0

Browse files
authored
feat(hash-stream-node): add readableStreamHasher (#3088)
1 parent 5c63cef commit 943dab0

File tree

4 files changed

+164
-3
lines changed

4 files changed

+164
-3
lines changed

packages/hash-stream-node/README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
[![NPM version](https://img.shields.io/npm/v/@aws-sdk/hash-stream-node/latest.svg)](https://www.npmjs.com/package/@aws-sdk/hash-stream-node)
44
[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/hash-stream-node.svg)](https://www.npmjs.com/package/@aws-sdk/hash-stream-node)
55

6-
A utility for calculating the hash of Node.JS readable streams. This package is
7-
currently only compatible with file streams, as no other stream type can be
8-
replayed.
6+
A utility for calculating the hash of Node.JS readable streams.
97

108
> An internal package
119
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export * from "./fileStreamHasher";
2+
export * from "./readableStreamHasher";
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import { Hash } from "@aws-sdk/types";
2+
import { Readable, Writable, WritableOptions } from "stream";
3+
4+
import { HashCalculator } from "./HashCalculator";
5+
import { readableStreamHasher } from "./readableStreamHasher";
6+
7+
jest.mock("./HashCalculator");
8+
9+
describe(readableStreamHasher.name, () => {
10+
const mockDigest = jest.fn();
11+
const mockHashCtor = jest.fn().mockImplementation(() => ({
12+
update: jest.fn(),
13+
digest: mockDigest,
14+
}));
15+
16+
const mockHashCalculatorWrite = jest.fn();
17+
const mockHashCalculatorEnd = jest.fn();
18+
19+
const mockHash = new Uint8Array(Buffer.from("mockHash"));
20+
21+
class MockHashCalculator extends Writable {
22+
constructor(public readonly hash: Hash, public readonly mockWrite, public readonly mockEnd) {
23+
super();
24+
}
25+
26+
_write(chunk: Buffer, encoding: string, callback: (err?: Error) => void) {
27+
this.mockWrite(chunk);
28+
callback();
29+
}
30+
31+
end() {
32+
this.mockEnd();
33+
super.end();
34+
}
35+
}
36+
37+
beforeEach(() => {
38+
(HashCalculator as unknown as jest.Mock).mockImplementation(
39+
(hash) => new MockHashCalculator(hash, mockHashCalculatorWrite, mockHashCalculatorEnd)
40+
);
41+
mockDigest.mockResolvedValue(mockHash);
42+
});
43+
44+
afterEach(() => {
45+
jest.clearAllMocks();
46+
});
47+
48+
it("computes hash for a readable stream", async () => {
49+
const readableStream = new Readable({
50+
read: (size) => {},
51+
});
52+
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);
53+
54+
// @ts-ignore Property '_readableState' does not exist on type 'Readable'.
55+
const { pipesCount } = readableStream._readableState;
56+
expect(pipesCount).toEqual(1);
57+
58+
const mockDataChunks = ["Hello", "World"];
59+
setTimeout(() => {
60+
mockDataChunks.forEach((chunk) => readableStream.emit("data", chunk));
61+
readableStream.emit("end");
62+
}, 100);
63+
64+
expect(await hashPromise).toEqual(mockHash);
65+
expect(mockHashCalculatorWrite).toHaveBeenCalledTimes(mockDataChunks.length);
66+
mockDataChunks.forEach((chunk, index) =>
67+
expect(mockHashCalculatorWrite).toHaveBeenNthCalledWith(index + 1, Buffer.from(chunk))
68+
);
69+
expect(mockDigest).toHaveBeenCalledTimes(1);
70+
expect(mockHashCalculatorEnd).toHaveBeenCalledTimes(1);
71+
});
72+
73+
it("throws error if readable stream throws error", async () => {
74+
const readableStream = new Readable({
75+
read: (size) => {},
76+
});
77+
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);
78+
79+
const mockError = new Error("error");
80+
setTimeout(() => {
81+
readableStream.emit("error", mockError);
82+
}, 100);
83+
84+
try {
85+
await hashPromise;
86+
fail(`should throw error ${mockError}`);
87+
} catch (error) {
88+
expect(error).toEqual(mockError);
89+
expect(mockHashCalculatorEnd).toHaveBeenCalledTimes(1);
90+
}
91+
});
92+
93+
it("throws error if HashCalculator throws error", async () => {
94+
const mockHashCalculator = new MockHashCalculator(
95+
mockHashCtor as any,
96+
mockHashCalculatorWrite,
97+
mockHashCalculatorEnd
98+
);
99+
(HashCalculator as unknown as jest.Mock).mockImplementation((hash) => mockHashCalculator);
100+
101+
const readableStream = new Readable({
102+
read: (size) => {},
103+
});
104+
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);
105+
106+
const mockError = new Error("error");
107+
setTimeout(() => {
108+
mockHashCalculator.emit("error", mockError);
109+
}, 100);
110+
111+
try {
112+
await hashPromise;
113+
fail(`should throw error ${mockError}`);
114+
} catch (error) {
115+
expect(error).toEqual(mockError);
116+
}
117+
});
118+
119+
it("throws error if hash.digest() throws error", async () => {
120+
const readableStream = new Readable({
121+
read: (size) => {},
122+
});
123+
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);
124+
125+
setTimeout(() => {
126+
readableStream.emit("end");
127+
}, 100);
128+
129+
const mockError = new Error("error");
130+
mockDigest.mockRejectedValue(mockError);
131+
132+
try {
133+
await hashPromise;
134+
fail(`should throw error ${mockError}`);
135+
} catch (error) {
136+
expect(error).toEqual(mockError);
137+
expect(mockHashCalculatorEnd).toHaveBeenCalledTimes(1);
138+
}
139+
});
140+
});
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { HashConstructor, StreamHasher } from "@aws-sdk/types";
2+
import { Readable } from "stream";
3+
4+
import { HashCalculator } from "./HashCalculator";
5+
6+
export const readableStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, readableStream: Readable) => {
7+
const hash = new hashCtor();
8+
const hashCalculator = new HashCalculator(hash);
9+
readableStream.pipe(hashCalculator);
10+
11+
return new Promise((resolve, reject) => {
12+
readableStream.on("error", (err: Error) => {
13+
// if the source errors, the destination stream needs to manually end
14+
hashCalculator.end();
15+
reject(err);
16+
});
17+
hashCalculator.on("error", reject);
18+
hashCalculator.on("finish", () => {
19+
hash.digest().then(resolve).catch(reject);
20+
});
21+
});
22+
};

0 commit comments

Comments
 (0)