Skip to content

feat(replay): Keep min. 30 seconds of data in error mode #7025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/replay/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ export const MAX_SESSION_LIFE = 3_600_000; // 60 minutes
export const DEFAULT_FLUSH_MIN_DELAY = 5_000;
export const DEFAULT_FLUSH_MAX_DELAY = 5_000;

/* How long to wait for error checkouts */
export const ERROR_CHECKOUT_TIME = 60_000;
/* How often to capture a full checkout when in error mode */
export const ERROR_CHECKOUT_TIME = 30_000;

export const RETRY_BASE_INTERVAL = 5000;
export const RETRY_MAX_COUNT = 3;
38 changes: 25 additions & 13 deletions packages/replay/src/eventBuffer/EventBufferArray.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,59 @@
import type { ReplayRecordingData } from '@sentry/types';

import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
import { PartitionedQueue } from './PartitionedQueue';

/**
* A basic event buffer that does not do any compression.
* Used as fallback if the compression worker cannot be loaded or is disabled.
*/
export class EventBufferArray implements EventBuffer {
/** All the events that are buffered to be sent. */
public events: RecordingEvent[];
protected _events: PartitionedQueue<RecordingEvent>;

public constructor() {
this.events = [];
this._events = new PartitionedQueue<RecordingEvent>();
}

/** @inheritdoc */
public get events(): RecordingEvent[] {
return this._events.getItems();
}

/** @inheritdoc */
public get hasEvents(): boolean {
return this.events.length > 0;
return this._events.getLength() > 0;
}

/** @inheritdoc */
public destroy(): void {
this.events = [];
this._events.clear();
}

/** @inheritdoc */
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
if (isCheckout) {
this.events = [event];
return;
}
public async clear(keepLastCheckout?: boolean): Promise<void> {
this._events.clear(keepLastCheckout);
}

this.events.push(event);
return;
/** @inheritdoc */
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
this._events.add(event, isCheckout);
}

/** @inheritdoc */
public finish(): Promise<string> {
public finish(): Promise<ReplayRecordingData> {
return new Promise<string>(resolve => {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const eventsRet = this.events;
this.events = [];
this._events.clear();
resolve(JSON.stringify(eventsRet));
});
}

/** @inheritdoc */
public getEarliestTimestamp(): number | null {
return this.events.map(event => event.timestamp).sort()[0] || null;
}
}
12 changes: 12 additions & 0 deletions packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ export class EventBufferCompressionWorker implements EventBuffer {
return this._finishRequest();
}

/** @inheritdoc */
public clear(): Promise<void> {
// NOTE: We do not support keeping the last checkout in this implementation.
return this._clear();
}

/** @inheritdoc */
public getEarliestTimestamp(): number | null {
// Not supported in this mode
return null;
}

/**
* Send the event to the worker.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import type { ReplayRecordingData } from '@sentry/types';

import type { RecordingEvent } from '../types';
import { EventBufferArray } from './EventBufferArray';
import { WorkerHandler } from './WorkerHandler';

/**
* Event buffer that uses a web worker to compress events.
* Exported only for testing.
*/
export class EventBufferPartitionedCompressionWorker extends EventBufferArray {
private _worker: WorkerHandler;

public constructor(worker: Worker) {
super();
this._worker = new WorkerHandler(worker);
}
/**
* Ensure the worker is ready (or not).
* This will either resolve when the worker is ready, or reject if an error occured.
*/
public ensureReady(): Promise<void> {
return this._worker.ensureReady();
}

/** @inheritdoc */
public destroy(): void {
this._worker.destroy();
super.destroy();
}

/**
* Finish the event buffer and return the compressed data.
*/
public finish(): Promise<ReplayRecordingData> {
const { events } = this;
this._events.clear();

return this._compressEvents(events);
}

/** Compress a given array of events at once. */
private _compressEvents(events: RecordingEvent[]): Promise<Uint8Array> {
return this._worker.postMessage<Uint8Array>('compress', JSON.stringify(events));
}
}
25 changes: 21 additions & 4 deletions packages/replay/src/eventBuffer/EventBufferProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { logger } from '@sentry/utils';
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
import { EventBufferArray } from './EventBufferArray';
import { EventBufferCompressionWorker } from './EventBufferCompressionWorker';
import { EventBufferPartitionedCompressionWorker } from './EventBufferPartitionedCompressionWorker';

/**
* This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there.
Expand All @@ -12,15 +13,21 @@ import { EventBufferCompressionWorker } from './EventBufferCompressionWorker';
*/
export class EventBufferProxy implements EventBuffer {
private _fallback: EventBufferArray;
private _compression: EventBufferCompressionWorker;
private _compression: EventBufferCompressionWorker | EventBufferPartitionedCompressionWorker;
private _used: EventBuffer;
private _ensureWorkerIsLoadedPromise: Promise<void>;

public constructor(worker: Worker) {
public constructor(worker: Worker, keepLastCheckout: boolean) {
this._fallback = new EventBufferArray();
this._compression = new EventBufferCompressionWorker(worker);
this._used = this._fallback;

// In error mode, we use the partitioned compression worker, which does not use compression streaming
// Instead, all events are sent at finish-time, as we need to continuously modify the queued events
// In session mode, we use a streaming compression implementation, which is more performant
this._compression = keepLastCheckout
? new EventBufferPartitionedCompressionWorker(worker)
: new EventBufferCompressionWorker(worker);

this._used = this._fallback;
this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded();
}

Expand Down Expand Up @@ -52,11 +59,21 @@ export class EventBufferProxy implements EventBuffer {
return this._used.finish();
}

/** @inheritdoc */
public clear(keepLastCheckout?: boolean): Promise<void> {
return this._used.clear(keepLastCheckout);
}

/** Ensure the worker has loaded. */
public ensureWorkerIsLoaded(): Promise<void> {
return this._ensureWorkerIsLoadedPromise;
}

/** @inheritdoc */
public getEarliestTimestamp(): number | null {
return this._used.getEarliestTimestamp();
}

/** Actually check if the worker has been loaded. */
private async _ensureWorkerIsLoaded(): Promise<void> {
try {
Expand Down
49 changes: 49 additions & 0 deletions packages/replay/src/eventBuffer/PartitionedQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* A queue with partitions for each checkout.
*/
export class PartitionedQueue<T> {
private _items: T[];
private _lastCheckoutPos?: number;

public constructor() {
this._items = [];
}

/** Add an item to the queue. */
public add(record: T, isCheckout?: boolean): void {
this._items.push(record);

if (isCheckout) {
this._lastCheckoutPos = this._items.length - 1;
}
}

/**
* Clear items from the queue.
* If `keepLastCheckout` is given, all items after the last checkout will be kept.
*/
public clear(keepLastCheckout?: boolean): void {
if (!keepLastCheckout) {
this._items = [];
this._lastCheckoutPos = undefined;
return;
}

if (this._lastCheckoutPos) {
this._items = this._items.splice(this._lastCheckoutPos);
this._lastCheckoutPos = undefined;
}

// Else, there is only a single checkout recorded yet, which we don't want to clear out
}

/** Get all items */
public getItems(): T[] {
return this._items;
}

/** Get the number of items that are queued. */
public getLength(): number {
return this._items.length;
}
}
5 changes: 3 additions & 2 deletions packages/replay/src/eventBuffer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ import { EventBufferProxy } from './EventBufferProxy';

interface CreateEventBufferParams {
useCompression: boolean;
keepLastCheckout: boolean;
}

/**
* Create an event buffer for replays.
*/
export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer {
export function createEventBuffer({ useCompression, keepLastCheckout }: CreateEventBufferParams): EventBuffer {
// eslint-disable-next-line no-restricted-globals
if (useCompression && window.Worker) {
try {
const workerUrl = getWorkerURL();

__DEBUG_BUILD__ && logger.log('[Replay] Using compression worker');
const worker = new Worker(workerUrl);
return new EventBufferProxy(worker);
return new EventBufferProxy(worker, keepLastCheckout);
} catch (error) {
__DEBUG_BUILD__ && logger.log('[Replay] Failed to create compression worker');
// Fall back to use simple event buffer array
Expand Down
1 change: 1 addition & 0 deletions packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ export class ReplayContainer implements ReplayContainerInterface {

this.eventBuffer = createEventBuffer({
useCompression: this._options.useCompression,
keepLastCheckout: this.recordingMode === 'error',
});

this._addListeners();
Expand Down
8 changes: 7 additions & 1 deletion packages/replay/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export interface SendReplayData {
*/
export interface WorkerRequest {
id: number;
method: 'clear' | 'addEvent' | 'finish';
method: 'clear' | 'addEvent' | 'finish' | 'compress';
arg?: string;
}

Expand Down Expand Up @@ -280,6 +280,12 @@ export interface EventBuffer {
* Clears and returns the contents of the buffer.
*/
finish(): Promise<ReplayRecordingData>;

/** Clear the buffer, and optional ensure we keep the last checkout. */
clear(keepLastCheckout?: boolean): Promise<void>;

/** Get the earliest timestamp of a pending event. */
getEarliestTimestamp(): number | null;
}

export type AddUpdateCallback = () => boolean | void;
Expand Down
23 changes: 20 additions & 3 deletions packages/replay/src/util/addEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ export async function addEvent(
event: RecordingEvent,
isCheckout?: boolean,
): Promise<AddEventResult | null> {
if (!replay.eventBuffer) {
const { eventBuffer, session } = replay;

if (!eventBuffer || !session) {
// This implies that `_isEnabled` is false
return null;
}
Expand All @@ -38,12 +40,27 @@ export async function addEvent(
// Only record earliest event if a new session was created, otherwise it
// shouldn't be relevant
const earliestEvent = replay.getContext().earliestEvent;
if (replay.session && replay.session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) {
if (session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) {
replay.getContext().earliestEvent = timestampInMs;
}

try {
return await replay.eventBuffer.addEvent(event, isCheckout);
if (isCheckout) {
if (replay.recordingMode === 'error') {
// Do not wait on it, just do it
// We know in this mode this is actually "sync"
void eventBuffer.clear(true);

// Ensure we have the correct first checkout timestamp when an error occurs
if (!session.segmentId) {
replay.getContext().earliestEvent = eventBuffer.getEarliestTimestamp();
}
Comment on lines +49 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify for myself, we examine the buffer for the earliest event because we want the replay "start time" to be when the 1st checkout in the buffer happens.

So if session has been running for 2.5 minutes, we would want the replay to "start" at ~2 minutes. And replay.context.earliestEvent (before this block of code), would represent the earliest event of the entire "session".

So an issue I can think of is with the performance observer. I believe we only add them when we flush, so those events will end up becoming the earliest events anyway (which might be fine).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, to be honest all of this is a bit brittle to me 😅 but this was the best I could come up with to handle this in a somewhat reasonable way. I wonder if it makes sense to maybe compute this on the server? Or generally change how we keep this a bit, not sure...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

semi-related to this PR but I was also wondering about that - do we discard events with timestamps before the initial full checkout on the server? Or would sending such events cause problems?

Copy link
Member

@billyvg billyvg Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discard events that are older than SESSION_IDLE_TIMEOUT minutes ago. Because we get buffered data from performance observer, those occur before initial checkout.

I wonder if we need to change/move this logic as well

// See note above re: session start needs to reflect the most recent
// checkout.
if (this.recordingMode === 'error' && this.session && this._context.earliestEvent) {
this.session.started = this._context.earliestEvent;
this._maybeSaveSession();
}

I think this is to meant to update session start time so we know when to correctly timeout a session.

} else {
await eventBuffer.clear();
}
}

return await eventBuffer.addEvent(event, isCheckout);
} catch (error) {
__DEBUG_BUILD__ && logger.error(error);
replay.stop();
Expand Down
Loading