Skip to content

Commit cb6be34

Browse files
committed
ref: Use partitionedqueue for event buffer
1 parent b6c02e6 commit cb6be34

File tree

7 files changed

+130
-53
lines changed

7 files changed

+130
-53
lines changed

packages/replay/src/eventBuffer/EventBufferArray.ts

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,32 @@
11
import type { ReplayRecordingData } from '@sentry/types';
22

33
import type { EventBuffer, RecordingEvent } from '../types';
4-
5-
interface EventsGroup {
6-
checkoutTimestamp: number;
7-
events: RecordingEvent[];
8-
}
4+
import { PartitionedQueue } from './PartitionedQueue';
95

106
/**
117
* A basic event buffer that does not do any compression.
128
* Used as fallback if the compression worker cannot be loaded or is disabled.
139
*/
1410
export class EventBufferArray implements EventBuffer {
15-
private _events: EventsGroup[];
16-
private _eventsFlat: RecordingEvent[];
11+
private _events: PartitionedQueue<RecordingEvent>;
1712

1813
public constructor() {
19-
this._events = [];
20-
this._eventsFlat = [];
14+
this._events = new PartitionedQueue<RecordingEvent>();
2115
}
2216

2317
/** @inheritdoc */
2418
public get pendingLength(): number {
25-
return this.pendingEvents.length;
19+
return this._events.getLength();
2620
}
2721

2822
/** @inheritdoc */
2923
public get pendingEvents(): RecordingEvent[] {
30-
return this._eventsFlat;
31-
// return this._events.reduce((acc, { events }) => [...events, ...acc], [] as RecordingEvent[]);
24+
return this._events.getItems();
3225
}
3326

3427
/** @inheritdoc */
35-
public getFirstCheckoutTimestamp(): number | null {
36-
return (this._events[0] && this._events[0].checkoutTimestamp) || null;
28+
public getEarliestTimestamp(): number | null {
29+
return this.pendingEvents.map(event => event.timestamp).sort()[0] || null;
3730
}
3831

3932
/** @inheritdoc */
@@ -43,40 +36,17 @@ export class EventBufferArray implements EventBuffer {
4336

4437
/** @inheritdoc */
4538
public addEvent(event: RecordingEvent, isCheckout?: boolean): void {
46-
if (isCheckout || this._events.length === 0) {
47-
const group: EventsGroup = {
48-
checkoutTimestamp: event.timestamp,
49-
events: [event],
50-
};
51-
this._events.unshift(group);
52-
} else {
53-
this._events[0].events.push(event);
54-
}
55-
56-
this._eventsFlat.push(event);
39+
this._events.add(event, isCheckout);
5740
}
5841

5942
/** @inheritdoc */
6043
public clear(keepLastCheckout?: boolean): void {
61-
if (keepLastCheckout) {
62-
this._events.splice(1);
63-
64-
if (this._events.length === 0) {
65-
this._eventsFlat = [];
66-
} else {
67-
// Remove all events from the flat array that are not in the first group
68-
const firstGroup = this._events[0];
69-
this._eventsFlat = this._eventsFlat.filter(event => firstGroup.events.includes(event));
70-
}
71-
} else {
72-
this._events = [];
73-
this._eventsFlat = [];
74-
}
44+
this._events.clear(keepLastCheckout);
7545
}
7646

7747
/** @inheritdoc */
7848
public finish(): Promise<ReplayRecordingData> {
79-
const pendingEvents = this.pendingEvents.slice();
49+
const { pendingEvents } = this;
8050
this.clear();
8151

8252
return Promise.resolve(this._finishRecording(pendingEvents));

packages/replay/src/eventBuffer/EventBufferProxy.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ export class EventBufferProxy implements EventBuffer {
5959
}
6060

6161
/** @inheritdoc */
62-
public getFirstCheckoutTimestamp(): number | null {
63-
return this._used.getFirstCheckoutTimestamp();
62+
public getEarliestTimestamp(): number | null {
63+
return this._used.getEarliestTimestamp();
6464
}
6565

6666
/** Ensure the worker has loaded. */
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* A queue with partitions for each checkout.
3+
*/
4+
export class PartitionedQueue<T> {
5+
private _items: T[];
6+
private _lastCheckoutPos?: number;
7+
8+
public constructor() {
9+
this._items = [];
10+
}
11+
12+
/** Add an item to the queue. */
13+
public add(record: T, isCheckout?: boolean): void {
14+
this._items.push(record);
15+
16+
if (isCheckout) {
17+
this._lastCheckoutPos = this._items.length - 1;
18+
}
19+
}
20+
21+
/**
22+
* Clear items from the queue.
23+
* If `keepLastCheckout` is given, all items after the last checkout will be kept.
24+
*/
25+
public clear(keepLastCheckout?: boolean): void {
26+
if (!keepLastCheckout) {
27+
this._items = [];
28+
this._lastCheckoutPos = undefined;
29+
return;
30+
}
31+
32+
if (this._lastCheckoutPos) {
33+
this._items = this._items.splice(this._lastCheckoutPos);
34+
this._lastCheckoutPos = undefined;
35+
}
36+
37+
// Else, there is only a single checkout recorded yet, which we don't want to clear out
38+
}
39+
40+
/** Get all items */
41+
public getItems(): T[] {
42+
return this._items;
43+
}
44+
45+
/** Get the number of items that are queued. */
46+
public getLength(): number {
47+
return this._items.length;
48+
}
49+
}

packages/replay/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ export interface EventBuffer {
292292
finish(): Promise<ReplayRecordingData>;
293293

294294
/** Get the timestamp of the first checkout that is pending. */
295-
getFirstCheckoutTimestamp(): number | null;
295+
getEarliestTimestamp(): number | null;
296296
}
297297

298298
export type AddUpdateCallback = () => boolean | void;

packages/replay/src/util/addEvent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export function addEvent(replay: ReplayContainer, event: RecordingEvent, isCheck
4343

4444
// Ensure we have the correct first checkout timestamp when an error occurs
4545
if (!session.segmentId) {
46-
replay.getContext().earliestEvent = eventBuffer.getFirstCheckoutTimestamp();
46+
replay.getContext().earliestEvent = eventBuffer.getEarliestTimestamp();
4747
}
4848
} else {
4949
eventBuffer.clear();
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { PartitionedQueue } from '../../../src/eventBuffer/PartitionedQueue';
2+
3+
describe('Unit | eventBuffer | PartitionedQueue', () => {
4+
it('works with empty queue', () => {
5+
const queue = new PartitionedQueue<string>();
6+
7+
expect(queue.getItems()).toEqual([]);
8+
expect(queue.getLength()).toEqual(0);
9+
10+
queue.clear();
11+
12+
expect(queue.getItems()).toEqual([]);
13+
expect(queue.getLength()).toEqual(0);
14+
15+
queue.clear(true);
16+
17+
expect(queue.getItems()).toEqual([]);
18+
expect(queue.getLength()).toEqual(0);
19+
});
20+
21+
it('allows to add records', () => {
22+
const queue = new PartitionedQueue<string>();
23+
24+
queue.add('one');
25+
queue.add('two');
26+
queue.add('three');
27+
28+
expect(queue.getItems()).toEqual(['one', 'two', 'three']);
29+
expect(queue.getLength()).toEqual(3);
30+
31+
queue.clear();
32+
33+
expect(queue.getItems()).toEqual([]);
34+
expect(queue.getLength()).toEqual(0);
35+
});
36+
37+
it('allows to add records with checkouts', () => {
38+
const queue = new PartitionedQueue<string>();
39+
40+
queue.add('one');
41+
queue.add('two');
42+
queue.add('three', true);
43+
queue.add('four');
44+
45+
expect(queue.getItems()).toEqual(['one', 'two', 'three', 'four']);
46+
expect(queue.getLength()).toEqual(4);
47+
48+
queue.clear(true);
49+
50+
expect(queue.getItems()).toEqual(['three', 'four']);
51+
expect(queue.getLength()).toEqual(2);
52+
53+
queue.clear(true);
54+
55+
expect(queue.getItems()).toEqual(['three', 'four']);
56+
expect(queue.getLength()).toEqual(2);
57+
});
58+
});

packages/replay/test/unit/util/addEvent.test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,33 @@ import { useFakeTimers } from '../../utils/use-fake-timers';
66
useFakeTimers();
77

88
describe('Unit | util | addEvent', () => {
9-
it('clears queue after two checkouts in error mode', async function () {
9+
it('clears queue after two checkouts in error mode xxx', async function () {
1010
jest.setSystemTime(BASE_TIMESTAMP);
1111

1212
const replay = setupReplayContainer();
1313
replay.recordingMode = 'error';
1414

15-
await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP, type: 2 }, false);
16-
await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 10, type: 3 });
17-
await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 }, true);
15+
addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 10, type: 2 }, false);
16+
addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 0, type: 3 });
17+
addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 }, true);
1818

1919
expect(replay.getContext().earliestEvent).toEqual(BASE_TIMESTAMP);
2020
expect(replay.eventBuffer?.pendingEvents).toEqual([
21-
{ data: {}, timestamp: BASE_TIMESTAMP, type: 2 },
22-
{ data: {}, timestamp: BASE_TIMESTAMP + 10, type: 3 },
21+
{ data: {}, timestamp: BASE_TIMESTAMP + 10, type: 2 },
22+
{ data: {}, timestamp: BASE_TIMESTAMP, type: 3 },
2323
{ data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 },
2424
]);
2525

26-
await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 200, type: 2 }, true);
26+
addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 200, type: 2 }, true);
2727

2828
expect(replay.getContext().earliestEvent).toEqual(BASE_TIMESTAMP + 100);
2929
expect(replay.eventBuffer?.pendingEvents).toEqual([
3030
{ data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 },
3131
{ data: {}, timestamp: BASE_TIMESTAMP + 200, type: 2 },
3232
]);
3333

34-
await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 250, type: 3 }, false);
35-
await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 300, type: 2 }, true);
34+
addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 250, type: 3 }, false);
35+
addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 300, type: 2 }, true);
3636

3737
expect(replay.getContext().earliestEvent).toEqual(BASE_TIMESTAMP + 200);
3838
expect(replay.eventBuffer?.pendingEvents).toEqual([

0 commit comments

Comments
 (0)