Skip to content

Commit 47bdb74

Browse files
committed
feat(replay): Keep min. 30 seconds of data in error mode
1 parent a90ba73 commit 47bdb74

21 files changed

+583
-74
lines changed

packages/replay/src/constants.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ export const MASK_ALL_TEXT_SELECTOR = 'body *:not(style), body *:not(script)';
2727
export const DEFAULT_FLUSH_MIN_DELAY = 5_000;
2828
export const DEFAULT_FLUSH_MAX_DELAY = 5_000;
2929

30-
/* How long to wait for error checkouts */
31-
export const ERROR_CHECKOUT_TIME = 60_000;
30+
/* How often to capture a full checkout when in error mode */
31+
export const ERROR_CHECKOUT_TIME = 30_000;
3232

3333
export const RETRY_BASE_INTERVAL = 5000;
3434
export const RETRY_MAX_COUNT = 3;
Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,59 @@
1+
import type { ReplayRecordingData } from '@sentry/types';
2+
13
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
4+
import { PartitionedQueue } from './PartitionedQueue';
25

36
/**
47
* A basic event buffer that does not do any compression.
58
* Used as fallback if the compression worker cannot be loaded or is disabled.
69
*/
710
export class EventBufferArray implements EventBuffer {
811
/** All the events that are buffered to be sent. */
9-
public events: RecordingEvent[];
12+
protected _events: PartitionedQueue<RecordingEvent>;
1013

1114
public constructor() {
12-
this.events = [];
15+
this._events = new PartitionedQueue<RecordingEvent>();
16+
}
17+
18+
/** @inheritdoc */
19+
public get events(): RecordingEvent[] {
20+
return this._events.getItems();
1321
}
1422

1523
/** @inheritdoc */
1624
public get hasEvents(): boolean {
17-
return this.events.length > 0;
25+
return this._events.getLength() > 0;
1826
}
1927

2028
/** @inheritdoc */
2129
public destroy(): void {
22-
this.events = [];
30+
this._events.clear();
2331
}
2432

2533
/** @inheritdoc */
26-
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
27-
if (isCheckout) {
28-
this.events = [event];
29-
return;
30-
}
34+
public async clear(keepLastCheckout?: boolean): Promise<void> {
35+
this._events.clear(keepLastCheckout);
36+
}
3137

32-
this.events.push(event);
33-
return;
38+
/** @inheritdoc */
39+
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
40+
this._events.add(event, isCheckout);
3441
}
3542

3643
/** @inheritdoc */
37-
public finish(): Promise<string> {
44+
public finish(): Promise<ReplayRecordingData> {
3845
return new Promise<string>(resolve => {
3946
// Make a copy of the events array reference and immediately clear the
4047
// events member so that we do not lose new events while uploading
4148
// attachment.
4249
const eventsRet = this.events;
43-
this.events = [];
50+
this._events.clear();
4451
resolve(JSON.stringify(eventsRet));
4552
});
4653
}
54+
55+
/** @inheritdoc */
56+
public getEarliestTimestamp(): number | null {
57+
return this.events.map(event => event.timestamp).sort()[0] || null;
58+
}
4759
}

packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ export class EventBufferCompressionWorker implements EventBuffer {
5757
return this._finishRequest();
5858
}
5959

60+
/** @inheritdoc */
61+
public clear(): Promise<void> {
62+
// NOTE: We do not support keeping the last checkout in this implementation.
63+
return this._clear();
64+
}
65+
66+
/** @inheritdoc */
67+
public getEarliestTimestamp(): number | null {
68+
// Not supported in this mode
69+
return null;
70+
}
71+
6072
/**
6173
* Send the event to the worker.
6274
*/
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import type { ReplayRecordingData } from '@sentry/types';
2+
3+
import type { RecordingEvent } from '../types';
4+
import { EventBufferArray } from './EventBufferArray';
5+
import { WorkerHandler } from './WorkerHandler';
6+
7+
/**
8+
* Event buffer that uses a web worker to compress events.
9+
* Exported only for testing.
10+
*/
11+
export class EventBufferPartitionedCompressionWorker extends EventBufferArray {
12+
private _worker: WorkerHandler;
13+
14+
public constructor(worker: Worker) {
15+
super();
16+
this._worker = new WorkerHandler(worker);
17+
}
18+
/**
19+
* Ensure the worker is ready (or not).
20+
* This will either resolve when the worker is ready, or reject if an error occured.
21+
*/
22+
public ensureReady(): Promise<void> {
23+
return this._worker.ensureReady();
24+
}
25+
26+
/** @inheritdoc */
27+
public destroy(): void {
28+
this._worker.destroy();
29+
super.destroy();
30+
}
31+
32+
/**
33+
* Finish the event buffer and return the compressed data.
34+
*/
35+
public finish(): Promise<ReplayRecordingData> {
36+
const { events } = this;
37+
this._events.clear();
38+
39+
return this._compressEvents(events);
40+
}
41+
42+
/** Compress a given array of events at once. */
43+
private _compressEvents(events: RecordingEvent[]): Promise<Uint8Array> {
44+
return this._worker.postMessage<Uint8Array>('compress', JSON.stringify(events));
45+
}
46+
}

packages/replay/src/eventBuffer/EventBufferProxy.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import type { ReplayRecordingData } from '@sentry/types';
1+
import type { ReplayRecordingData, ReplayRecordingMode } from '@sentry/types';
22
import { logger } from '@sentry/utils';
33

44
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
55
import { EventBufferArray } from './EventBufferArray';
66
import { EventBufferCompressionWorker } from './EventBufferCompressionWorker';
7+
import { EventBufferPartitionedCompressionWorker } from './EventBufferPartitionedCompressionWorker';
78

89
/**
910
* This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there.
@@ -12,15 +13,22 @@ import { EventBufferCompressionWorker } from './EventBufferCompressionWorker';
1213
*/
1314
export class EventBufferProxy implements EventBuffer {
1415
private _fallback: EventBufferArray;
15-
private _compression: EventBufferCompressionWorker;
16+
private _compression: EventBufferCompressionWorker | EventBufferPartitionedCompressionWorker;
1617
private _used: EventBuffer;
1718
private _ensureWorkerIsLoadedPromise: Promise<void>;
1819

19-
public constructor(worker: Worker) {
20+
public constructor(worker: Worker, recordingMode: ReplayRecordingMode) {
2021
this._fallback = new EventBufferArray();
21-
this._compression = new EventBufferCompressionWorker(worker);
22-
this._used = this._fallback;
2322

23+
// In error mode, we use the partitioned compression worker, which does not use compression streaming
24+
// Instead, all events are sent at finish-time, as we need to continuously modify the queued events
25+
// In session mode, we use a streaming compression implementation, which is more performant
26+
this._compression =
27+
recordingMode === 'error'
28+
? new EventBufferPartitionedCompressionWorker(worker)
29+
: new EventBufferCompressionWorker(worker);
30+
31+
this._used = this._fallback;
2432
this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded();
2533
}
2634

@@ -52,11 +60,21 @@ export class EventBufferProxy implements EventBuffer {
5260
return this._used.finish();
5361
}
5462

63+
/** @inheritdoc */
64+
public clear(keepLastCheckout?: boolean): Promise<void> {
65+
return this._used.clear(keepLastCheckout);
66+
}
67+
5568
/** Ensure the worker has loaded. */
5669
public ensureWorkerIsLoaded(): Promise<void> {
5770
return this._ensureWorkerIsLoadedPromise;
5871
}
5972

73+
/** @inheritdoc */
74+
public getEarliestTimestamp(): number | null {
75+
return this._used.getEarliestTimestamp();
76+
}
77+
6078
/** Actually check if the worker has been loaded. */
6179
private async _ensureWorkerIsLoaded(): Promise<void> {
6280
try {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* A queue with partitions for each checkout.
3+
*/
4+
export class PartitionedQueue<T> {
5+
private _items: T[];
6+
private _lastCheckoutPos?: number;
7+
8+
public constructor() {
9+
this._items = [];
10+
}
11+
12+
/** Add an item to the queue. */
13+
public add(record: T, isCheckout?: boolean): void {
14+
this._items.push(record);
15+
16+
if (isCheckout) {
17+
this._lastCheckoutPos = this._items.length - 1;
18+
}
19+
}
20+
21+
/**
22+
* Clear items from the queue.
23+
* If `keepLastCheckout` is given, all items after the last checkout will be kept.
24+
*/
25+
public clear(keepLastCheckout?: boolean): void {
26+
if (!keepLastCheckout) {
27+
this._items = [];
28+
this._lastCheckoutPos = undefined;
29+
return;
30+
}
31+
32+
if (this._lastCheckoutPos) {
33+
this._items = this._items.splice(this._lastCheckoutPos);
34+
this._lastCheckoutPos = undefined;
35+
}
36+
37+
// Else, there is only a single checkout recorded yet, which we don't want to clear out
38+
}
39+
40+
/** Get all items */
41+
public getItems(): T[] {
42+
return this._items;
43+
}
44+
45+
/** Get the number of items that are queued. */
46+
public getLength(): number {
47+
return this._items.length;
48+
}
49+
}

packages/replay/src/eventBuffer/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { ReplayRecordingMode } from '@sentry/types';
12
import { logger } from '@sentry/utils';
23

34
import type { EventBuffer } from '../types';
@@ -7,12 +8,13 @@ import { EventBufferProxy } from './EventBufferProxy';
78

89
interface CreateEventBufferParams {
910
useCompression: boolean;
11+
recordingMode: ReplayRecordingMode;
1012
}
1113

1214
/**
1315
* Create an event buffer for replays.
1416
*/
15-
export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer {
17+
export function createEventBuffer({ useCompression, recordingMode }: CreateEventBufferParams): EventBuffer {
1618
// eslint-disable-next-line no-restricted-globals
1719
if (useCompression && window.Worker) {
1820
try {
@@ -21,7 +23,7 @@ export function createEventBuffer({ useCompression }: CreateEventBufferParams):
2123

2224
__DEBUG_BUILD__ && logger.log('[Replay] Using compression worker');
2325
const worker = new Worker(workerUrl);
24-
return new EventBufferProxy(worker);
26+
return new EventBufferProxy(worker, recordingMode);
2527
} catch (error) {
2628
__DEBUG_BUILD__ && logger.log('[Replay] Failed to create compression worker');
2729
// Fall back to use simple event buffer array

packages/replay/src/replay.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ export class ReplayContainer implements ReplayContainerInterface {
182182

183183
this.eventBuffer = createEventBuffer({
184184
useCompression: this._options.useCompression,
185+
recordingMode: this.recordingMode,
185186
});
186187

187188
this._addListeners();

packages/replay/src/types.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export interface SendReplayData {
2323
*/
2424
export interface WorkerRequest {
2525
id: number;
26-
method: 'clear' | 'addEvent' | 'finish';
26+
method: 'clear' | 'addEvent' | 'finish' | 'compress';
2727
arg?: string;
2828
}
2929

@@ -281,6 +281,12 @@ export interface EventBuffer {
281281
* Clears and returns the contents of the buffer.
282282
*/
283283
finish(): Promise<ReplayRecordingData>;
284+
285+
/** Clear the buffer, and optional ensure we keep the last checkout. */
286+
clear(keepLastCheckout?: boolean): Promise<void>;
287+
288+
/** Get the earliest timestamp of a pending event. */
289+
getEarliestTimestamp(): number | null;
284290
}
285291

286292
export type AddUpdateCallback = () => boolean | void;

packages/replay/src/util/addEvent.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ export async function addEvent(
1111
event: RecordingEvent,
1212
isCheckout?: boolean,
1313
): Promise<AddEventResult | null> {
14-
if (!replay.eventBuffer) {
14+
const { eventBuffer, session } = replay;
15+
16+
if (!eventBuffer || !session) {
1517
// This implies that `_isEnabled` is false
1618
return null;
1719
}
@@ -37,12 +39,27 @@ export async function addEvent(
3739
// Only record earliest event if a new session was created, otherwise it
3840
// shouldn't be relevant
3941
const earliestEvent = replay.getContext().earliestEvent;
40-
if (replay.session && replay.session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) {
42+
if (session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) {
4143
replay.getContext().earliestEvent = timestampInMs;
4244
}
4345

4446
try {
45-
return await replay.eventBuffer.addEvent(event, isCheckout);
47+
if (isCheckout) {
48+
if (replay.recordingMode === 'error') {
49+
// Do not wait on it, just do it
50+
// We know in this mode this is actually "sync"
51+
void eventBuffer.clear(true);
52+
53+
// Ensure we have the correct first checkout timestamp when an error occurs
54+
if (!session.segmentId) {
55+
replay.getContext().earliestEvent = eventBuffer.getEarliestTimestamp();
56+
}
57+
} else {
58+
await eventBuffer.clear();
59+
}
60+
}
61+
62+
return await eventBuffer.addEvent(event, isCheckout);
4663
} catch (error) {
4764
__DEBUG_BUILD__ && logger.error(error);
4865
replay.stop();

packages/replay/src/worker/worker.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)