Skip to content

Commit 2e4eca4

Browse files
committed
feat(util-stream): enable buffering of stream chunks
1 parent f0562fe commit 2e4eca4

9 files changed

+556
-4
lines changed

.changeset/thirty-walls-pump.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@smithy/util-stream": minor
3+
---
4+
5+
Add a utility for buffering stream chunks until a minimum chunk size is reached.

packages/util-stream/package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"browser": {
6262
"./dist-es/checksum/ChecksumStream": "./dist-es/checksum/ChecksumStream.browser",
6363
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
64+
"./dist-es/createBufferedReadable": "./dist-es/createBufferedReadableStream",
6465
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
6566
"./dist-es/headStream": "./dist-es/headStream.browser",
6667
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
@@ -73,12 +74,14 @@
7374
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
7475
"./dist-es/headStream": "./dist-es/headStream.browser",
7576
"./dist-es/splitStream": "./dist-es/splitStream.browser",
77+
"./dist-es/createBufferedReadable": "./dist-es/createBufferedReadableStream",
7678
"./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser",
7779
"./dist-cjs/checksum/ChecksumStream": "./dist-cjs/checksum/ChecksumStream.browser",
7880
"./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser",
7981
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser",
8082
"./dist-cjs/headStream": "./dist-cjs/headStream.browser",
81-
"./dist-cjs/splitStream": "./dist-cjs/splitStream.browser"
83+
"./dist-cjs/splitStream": "./dist-cjs/splitStream.browser",
84+
"./dist-cjs/createBufferedReadable": "./dist-cjs/createBufferedReadableStream"
8285
},
8386
"homepage": "https://github.com/awslabs/smithy-typescript/tree/main/packages/util-stream",
8487
"repository": {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Aggregates byteArrays on demand.
3+
* @internal
4+
*/
5+
export class ByteArrayCollector {
6+
public byteLength = 0;
7+
private byteArrays = [] as Uint8Array[];
8+
9+
public constructor(public readonly ctor: (size: number) => Uint8Array) {}
10+
11+
public push(byteArray: Uint8Array) {
12+
this.byteArrays.push(byteArray);
13+
this.byteLength += byteArray.byteLength;
14+
}
15+
16+
public flush() {
17+
const aggregation = this.ctor(this.byteLength);
18+
let cursor = 0;
19+
for (let i = 0; i < this.byteArrays.length; ++i) {
20+
const bytes = this.byteArrays[i];
21+
aggregation.set(bytes, cursor);
22+
cursor += bytes.byteLength;
23+
}
24+
this.byteArrays = [];
25+
this.byteLength = 0;
26+
return aggregation;
27+
}
28+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { Readable } from "node:stream";
2+
import { describe, expect, test as it, vi } from "vitest";
3+
4+
import { createBufferedReadable } from "./createBufferedReadable";
5+
import { headStream } from "./headStream";
6+
7+
describe("Buffered Readable stream", () => {
8+
function stringStream(size: number, chunkSize: number) {
9+
async function* generate() {
10+
while (size > 0) {
11+
yield "a".repeat(chunkSize);
12+
size -= chunkSize;
13+
}
14+
}
15+
return Readable.from(generate());
16+
}
17+
function byteStream(size: number, chunkSize: number) {
18+
async function* generate() {
19+
while (size > 0) {
20+
yield Buffer.from(new Uint8Array(chunkSize));
21+
size -= chunkSize;
22+
}
23+
}
24+
return Readable.from(generate());
25+
}
26+
const logger = {
27+
debug: vi.fn(),
28+
info: vi.fn(),
29+
warn: vi.fn(),
30+
error() {},
31+
};
32+
33+
it("should join upstream chunks if they are too small (stringStream)", async () => {
34+
const upstream = stringStream(1024, 8);
35+
const downstream = createBufferedReadable(upstream, 64);
36+
37+
let upstreamChunkCount = 0;
38+
upstream.on("data", () => {
39+
upstreamChunkCount += 1;
40+
});
41+
42+
let downstreamChunkCount = 0;
43+
downstream.on("data", () => {
44+
downstreamChunkCount += 1;
45+
});
46+
47+
await headStream(downstream, Infinity);
48+
49+
expect(upstreamChunkCount).toEqual(128);
50+
expect(downstreamChunkCount).toEqual(16);
51+
});
52+
53+
it("should join upstream chunks if they are too small (byteStream)", async () => {
54+
const upstream = byteStream(1031, 7);
55+
const downstream = createBufferedReadable(upstream, 49, logger);
56+
57+
let upstreamChunkCount = 0;
58+
upstream.on("data", () => {
59+
upstreamChunkCount += 1;
60+
});
61+
62+
let downstreamChunkCount = 0;
63+
downstream.on("data", () => {
64+
downstreamChunkCount += 1;
65+
});
66+
67+
await headStream(downstream, Infinity);
68+
69+
expect(Math.ceil(1031 / 7)).toBe(148);
70+
expect(Math.ceil(1031 / 49)).toBe(22);
71+
72+
expect(upstreamChunkCount).toEqual(148);
73+
expect(downstreamChunkCount).toEqual(22);
74+
expect(logger.warn).toHaveBeenCalled();
75+
});
76+
});
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import type { Logger } from "@smithy/types";
2+
import { Readable } from "node:stream";
3+
4+
import { ByteArrayCollector } from "./ByteArrayCollector";
5+
import type { BufferStore, Modes } from "./createBufferedReadableStream";
6+
import { createBufferedReadableStream, flush, merge, modeOf, sizeOf } from "./createBufferedReadableStream";
7+
import { isReadableStream } from "./stream-type-check";
8+
9+
/**
10+
* @internal
11+
* @param upstream - any Readable or ReadableStream.
12+
* @param size - byte or character length minimum. Buffering occurs when a chunk fails to meet this value.
13+
* @param onBuffer - for emitting warnings when buffering occurs.
14+
* @returns another stream of the same data and stream class, but buffers chunks until
15+
* the minimum size is met, except for the last chunk.
16+
*/
17+
export function createBufferedReadable(upstream: Readable, size: number, logger?: Logger): Readable;
18+
export function createBufferedReadable(upstream: ReadableStream, size: number, logger?: Logger): ReadableStream;
19+
export function createBufferedReadable(
20+
upstream: Readable | ReadableStream,
21+
size: number,
22+
logger?: Logger
23+
): Readable | ReadableStream {
24+
if (isReadableStream(upstream)) {
25+
return createBufferedReadableStream(upstream, size, logger);
26+
}
27+
const downstream = new Readable({ read() {} });
28+
let streamBufferingLoggedWarning = false;
29+
let bytesSeen = 0;
30+
31+
const buffers = [
32+
"",
33+
new ByteArrayCollector((size) => new Uint8Array(size)),
34+
new ByteArrayCollector((size) => Buffer.from(new Uint8Array(size))),
35+
] as BufferStore;
36+
let mode: Modes | -1 = -1;
37+
38+
upstream.on("data", (chunk) => {
39+
const chunkMode = modeOf(chunk);
40+
if (mode !== chunkMode) {
41+
if (mode >= 0) {
42+
downstream.push(flush(buffers, mode));
43+
}
44+
mode = chunkMode;
45+
}
46+
if (mode === -1) {
47+
downstream.push(chunk);
48+
return;
49+
}
50+
51+
const chunkSize = sizeOf(chunk);
52+
bytesSeen += chunkSize;
53+
const bufferSize = sizeOf(buffers[mode]);
54+
if (chunkSize >= size && bufferSize === 0) {
55+
// skip writing to the intermediate buffer
56+
// because the upstream chunk is already large enough.
57+
downstream.push(chunk);
58+
} else {
59+
// buffer and potentially flush the data downstream.
60+
const newSize = merge(buffers, mode, chunk);
61+
if (!streamBufferingLoggedWarning && bytesSeen > size * 2) {
62+
streamBufferingLoggedWarning = true;
63+
logger?.warn(
64+
`@smithy/util-stream - stream chunk size ${chunkSize} is below threshold of ${size}, automatically buffering.`
65+
);
66+
}
67+
if (newSize >= size) {
68+
downstream.push(flush(buffers, mode));
69+
}
70+
}
71+
});
72+
upstream.on("end", () => {
73+
downstream.push(flush(buffers, mode));
74+
downstream.push(null);
75+
});
76+
77+
return downstream;
78+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import { Readable } from "node:stream";
2+
import { describe, expect, test as it, vi } from "vitest";
3+
4+
import { createBufferedReadable } from "./createBufferedReadableStream";
5+
6+
describe("Buffered ReadableStream", () => {
7+
function stringStream(size: number, chunkSize: number) {
8+
async function* generate() {
9+
while (size > 0) {
10+
yield "a".repeat(chunkSize);
11+
size -= chunkSize;
12+
}
13+
}
14+
return Readable.toWeb(Readable.from(generate()));
15+
}
16+
function byteStream(size: number, chunkSize: number) {
17+
async function* generate() {
18+
while (size > 0) {
19+
yield new Uint8Array(chunkSize);
20+
size -= chunkSize;
21+
}
22+
}
23+
return Readable.toWeb(Readable.from(generate()));
24+
}
25+
26+
const logger = {
27+
debug: vi.fn(),
28+
info: vi.fn(),
29+
warn: vi.fn(),
30+
error() {},
31+
};
32+
33+
it("should join upstream chunks if they are too small (stringStream)", async () => {
34+
let upstreamChunkCount = 0;
35+
let downstreamChunkCount = 0;
36+
37+
const upstream = stringStream(1024, 8);
38+
const upstreamReader = upstream.getReader();
39+
40+
const midstream = new ReadableStream({
41+
async pull(controller) {
42+
const { value, done } = await upstreamReader.read();
43+
if (done) {
44+
controller.close();
45+
} else {
46+
expect(value.length).toBe(8);
47+
upstreamChunkCount += 1;
48+
controller.enqueue(value);
49+
}
50+
},
51+
});
52+
const downstream = createBufferedReadable(midstream, 64);
53+
const reader = downstream.getReader();
54+
55+
while (true) {
56+
const { done, value } = await reader.read();
57+
if (done) {
58+
break;
59+
} else {
60+
downstreamChunkCount += 1;
61+
expect(value.length).toBe(64);
62+
}
63+
}
64+
65+
expect(upstreamChunkCount).toEqual(128);
66+
expect(downstreamChunkCount).toEqual(16);
67+
});
68+
69+
it("should join upstream chunks if they are too small (byteStream)", async () => {
70+
let upstreamChunkCount = 0;
71+
let downstreamChunkCount = 0;
72+
73+
const upstream = byteStream(1031, 7);
74+
const upstreamReader = upstream.getReader();
75+
76+
const midstream = new ReadableStream({
77+
async pull(controller) {
78+
const { value, done } = await upstreamReader.read();
79+
if (done) {
80+
controller.close();
81+
} else {
82+
expect(value.length).toBe(7);
83+
upstreamChunkCount += 1;
84+
controller.enqueue(value);
85+
}
86+
},
87+
});
88+
const downstream = createBufferedReadable(midstream, 49, logger);
89+
const downstreamReader = downstream.getReader();
90+
91+
while (true) {
92+
const { done, value } = await downstreamReader.read();
93+
if (done) {
94+
break;
95+
} else {
96+
downstreamChunkCount += 1;
97+
if (value.byteLength > 7) {
98+
expect(value.byteLength).toBe(49);
99+
}
100+
}
101+
}
102+
103+
expect(upstreamChunkCount).toEqual(148);
104+
expect(downstreamChunkCount).toEqual(22);
105+
expect(logger.warn).toHaveBeenCalled();
106+
});
107+
});

0 commit comments

Comments
 (0)