Skip to content

ref(utils): refactor promisebuffer #4323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/browser/src/transports/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
eventStatusFromHttpCode,
getGlobalObject,
logger,
makePromiseBuffer,
parseRetryAfterHeader,
PromiseBuffer,
SentryError,
Expand All @@ -42,7 +43,7 @@ export abstract class BaseTransport implements Transport {
protected readonly _api: APIDetails;

/** A simple buffer holding all requests. */
protected readonly _buffer: PromiseBuffer<SentryResponse> = new PromiseBuffer(30);
protected readonly _buffer: PromiseBuffer<SentryResponse> = makePromiseBuffer(30);

/** Locks transport after receiving rate limits in a response */
protected readonly _rateLimits: Record<string, Date> = {};
Expand Down
4 changes: 2 additions & 2 deletions packages/core/test/mocks/transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Event, Response, Transport } from '@sentry/types';
import { PromiseBuffer, SyncPromise } from '@sentry/utils';
import { makePromiseBuffer, PromiseBuffer, SyncPromise } from '@sentry/utils';

async function sleep(delay: number): Promise<void> {
return new SyncPromise(resolve => setTimeout(resolve, delay));
Expand All @@ -11,7 +11,7 @@ export class FakeTransport implements Transport {
public delay: number = 2000;

/** A simple buffer holding all requests. */
protected readonly _buffer: PromiseBuffer<Response> = new PromiseBuffer(9999);
protected readonly _buffer: PromiseBuffer<Response> = makePromiseBuffer(9999);

public sendEvent(_event: Event): PromiseLike<Response> {
this.sendCalled += 1;
Expand Down
11 changes: 9 additions & 2 deletions packages/node/src/transports/base/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import {
Transport,
TransportOptions,
} from '@sentry/types';
import { eventStatusFromHttpCode, logger, parseRetryAfterHeader, PromiseBuffer, SentryError } from '@sentry/utils';
import {
eventStatusFromHttpCode,
logger,
makePromiseBuffer,
parseRetryAfterHeader,
PromiseBuffer,
SentryError,
} from '@sentry/utils';
import * as fs from 'fs';
import * as http from 'http';
import * as https from 'https';
Expand Down Expand Up @@ -43,7 +50,7 @@ export abstract class BaseTransport implements Transport {
protected _api: APIDetails;

/** A simple buffer holding all requests. */
protected readonly _buffer: PromiseBuffer<Response> = new PromiseBuffer(30);
protected readonly _buffer: PromiseBuffer<Response> = makePromiseBuffer(30);

/** Locks transport after receiving rate limits in a response */
protected readonly _rateLimits: Record<string, Date> = {};
Expand Down
79 changes: 44 additions & 35 deletions packages/utils/src/promisebuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,36 @@ function allPromises<U = unknown>(collection: Array<U | PromiseLike<U>>): Promis
});
}

/** A simple queue that holds promises. */
export class PromiseBuffer<T> {
/** Internal set of queued Promises */
private readonly _buffer: Array<PromiseLike<T>> = [];
export interface PromiseBuffer<T> {
length(): number;
add(taskProducer: () => PromiseLike<T>): PromiseLike<T>;
remove(task: PromiseLike<T>): PromiseLike<T>;
drain(timeout?: number): PromiseLike<boolean>;
}

/**
* Creates an new PromiseBuffer object with the specified limit
* @param limit max number of promises that can be stored in the buffer
*/
export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
const buffer: Array<PromiseLike<T>> = [];

function length(): number {
return buffer.length;
}

public constructor(protected _limit?: number) {}
function isReady(): boolean {
return limit === undefined || buffer.length < limit;
}

/**
* Says if the buffer is ready to take more requests
* Remove a promise from the queue.
*
* @param task Can be any PromiseLike<T>
* @returns Removed promise.
*/
public isReady(): boolean {
return this._limit === undefined || this.length() < this._limit;
function remove(task: PromiseLike<T>): PromiseLike<T> {
return buffer.splice(buffer.indexOf(task), 1)[0];
}

/**
Expand All @@ -46,47 +64,29 @@ export class PromiseBuffer<T> {
* limit check.
* @returns The original promise.
*/
public add(taskProducer: () => PromiseLike<T>): PromiseLike<T> {
if (!this.isReady()) {
function add(taskProducer: () => PromiseLike<T>): PromiseLike<T> {
if (!isReady()) {
return SyncPromise.reject(new SentryError('Not adding Promise due to buffer limit reached.'));
}

// start the task and add its promise to the queue
const task = taskProducer();
if (this._buffer.indexOf(task) === -1) {
this._buffer.push(task);
if (buffer.indexOf(task) === -1) {
buffer.push(task);
}
void task
.then(() => this.remove(task))
.then(() => remove(task))
// Use `then(null, rejectionHandler)` rather than `catch(rejectionHandler)` so that we can use `PromiseLike`
// rather than `Promise`. `PromiseLike` doesn't have a `.catch` method, making its polyfill smaller. (ES5 didn't
// have promises, so TS has to polyfill when down-compiling.)
.then(null, () =>
this.remove(task).then(null, () => {
// We have to add another catch here because `this.remove()` starts a new promise chain.
remove(task).then(null, () => {
// We have to add another catch here because `remove()` starts a new promise chain.
}),
);
return task;
}

/**
* Remove a promise from the queue.
*
* @param task Can be any PromiseLike<T>
* @returns Removed promise.
*/
public remove(task: PromiseLike<T>): PromiseLike<T> {
const removedTask = this._buffer.splice(this._buffer.indexOf(task), 1)[0];
return removedTask;
}

/**
* This function returns the number of unresolved promises in the queue.
*/
public length(): number {
return this._buffer.length;
}

/**
* Wait for all promises in the queue to resolve or for timeout to expire, whichever comes first.
*
Expand All @@ -96,7 +96,7 @@ export class PromiseBuffer<T> {
* @returns A promise which will resolve to `true` if the queue is already empty or drains before the timeout, and
* `false` otherwise
*/
public drain(timeout?: number): PromiseLike<boolean> {
function drain(timeout?: number): PromiseLike<boolean> {
return new SyncPromise<boolean>(resolve => {
// wait for `timeout` ms and then resolve to `false` (if not cancelled first)
const capturedSetTimeout = setTimeout(() => {
Expand All @@ -106,10 +106,19 @@ export class PromiseBuffer<T> {
}, timeout);

// if all promises resolve in time, cancel the timer and resolve to `true`
void allPromises(this._buffer).then(() => {
void allPromises(buffer).then(() => {
clearTimeout(capturedSetTimeout);
resolve(true);
});
});
}

const promiseBuffer: PromiseBuffer<T> = {
length,
add,
remove,
drain,
};

return promiseBuffer;
}
20 changes: 10 additions & 10 deletions packages/utils/test/promisebuffer.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { PromiseBuffer } from '../src/promisebuffer';
import { makePromiseBuffer } from '../src/promisebuffer';
import { SyncPromise } from '../src/syncpromise';

describe('PromiseBuffer', () => {
describe('add()', () => {
test('no limit', () => {
const buffer = new PromiseBuffer();
const buffer = makePromiseBuffer();
const p = jest.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
void buffer.add(p);
expect(buffer.length()).toEqual(1);
});

test('with limit', () => {
const buffer = new PromiseBuffer(1);
const buffer = makePromiseBuffer(1);
let task1;
const producer1 = jest.fn(() => {
task1 = new SyncPromise(resolve => setTimeout(resolve));
Expand All @@ -28,7 +28,7 @@ describe('PromiseBuffer', () => {

describe('drain()', () => {
test('without timeout', async () => {
const buffer = new PromiseBuffer();
const buffer = makePromiseBuffer();
for (let i = 0; i < 5; i++) {
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve)));
}
Expand All @@ -39,7 +39,7 @@ describe('PromiseBuffer', () => {
});

test('with timeout', async () => {
const buffer = new PromiseBuffer();
const buffer = makePromiseBuffer();
for (let i = 0; i < 5; i++) {
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve, 100)));
}
Expand All @@ -49,7 +49,7 @@ describe('PromiseBuffer', () => {
});

test('on empty buffer', async () => {
const buffer = new PromiseBuffer();
const buffer = makePromiseBuffer();
expect(buffer.length()).toEqual(0);
const result = await buffer.drain();
expect(result).toEqual(true);
Expand All @@ -58,7 +58,7 @@ describe('PromiseBuffer', () => {
});

test('resolved promises should not show up in buffer length', async () => {
const buffer = new PromiseBuffer();
const buffer = makePromiseBuffer();
const producer = () => new SyncPromise(resolve => setTimeout(resolve));
const task = buffer.add(producer);
expect(buffer.length()).toEqual(1);
Expand All @@ -67,7 +67,7 @@ describe('PromiseBuffer', () => {
});

test('rejected promises should not show up in buffer length', async () => {
const buffer = new PromiseBuffer();
const buffer = makePromiseBuffer();
const producer = () => new SyncPromise((_, reject) => setTimeout(reject));
const task = buffer.add(producer);
expect(buffer.length()).toEqual(1);
Expand All @@ -80,7 +80,7 @@ describe('PromiseBuffer', () => {
});

test('resolved task should give an access to the return value', async () => {
const buffer = new PromiseBuffer<string>();
const buffer = makePromiseBuffer<string>();
const producer = () => new SyncPromise<string>(resolve => setTimeout(() => resolve('test')));
const task = buffer.add(producer);
const result = await task;
Expand All @@ -89,7 +89,7 @@ describe('PromiseBuffer', () => {

test('rejected task should give an access to the return value', async () => {
expect.assertions(1);
const buffer = new PromiseBuffer<string>();
const buffer = makePromiseBuffer<string>();
const producer = () => new SyncPromise<string>((_, reject) => setTimeout(() => reject(new Error('whoops'))));
const task = buffer.add(producer);
try {
Expand Down