Skip to content

Commit 686a429

Browse files
committed
fix: fix readable chunking to work with node 12+ readable interface
1 parent fc22682 commit 686a429

File tree

1 file changed

+20
-23
lines changed

1 file changed

+20
-23
lines changed

lib/storage/src/data-chunk/readable-helper.ts

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,25 @@ import { Readable } from "stream";
44
import { DEFAULT } from "../upload/defaults";
55
import { DataPart } from "./yield-chunk";
66

7-
interface StreamChunk {
8-
Body: Buffer;
9-
ended: boolean;
10-
}
11-
127
export async function* chunkFromReadable(reader: Readable, chunkSize: number): AsyncGenerator<DataPart, void, unknown> {
138
let partNumber = DEFAULT.MIN_PART_NUMBER;
149
let oldBuffer = Buffer.from("");
1510
while (partNumber < DEFAULT.MAX_PART_NUMBER) {
16-
reader.resume();
17-
const result = await _chunkFromStream(reader, chunkSize, oldBuffer);
18-
reader.pause();
19-
11+
let currentBuffer = oldBuffer;
12+
if(reader.readable) {
13+
reader.resume();
14+
currentBuffer = await _chunkFromStream(reader, chunkSize, oldBuffer);
15+
reader.pause();
16+
}
17+
2018
yield {
21-
Body: result.Body.slice(0, chunkSize),
19+
Body: currentBuffer.slice(0, chunkSize),
2220
PartNumber: partNumber,
2321
};
24-
oldBuffer = result.Body.slice(chunkSize) as Buffer;
22+
oldBuffer = currentBuffer.slice(chunkSize) as Buffer;
2523
partNumber += 1;
2624

27-
if (result.ended && oldBuffer.length == 0) {
25+
if (!reader.readable && oldBuffer.length == 0) {
2826
return;
2927
}
3028
}
@@ -33,7 +31,12 @@ export async function* chunkFromReadable(reader: Readable, chunkSize: number): A
3331
}
3432
}
3533

36-
function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise<StreamChunk> {
34+
35+
function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise<Buffer> {
36+
if(!stream.readable) {
37+
return Promise.resolve(oldBuffer);
38+
}
39+
3740
let currentChunk = oldBuffer;
3841
return new Promise((resolve, reject) => {
3942
const cleanupListeners = () => {
@@ -44,12 +47,9 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer
4447

4548
stream.on("data", (chunk) => {
4649
currentChunk = Buffer.concat([currentChunk, Buffer.from(chunk)]);
47-
if (currentChunk.length >= chunkSize) {
50+
if (currentChunk.length >= chunkSize || !stream.readable) {
4851
cleanupListeners();
49-
resolve({
50-
Body: currentChunk,
51-
ended: false,
52-
});
52+
resolve(currentChunk);
5353
}
5454
});
5555
stream.on("error", (err) => {
@@ -58,10 +58,7 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer
5858
});
5959
stream.on("end", () => {
6060
cleanupListeners();
61-
resolve({
62-
Body: currentChunk,
63-
ended: true,
64-
});
61+
resolve(currentChunk);
6562
});
6663
});
67-
}
64+
}

0 commit comments

Comments
 (0)