Skip to content

Commit 7ba8c7b

Browse files
committed
feat(replay): Keep min. 30 seconds of data in error mode
1 parent f993317 commit 7ba8c7b

18 files changed

+553
-67
lines changed

packages/replay/src/constants.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ export const MAX_SESSION_LIFE = 3_600_000; // 60 minutes
2424
export const DEFAULT_FLUSH_MIN_DELAY = 5_000;
2525
export const DEFAULT_FLUSH_MAX_DELAY = 5_000;
2626

27-
/* How long to wait for error checkouts */
28-
export const ERROR_CHECKOUT_TIME = 60_000;
27+
/* How often to capture a full checkout when in error mode */
28+
export const ERROR_CHECKOUT_TIME = 30_000;
2929

3030
export const RETRY_BASE_INTERVAL = 5000;
3131
export const RETRY_MAX_COUNT = 3;
Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,59 @@
1+
import type { ReplayRecordingData } from '@sentry/types';
2+
13
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
4+
import { PartitionedQueue } from './PartitionedQueue';
25

36
/**
47
* A basic event buffer that does not do any compression.
58
* Used as fallback if the compression worker cannot be loaded or is disabled.
69
*/
710
export class EventBufferArray implements EventBuffer {
811
/** All the events that are buffered to be sent. */
9-
public events: RecordingEvent[];
12+
protected _events: PartitionedQueue<RecordingEvent>;
1013

1114
public constructor() {
12-
this.events = [];
15+
this._events = new PartitionedQueue<RecordingEvent>();
16+
}
17+
18+
/** @inheritdoc */
19+
public get events(): RecordingEvent[] {
20+
return this._events.getItems();
1321
}
1422

1523
/** @inheritdoc */
1624
public get hasEvents(): boolean {
17-
return this.events.length > 0;
25+
return this._events.getLength() > 0;
1826
}
1927

2028
/** @inheritdoc */
2129
public destroy(): void {
22-
this.events = [];
30+
this._events.clear();
2331
}
2432

2533
/** @inheritdoc */
26-
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
27-
if (isCheckout) {
28-
this.events = [event];
29-
return;
30-
}
34+
public async clear(keepLastCheckout?: boolean): Promise<void> {
35+
this._events.clear(keepLastCheckout);
36+
}
3137

32-
this.events.push(event);
33-
return;
38+
/** @inheritdoc */
39+
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
40+
this._events.add(event, isCheckout);
3441
}
3542

3643
/** @inheritdoc */
37-
public finish(): Promise<string> {
44+
public finish(): Promise<ReplayRecordingData> {
3845
return new Promise<string>(resolve => {
3946
// Make a copy of the events array reference and immediately clear the
4047
// events member so that we do not lose new events while uploading
4148
// attachment.
4249
const eventsRet = this.events;
43-
this.events = [];
50+
this._events.clear();
4451
resolve(JSON.stringify(eventsRet));
4552
});
4653
}
54+
55+
/** @inheritdoc */
56+
public getEarliestTimestamp(): number | null {
57+
return this.events.map(event => event.timestamp).sort()[0] || null;
58+
}
4759
}

packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ export class EventBufferCompressionWorker implements EventBuffer {
5757
return this._finishRequest();
5858
}
5959

60+
/** @inheritdoc */
61+
public clear(): Promise<void> {
62+
// NOTE: We do not support keeping the last checkout in this implementation.
63+
return this._clear();
64+
}
65+
66+
/** @inheritdoc */
67+
public getEarliestTimestamp(): number | null {
68+
// Not supported in this mode
69+
return null;
70+
}
71+
6072
/**
6173
* Send the event to the worker.
6274
*/
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import type { ReplayRecordingData } from '@sentry/types';
2+
3+
import type { RecordingEvent } from '../types';
4+
import { EventBufferArray } from './EventBufferArray';
5+
import { WorkerHandler } from './WorkerHandler';
6+
7+
/**
8+
* Event buffer that uses a web worker to compress events.
9+
* Exported only for testing.
10+
*/
11+
export class EventBufferPartitionedCompressionWorker extends EventBufferArray {
12+
private _worker: WorkerHandler;
13+
14+
public constructor(worker: Worker) {
15+
super();
16+
this._worker = new WorkerHandler(worker);
17+
}
18+
/**
19+
* Ensure the worker is ready (or not).
20+
* This will either resolve when the worker is ready, or reject if an error occured.
21+
*/
22+
public ensureReady(): Promise<void> {
23+
return this._worker.ensureReady();
24+
}
25+
26+
/** @inheritdoc */
27+
public destroy(): void {
28+
this._worker.destroy();
29+
super.destroy();
30+
}
31+
32+
/**
33+
* Finish the event buffer and return the compressed data.
34+
*/
35+
public finish(): Promise<ReplayRecordingData> {
36+
const { events } = this;
37+
this._events.clear();
38+
39+
return this._compressEvents(events);
40+
}
41+
42+
/** Compress a given array of events at once. */
43+
private _compressEvents(events: RecordingEvent[]): Promise<Uint8Array> {
44+
return this._worker.postMessage<Uint8Array>('compress', JSON.stringify(events));
45+
}
46+
}

packages/replay/src/eventBuffer/EventBufferProxy.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { logger } from '@sentry/utils';
44
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
55
import { EventBufferArray } from './EventBufferArray';
66
import { EventBufferCompressionWorker } from './EventBufferCompressionWorker';
7+
import { EventBufferPartitionedCompressionWorker } from './EventBufferPartitionedCompressionWorker';
78

89
/**
910
* This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there.
@@ -12,15 +13,21 @@ import { EventBufferCompressionWorker } from './EventBufferCompressionWorker';
1213
*/
1314
export class EventBufferProxy implements EventBuffer {
1415
private _fallback: EventBufferArray;
15-
private _compression: EventBufferCompressionWorker;
16+
private _compression: EventBufferCompressionWorker | EventBufferPartitionedCompressionWorker;
1617
private _used: EventBuffer;
1718
private _ensureWorkerIsLoadedPromise: Promise<void>;
1819

19-
public constructor(worker: Worker) {
20+
public constructor(worker: Worker, keepLastCheckout: boolean) {
2021
this._fallback = new EventBufferArray();
21-
this._compression = new EventBufferCompressionWorker(worker);
22-
this._used = this._fallback;
2322

23+
// In error mode, we use the partitioned compression worker, which does not use compression streaming
24+
// Instead, all events are sent at finish-time, as we need to continuously modify the queued events
25+
// In session mode, we use a streaming compression implementation, which is more performant
26+
this._compression = keepLastCheckout
27+
? new EventBufferPartitionedCompressionWorker(worker)
28+
: new EventBufferCompressionWorker(worker);
29+
30+
this._used = this._fallback;
2431
this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded();
2532
}
2633

@@ -52,11 +59,21 @@ export class EventBufferProxy implements EventBuffer {
5259
return this._used.finish();
5360
}
5461

62+
/** @inheritdoc */
63+
public clear(keepLastCheckout?: boolean): Promise<void> {
64+
return this._used.clear(keepLastCheckout);
65+
}
66+
5567
/** Ensure the worker has loaded. */
5668
public ensureWorkerIsLoaded(): Promise<void> {
5769
return this._ensureWorkerIsLoadedPromise;
5870
}
5971

72+
/** @inheritdoc */
73+
public getEarliestTimestamp(): number | null {
74+
return this._used.getEarliestTimestamp();
75+
}
76+
6077
/** Actually check if the worker has been loaded. */
6178
private async _ensureWorkerIsLoaded(): Promise<void> {
6279
try {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* A queue with partitions for each checkout.
3+
*/
4+
export class PartitionedQueue<T> {
5+
private _items: T[];
6+
private _lastCheckoutPos?: number;
7+
8+
public constructor() {
9+
this._items = [];
10+
}
11+
12+
/** Add an item to the queue. */
13+
public add(record: T, isCheckout?: boolean): void {
14+
this._items.push(record);
15+
16+
if (isCheckout) {
17+
this._lastCheckoutPos = this._items.length - 1;
18+
}
19+
}
20+
21+
/**
22+
* Clear items from the queue.
23+
* If `keepLastCheckout` is given, all items after the last checkout will be kept.
24+
*/
25+
public clear(keepLastCheckout?: boolean): void {
26+
if (!keepLastCheckout) {
27+
this._items = [];
28+
this._lastCheckoutPos = undefined;
29+
return;
30+
}
31+
32+
if (this._lastCheckoutPos) {
33+
this._items = this._items.splice(this._lastCheckoutPos);
34+
this._lastCheckoutPos = undefined;
35+
}
36+
37+
// Else, there is only a single checkout recorded yet, which we don't want to clear out
38+
}
39+
40+
/** Get all items */
41+
public getItems(): T[] {
42+
return this._items;
43+
}
44+
45+
/** Get the number of items that are queued. */
46+
public getLength(): number {
47+
return this._items.length;
48+
}
49+
}

packages/replay/src/eventBuffer/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,21 @@ import { EventBufferProxy } from './EventBufferProxy';
77

88
interface CreateEventBufferParams {
99
useCompression: boolean;
10+
keepLastCheckout: boolean;
1011
}
1112

1213
/**
1314
* Create an event buffer for replays.
1415
*/
15-
export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer {
16+
export function createEventBuffer({ useCompression, keepLastCheckout }: CreateEventBufferParams): EventBuffer {
1617
// eslint-disable-next-line no-restricted-globals
1718
if (useCompression && window.Worker) {
1819
try {
1920
const workerUrl = getWorkerURL();
2021

2122
__DEBUG_BUILD__ && logger.log('[Replay] Using compression worker');
2223
const worker = new Worker(workerUrl);
23-
return new EventBufferProxy(worker);
24+
return new EventBufferProxy(worker, keepLastCheckout);
2425
} catch (error) {
2526
__DEBUG_BUILD__ && logger.log('[Replay] Failed to create compression worker');
2627
// Fall back to use simple event buffer array

packages/replay/src/replay.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ export class ReplayContainer implements ReplayContainerInterface {
180180

181181
this.eventBuffer = createEventBuffer({
182182
useCompression: this._options.useCompression,
183+
keepLastCheckout: this.recordingMode === 'error',
183184
});
184185

185186
this._addListeners();

packages/replay/src/types.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export interface SendReplayData {
2323
*/
2424
export interface WorkerRequest {
2525
id: number;
26-
method: 'clear' | 'addEvent' | 'finish';
26+
method: 'clear' | 'addEvent' | 'finish' | 'compress';
2727
arg?: string;
2828
}
2929

@@ -280,6 +280,12 @@ export interface EventBuffer {
280280
* Clears and returns the contents of the buffer.
281281
*/
282282
finish(): Promise<ReplayRecordingData>;
283+
284+
/** Clear the buffer, and optional ensure we keep the last checkout. */
285+
clear(keepLastCheckout?: boolean): Promise<void>;
286+
287+
/** Get the earliest timestamp of a pending event. */
288+
getEarliestTimestamp(): number | null;
283289
}
284290

285291
export type AddUpdateCallback = () => boolean | void;

packages/replay/src/util/addEvent.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ export async function addEvent(
1212
event: RecordingEvent,
1313
isCheckout?: boolean,
1414
): Promise<AddEventResult | null> {
15-
if (!replay.eventBuffer) {
15+
const { eventBuffer, session } = replay;
16+
17+
if (!eventBuffer || !session) {
1618
// This implies that `_isEnabled` is false
1719
return null;
1820
}
@@ -38,12 +40,27 @@ export async function addEvent(
3840
// Only record earliest event if a new session was created, otherwise it
3941
// shouldn't be relevant
4042
const earliestEvent = replay.getContext().earliestEvent;
41-
if (replay.session && replay.session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) {
43+
if (session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) {
4244
replay.getContext().earliestEvent = timestampInMs;
4345
}
4446

4547
try {
46-
return await replay.eventBuffer.addEvent(event, isCheckout);
48+
if (isCheckout) {
49+
if (replay.recordingMode === 'error') {
50+
// Do not wait on it, just do it
51+
// We know in this mode this is actually "sync"
52+
void eventBuffer.clear(true);
53+
54+
// Ensure we have the correct first checkout timestamp when an error occurs
55+
if (!session.segmentId) {
56+
replay.getContext().earliestEvent = eventBuffer.getEarliestTimestamp();
57+
}
58+
} else {
59+
await eventBuffer.clear();
60+
}
61+
}
62+
63+
return await eventBuffer.addEvent(event, isCheckout);
4764
} catch (error) {
4865
__DEBUG_BUILD__ && logger.error(error);
4966
replay.stop();

0 commit comments

Comments
 (0)