Skip to content

fix(replay): Handle compression failures more robustly #6988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/replay/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ module.exports = {
// TODO: figure out if we need a worker-specific tsconfig
project: ['tsconfig.worker.json'],
},
rules: {
// We cannot use backticks, as that conflicts with the stringified worker
'prefer-template': 'off',
},
},
{
files: ['src/worker/**/*.js'],
Expand Down
27 changes: 10 additions & 17 deletions packages/replay/src/eventBuffer/EventBufferArray.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,31 @@ import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
* Used as fallback if the compression worker cannot be loaded or is disabled.
*/
export class EventBufferArray implements EventBuffer {
private _events: RecordingEvent[];
/** All the events that are buffered to be sent. */
public events: RecordingEvent[];

public constructor() {
this._events = [];
this.events = [];
}

/** @inheritdoc */
public get pendingLength(): number {
return this._events.length;
}

/**
* Returns the raw events that are buffered. In `EventBufferArray`, this is the
* same as `this._events`.
*/
public get pendingEvents(): RecordingEvent[] {
return this._events;
public get hasEvents(): boolean {
return this.events.length > 0;
}

/** @inheritdoc */
public destroy(): void {
this._events = [];
this.events = [];
}

/** @inheritdoc */
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
if (isCheckout) {
this._events = [event];
this.events = [event];
return;
}

this._events.push(event);
this.events.push(event);
return;
}

Expand All @@ -46,8 +39,8 @@ export class EventBufferArray implements EventBuffer {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const eventsRet = this._events;
this._events = [];
const eventsRet = this.events;
this.events = [];
resolve(JSON.stringify(eventsRet));
});
}
Expand Down
170 changes: 23 additions & 147 deletions packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,36 @@
import type { ReplayRecordingData } from '@sentry/types';
import { logger } from '@sentry/utils';

import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest, WorkerResponse } from '../types';
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
import { WorkerHandler } from './WorkerHandler';

/**
* Event buffer that uses a web worker to compress events.
* Exported only for testing.
*/
export class EventBufferCompressionWorker implements EventBuffer {
/**
* Keeps track of the list of events since the last flush that have not been compressed.
* For example, page is reloaded and a flush attempt is made, but
* `finish()` (and thus the flush), does not complete.
*/
public _pendingEvents: RecordingEvent[] = [];
/** @inheritdoc */
public hasEvents: boolean;

private _worker: Worker;
private _eventBufferItemLength: number = 0;
private _id: number = 0;
private _ensureReadyPromise?: Promise<void>;
private _worker: WorkerHandler;

public constructor(worker: Worker) {
this._worker = worker;
}

/**
* The number of raw events that are buffered. This may not be the same as
* the number of events that have been compresed in the worker because
* `addEvent` is async.
*/
public get pendingLength(): number {
return this._eventBufferItemLength;
}

/**
* Returns a list of the raw recording events that are being compressed.
*/
public get pendingEvents(): RecordingEvent[] {
return this._pendingEvents;
Comment on lines -39 to -40
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pendingEvents was added to attempt to send a segment on unload (without compression), but since we have decided to pause on that, I think it's good to remove this 👍

this._worker = new WorkerHandler(worker);
this.hasEvents = false;
}

/**
* Ensure the worker is ready (or not).
* This will either resolve when the worker is ready, or reject if an error occured.
*/
public ensureReady(): Promise<void> {
// Ensure we only check once
if (this._ensureReadyPromise) {
return this._ensureReadyPromise;
}

this._ensureReadyPromise = new Promise((resolve, reject) => {
this._worker.addEventListener(
'message',
({ data }: MessageEvent) => {
if ((data as WorkerResponse).success) {
resolve();
} else {
reject();
}
},
{ once: true },
);

this._worker.addEventListener(
'error',
error => {
reject(error);
},
{ once: true },
);
});

return this._ensureReadyPromise;
return this._worker.ensureReady();
}

/**
* Destroy the event buffer.
*/
public destroy(): void {
__DEBUG_BUILD__ && logger.log('[Replay] Destroying compression worker');
this._worker.terminate();
this._worker.destroy();
}

/**
Expand All @@ -89,19 +39,12 @@ export class EventBufferCompressionWorker implements EventBuffer {
* Returns true if event was successfuly received and processed by worker.
*/
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
this.hasEvents = true;

if (isCheckout) {
// This event is a checkout, make sure worker buffer is cleared before
// proceeding.
await this._postMessage({
id: this._getAndIncrementId(),
method: 'init',
args: [],
});
}

// Don't store checkout events in `_pendingEvents` because they are too large
if (!isCheckout) {
this._pendingEvents.push(event);
await this._clear();
}

return this._sendEventToWorker(event);
Expand All @@ -110,97 +53,30 @@ export class EventBufferCompressionWorker implements EventBuffer {
/**
* Finish the event buffer and return the compressed data.
*/
public async finish(): Promise<ReplayRecordingData> {
try {
return await this._finishRequest(this._getAndIncrementId());
} catch (error) {
__DEBUG_BUILD__ && logger.error('[Replay] Error when trying to compress events', error);
// fall back to uncompressed
const events = this.pendingEvents;
return JSON.stringify(events);
}
}

/**
* Post message to worker and wait for response before resolving promise.
*/
private _postMessage<T>({ id, method, args }: WorkerRequest): Promise<T> {
return new Promise((resolve, reject) => {
const listener = ({ data }: MessageEvent): void => {
const response = data as WorkerResponse;
if (response.method !== method) {
return;
}

// There can be multiple listeners for a single method, the id ensures
// that the response matches the caller.
if (response.id !== id) {
return;
}

// At this point, we'll always want to remove listener regardless of result status
this._worker.removeEventListener('message', listener);

if (!response.success) {
// TODO: Do some error handling, not sure what
__DEBUG_BUILD__ && logger.error('[Replay]', response.response);

reject(new Error('Error in compression worker'));
return;
}

resolve(response.response as T);
};

let stringifiedArgs;
try {
stringifiedArgs = JSON.stringify(args);
} catch (err) {
__DEBUG_BUILD__ && logger.error('[Replay] Error when trying to stringify args', err);
stringifiedArgs = '[]';
}

// Note: we can't use `once` option because it's possible it needs to
// listen to multiple messages
this._worker.addEventListener('message', listener);
this._worker.postMessage({ id, method, args: stringifiedArgs });
});
public finish(): Promise<ReplayRecordingData> {
return this._finishRequest();
}

/**
* Send the event to the worker.
*/
private async _sendEventToWorker(event: RecordingEvent): Promise<AddEventResult> {
const promise = this._postMessage<void>({
id: this._getAndIncrementId(),
method: 'addEvent',
args: [event],
});

// XXX: See note in `get length()`
this._eventBufferItemLength++;

return promise;
private _sendEventToWorker(event: RecordingEvent): Promise<AddEventResult> {
return this._worker.postMessage<void>('addEvent', JSON.stringify(event));
}

/**
* Finish the request and return the compressed data from the worker.
*/
private async _finishRequest(id: number): Promise<Uint8Array> {
const promise = this._postMessage<Uint8Array>({ id, method: 'finish', args: [] });

// XXX: See note in `get length()`
this._eventBufferItemLength = 0;

await promise;
private async _finishRequest(): Promise<Uint8Array> {
const response = await this._worker.postMessage<Uint8Array>('finish');

this._pendingEvents = [];
this.hasEvents = false;

return promise;
return response;
}

/** Get the current ID and increment it for the next call. */
private _getAndIncrementId(): number {
return this._id++;
/** Clear any pending events from the worker. */
private _clear(): Promise<void> {
return this._worker.postMessage('clear');
}
}
31 changes: 17 additions & 14 deletions packages/replay/src/eventBuffer/EventBufferProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,12 @@ export class EventBufferProxy implements EventBuffer {
this._compression = new EventBufferCompressionWorker(worker);
this._used = this._fallback;

this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded().catch(() => {
// Ignore errors here
});
this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to catch this, as we catch everything in this method now.

}

/** @inheritDoc */
public get pendingLength(): number {
return this._used.pendingLength;
}

/** @inheritDoc */
public get pendingEvents(): RecordingEvent[] {
return this._used.pendingEvents;
public get hasEvents(): boolean {
return this._used.hasEvents;
}

/** @inheritDoc */
Expand Down Expand Up @@ -75,18 +68,28 @@ export class EventBufferProxy implements EventBuffer {
return;
}

// Compression worker is ready, we can use it
// Now we need to switch over the array buffer to the compression worker
await this._switchToCompressionWorker();
}

/** Switch the used buffer to the compression worker. */
private async _switchToCompressionWorker(): Promise<void> {
const { events } = this._fallback;

const addEventPromises: Promise<void>[] = [];
for (const event of this._fallback.pendingEvents) {
for (const event of events) {
addEventPromises.push(this._compression.addEvent(event));
}

// We switch over to the compression buffer immediately - any further events will be added
// We switch over to the new buffer immediately - any further events will be added
// after the previously buffered ones
this._used = this._compression;

// Wait for original events to be re-added before resolving
await Promise.all(addEventPromises);
try {
await Promise.all(addEventPromises);
} catch (error) {
__DEBUG_BUILD__ && logger.warn('[Replay] Failed to add events when switching buffers.', error);
}
}
}
Loading