Skip to content

fix(replay): Handle compression worker errors more gracefully #6936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/replay/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@ export const MASK_ALL_TEXT_SELECTOR = 'body *:not(style), body *:not(script)';
export const DEFAULT_FLUSH_MIN_DELAY = 5_000;
export const DEFAULT_FLUSH_MAX_DELAY = 5_000;

/* How long to wait for error checkouts */
export const ERROR_CHECKOUT_TIME = 60_000;

export const RETRY_BASE_INTERVAL = 5000;
export const RETRY_MAX_COUNT = 3;
54 changes: 54 additions & 0 deletions packages/replay/src/eventBuffer/EventBufferArray.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';

/**
* A basic event buffer that does not do any compression.
* Used as fallback if the compression worker cannot be loaded or is disabled.
*/
export class EventBufferArray implements EventBuffer {
private _events: RecordingEvent[];

public constructor() {
this._events = [];
}

/** @inheritdoc */
public get pendingLength(): number {
return this._events.length;
}

/**
* Returns the raw events that are buffered. In `EventBufferArray`, this is the
* same as `this._events`.
*/
public get pendingEvents(): RecordingEvent[] {
return this._events;
}

/** @inheritdoc */
public destroy(): void {
this._events = [];
}

/** @inheritdoc */
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
if (isCheckout) {
this._events = [event];
return;
}

this._events.push(event);
return;
}

/** @inheritdoc */
public finish(): Promise<string> {
return new Promise<string>(resolve => {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const eventsRet = this._events;
this._events = [];
resolve(JSON.stringify(eventsRet));
});
}
}
Original file line number Diff line number Diff line change
@@ -1,153 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
// TODO: figure out member access types and remove the line above

import type { ReplayRecordingData } from '@sentry/types';
import { logger } from '@sentry/utils';

import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest } from './types';
import workerString from './worker/worker.js';

interface CreateEventBufferParams {
useCompression: boolean;
}

/**
* Create an event buffer for replays.
*/
export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer {
// eslint-disable-next-line no-restricted-globals
if (useCompression && window.Worker) {
const workerBlob = new Blob([workerString]);
const workerUrl = URL.createObjectURL(workerBlob);

__DEBUG_BUILD__ && logger.log('[Replay] Using compression worker');
const worker = new Worker(workerUrl);
return new EventBufferProxy(worker);
}

__DEBUG_BUILD__ && logger.log('[Replay] Using simple buffer');
return new EventBufferArray();
}

/**
* This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there.
* This can happen e.g. if the worker cannot be loaded.
* Exported only for testing.
*/
export class EventBufferProxy implements EventBuffer {
private _fallback: EventBufferArray;
private _compression: EventBufferCompressionWorker;
private _used: EventBuffer;

public constructor(worker: Worker) {
this._fallback = new EventBufferArray();
this._compression = new EventBufferCompressionWorker(worker);
this._used = this._fallback;

void this._ensureWorkerIsLoaded();
}

/** @inheritDoc */
public get pendingLength(): number {
return this._used.pendingLength;
}

/** @inheritDoc */
public get pendingEvents(): RecordingEvent[] {
return this._used.pendingEvents;
}

/** @inheritDoc */
public destroy(): void {
this._fallback.destroy();
this._compression.destroy();
}

/**
* Add an event to the event buffer.
*
* Returns true if event was successfully added.
*/
public addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
return this._used.addEvent(event, isCheckout);
}

/** @inheritDoc */
public finish(): Promise<ReplayRecordingData> {
return this._used.finish();
}

/** Ensure the worker has loaded. */
private async _ensureWorkerIsLoaded(): Promise<void> {
try {
await this._compression.ensureReady();
} catch (error) {
// If the worker fails to load, we fall back to the simple buffer.
// Nothing more to do from our side here
__DEBUG_BUILD__ && logger.log('[Replay] Failed to load the compression worker, falling back to simple buffer');
return;
}

// Compression worker is ready, we can use it
// Now we need to switch over the array buffer to the compression worker
const addEventPromises: Promise<void>[] = [];
for (const event of this._fallback.pendingEvents) {
addEventPromises.push(this._compression.addEvent(event));
}

// We switch over to the compression buffer immediately - any further events will be added
// after the previously buffered ones
this._used = this._compression;

// Wait for original events to be re-added before resolving
await Promise.all(addEventPromises);
}
}

class EventBufferArray implements EventBuffer {
private _events: RecordingEvent[];

public constructor() {
this._events = [];
}

public get pendingLength(): number {
return this._events.length;
}

/**
* Returns the raw events that are buffered. In `EventBufferArray`, this is the
* same as `this._events`.
*/
public get pendingEvents(): RecordingEvent[] {
return this._events;
}

public destroy(): void {
this._events = [];
}

public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
if (isCheckout) {
this._events = [event];
return;
}

this._events.push(event);
return;
}

public finish(): Promise<string> {
return new Promise<string>(resolve => {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const eventsRet = this._events;
this._events = [];
resolve(JSON.stringify(eventsRet));
});
}
}
import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest, WorkerResponse } from '../types';

/**
* Event buffer that uses a web worker to compress events.
Expand All @@ -164,6 +18,7 @@ export class EventBufferCompressionWorker implements EventBuffer {
private _worker: Worker;
private _eventBufferItemLength: number = 0;
private _id: number = 0;
private _ensureReadyPromise?: Promise<void>;

public constructor(worker: Worker) {
this._worker = worker;
Expand All @@ -190,11 +45,16 @@ export class EventBufferCompressionWorker implements EventBuffer {
* This will either resolve when the worker is ready, or reject if an error occured.
*/
public ensureReady(): Promise<void> {
return new Promise((resolve, reject) => {
// Ensure we only check once
if (this._ensureReadyPromise) {
return this._ensureReadyPromise;
}

this._ensureReadyPromise = new Promise((resolve, reject) => {
this._worker.addEventListener(
'message',
({ data }: MessageEvent) => {
if (data.success) {
if ((data as WorkerResponse).success) {
resolve();
} else {
reject();
Expand All @@ -211,6 +71,8 @@ export class EventBufferCompressionWorker implements EventBuffer {
{ once: true },
);
});

return this._ensureReadyPromise;
}

/**
Expand Down Expand Up @@ -248,39 +110,46 @@ export class EventBufferCompressionWorker implements EventBuffer {
/**
* Finish the event buffer and return the compressed data.
*/
public finish(): Promise<Uint8Array> {
return this._finishRequest(this._getAndIncrementId());
public async finish(): Promise<ReplayRecordingData> {
try {
return await this._finishRequest(this._getAndIncrementId());
} catch (error) {
__DEBUG_BUILD__ && logger.error('[Replay] Error when trying to compress events', error);
// fall back to uncompressed
const events = this.pendingEvents;
return JSON.stringify(events);
}
}

/**
* Post message to worker and wait for response before resolving promise.
*/
private _postMessage<T>({ id, method, args }: WorkerRequest): Promise<T> {
return new Promise((resolve, reject) => {
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
const listener = ({ data }: MessageEvent) => {
if (data.method !== method) {
const listener = ({ data }: MessageEvent): void => {
const response = data as WorkerResponse;
if (response.method !== method) {
return;
}

// There can be multiple listeners for a single method, the id ensures
// that the response matches the caller.
if (data.id !== id) {
if (response.id !== id) {
return;
}

// At this point, we'll always want to remove listener regardless of result status
this._worker.removeEventListener('message', listener);

if (!data.success) {
if (!response.success) {
// TODO: Do some error handling, not sure what
__DEBUG_BUILD__ && logger.error('[Replay]', data.response);
__DEBUG_BUILD__ && logger.error('[Replay]', response.response);

reject(new Error('Error in compression worker'));
return;
}

resolve(data.response);
resolve(response.response as T);
};

let stringifiedArgs;
Expand Down
Loading