Skip to content

feat(lib-storage): improve performance by reducing buffer copies #5078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits on Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/lib-storage/jest.config.e2e.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Jest configuration for the end-to-end test run; it is the file passed to
// `jest -c jest.config.e2e.js`.
module.exports = {
// Use the ts-jest preset so the TypeScript spec files can be run by Jest.
preset: "ts-jest",
// Only pick up files whose name ends in `.e2e.spec.ts`, in any directory.
testMatch: ["**/*.e2e.spec.ts"],
// Stop the run after the first failure instead of continuing with the remaining tests.
bail: true,
};
3 changes: 2 additions & 1 deletion lib/lib-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"build:types:downlevel": "downlevel-dts dist-types dist-types/ts3.4",
"clean": "rimraf ./dist-* && rimraf *.tsbuildinfo",
"extract:docs": "api-extractor run --local",
"test": "jest"
"test": "jest",
"test:e2e": "jest -c jest.config.e2e.js"
},
"engines": {
"node": ">=14.0.0"
Expand Down
243 changes: 122 additions & 121 deletions lib/lib-storage/src/Upload.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,135 +312,136 @@ describe(Upload.name, () => {
expect(result.Location).toEqual("https://example-bucket.example-host.com/folder/example-key");
});

it("should upload using multi-part when parts are larger than part size", async () => {
// create a string that's larger than 5MB.
const partSize = 1024 * 1024 * 5;
const largeBuffer = Buffer.from("#".repeat(partSize + 10));
const firstBuffer = largeBuffer.subarray(0, partSize);
const secondBuffer = largeBuffer.subarray(partSize);
const actionParams = { ...params, Body: largeBuffer };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});
await upload.done();
expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});
// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});
expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});
// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
});
[
{ type: "buffer", largeBuffer: Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10)) },
{ type: "Uint8array", largeBuffer: Uint8Array.from(Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10))) },
].forEach(({ type, largeBuffer }) => {
it(`should upload using multi-part when parts are larger than part size ${type}`, async () => {
const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
const actionParams = { ...params, Body: largeBuffer };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});
await upload.done();
expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});
// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});
expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});
// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
});

// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});
// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});

it("should upload using multi-part when parts are larger than part size stream", async () => {
// create a string that's larger than 5MB.
const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
const streamBody = Readable.from(
(function* () {
yield largeBuffer;
})()
);
const actionParams = { ...params, Body: streamBody };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});

it("should upload using multi-part when parts are larger than part size stream", async () => {
// create a string that's larger than 5MB.
const largeBuffer = Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10));
const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
const streamBody = Readable.from(
(function* () {
yield largeBuffer;
})()
);
const actionParams = { ...params, Body: streamBody };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});
await upload.done();

await upload.done();
expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});

expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});
// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});

// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});
expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});

expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});
// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
});

// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});

// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});

it("should add tags to the object if tags have been added PUT", async () => {
Expand Down
8 changes: 7 additions & 1 deletion lib/lib-storage/src/bytelength.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.

import { ClientDefaultValues } from "./runtimeConfig";

export const byteLength = (input: any) => {
if (input === null || input === undefined) return 0;
if (typeof input === "string") input = Buffer.from(input);

if (typeof input === "string") {
return Buffer.byteLength(input);
}

if (typeof input.byteLength === "number") {
return input.byteLength;
} else if (typeof input.length === "number") {
Expand Down
36 changes: 22 additions & 14 deletions lib/lib-storage/src/chunker.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
import { Buffer } from "buffer";
import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.
import { Readable } from "stream";

import { getChunkBuffer } from "./chunks/getChunkBuffer";
import { getChunkStream } from "./chunks/getChunkStream";
import { getChunkUint8Array } from "./chunks/getChunkUint8Array";
import { getDataReadable } from "./chunks/getDataReadable";
import { getDataReadableStream } from "./chunks/getDataReadableStream";
import { BodyDataTypes } from "./types";
import type { RawDataPart } from "./Upload";

export const getChunk = (data: BodyDataTypes, partSize: number) => {
if (data instanceof Buffer) {
return getChunkBuffer(data, partSize);
} else if (data instanceof Readable) {
export const getChunk = (data: BodyDataTypes, partSize: number): AsyncGenerator<RawDataPart, void, undefined> => {
if (data instanceof Uint8Array) {
// includes Buffer (extends Uint8Array)
return getChunkUint8Array(data, partSize);
}

if (data instanceof Readable) {
return getChunkStream<Readable>(data, partSize, getDataReadable);
} else if (data instanceof String || typeof data === "string" || data instanceof Uint8Array) {
// chunk Strings, Uint8Array.
return getChunkBuffer(Buffer.from(data), partSize);
}

if (data instanceof String || typeof data === "string") {
return getChunkUint8Array(Buffer.from(data), partSize);
}

if (typeof (data as any).stream === "function") {
// approximate support for Blobs.
return getChunkStream<ReadableStream>((data as any).stream(), partSize, getDataReadableStream);
} else if (data instanceof ReadableStream) {
}

if (data instanceof ReadableStream) {
return getChunkStream<ReadableStream>(data, partSize, getDataReadableStream);
} else {
throw new Error(
"Body Data is unsupported format, expected data to be one of: string | Uint8Array | Buffer | Readable | ReadableStream | Blob;."
);
}

throw new Error(
"Body Data is unsupported format, expected data to be one of: string | Uint8Array | Buffer | Readable | ReadableStream | Blob;."
);
};
Loading