Skip to content

feat(replay): Track pending events in EventBuffer #6699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions packages/replay/src/eventBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,18 @@ class EventBufferArray implements EventBuffer {
this._events = [];
}

public get length(): number {
public get pendingLength(): number {
return this._events.length;
}

/**
* Returns the raw events that are buffered. In `EventBufferArray`, this is the
* same as `this._events`.
*/
public get pendingEvents(): RecordingEvent[] {
return this._events;
}

public destroy(): void {
this._events = [];
}
Expand Down Expand Up @@ -80,6 +88,13 @@ class EventBufferArray implements EventBuffer {
* Exported only for testing.
*/
export class EventBufferCompressionWorker implements EventBuffer {
/**
* Keeps track of the list of events since the last flush that have not been compressed.
* For example, page is reloaded and a flush attempt is made, but
* `finish()` (and thus the flush), does not complete.
*/
public _pendingEvents: RecordingEvent[] = [];

private _worker: null | Worker;
private _eventBufferItemLength: number = 0;
private _id: number = 0;
Expand All @@ -89,13 +104,21 @@ export class EventBufferCompressionWorker implements EventBuffer {
}

/**
* Note that this may not reflect what is actually in the event buffer. This
* is only a local count of the buffer size since `addEvent` is async.
* The number of raw events that are buffered. This may not be the same as
* the number of events that have been compresed in the worker because
* `addEvent` is async.
*/
public get length(): number {
public get pendingLength(): number {
return this._eventBufferItemLength;
}

/**
* Returns a list of the raw recording events that are being compressed.
*/
public get pendingEvents(): RecordingEvent[] {
return this._pendingEvents;
}

/**
* Destroy the event buffer.
*/
Expand All @@ -121,6 +144,11 @@ export class EventBufferCompressionWorker implements EventBuffer {
});
}

// Don't store checkout events in `_pendingEvents` because they are too large
if (!isCheckout) {
this._pendingEvents.push(event);
}

return this._sendEventToWorker(event);
}

Expand Down Expand Up @@ -202,6 +230,10 @@ export class EventBufferCompressionWorker implements EventBuffer {
// XXX: See note in `get length()`
this._eventBufferItemLength = 0;

await promise;

this._pendingEvents = [];

return promise;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ export class ReplayContainer implements ReplayContainerInterface {

await this.addPerformanceEntries();

if (!this.eventBuffer?.length) {
if (!this.eventBuffer?.pendingLength) {
return;
}

Expand Down
12 changes: 10 additions & 2 deletions packages/replay/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,15 @@ export interface Session {
}

export interface EventBuffer {
readonly length: number;
/**
* The number of raw events that are buffered
*/
readonly pendingLength: number;

/**
* The raw events that are buffered.
*/
readonly pendingEvents: RecordingEvent[];

/**
* Destroy the event buffer.
Expand All @@ -226,7 +234,7 @@ export interface EventBuffer {
addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult>;

/**
* Clears and returns the contents and the buffer.
* Clears and returns the contents of the buffer.
*/
finish(): Promise<ReplayRecordingData>;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/replay/src/worker/worker.js

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions packages/replay/test/integration/sendReplayEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe('Integration | sendReplayEvent', () => {
expect(replay.session?.segmentId).toBe(1);

// events array should be empty
expect(replay.eventBuffer?.length).toBe(0);
expect(replay.eventBuffer?.pendingLength).toBe(0);
});

it('update last activity when user clicks mouse', async () => {
Expand Down Expand Up @@ -141,7 +141,7 @@ describe('Integration | sendReplayEvent', () => {
expect(replay.session?.segmentId).toBe(1);

// events array should be empty
expect(replay.eventBuffer?.length).toBe(0);
expect(replay.eventBuffer?.pendingLength).toBe(0);
});

it('uploads a replay event if 15 seconds have elapsed since the last replay upload', async () => {
Expand Down Expand Up @@ -169,7 +169,7 @@ describe('Integration | sendReplayEvent', () => {
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);
// events array should be empty
expect(replay.eventBuffer?.length).toBe(0);
expect(replay.eventBuffer?.pendingLength).toBe(0);

// Let's make sure it continues to work
mockTransportSend.mockClear();
Expand Down Expand Up @@ -214,7 +214,7 @@ describe('Integration | sendReplayEvent', () => {
// Session's last activity should not be updated
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
// events array should be empty
expect(replay.eventBuffer?.length).toBe(0);
expect(replay.eventBuffer?.pendingLength).toBe(0);
});

it('uploads a replay event when document becomes hidden', async () => {
Expand Down Expand Up @@ -242,7 +242,7 @@ describe('Integration | sendReplayEvent', () => {
// visibilitystate as user being active
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
// events array should be empty
expect(replay.eventBuffer?.length).toBe(0);
expect(replay.eventBuffer?.pendingLength).toBe(0);
});

it('uploads a replay event if 5 seconds have elapsed since the last replay event occurred', async () => {
Expand All @@ -261,7 +261,7 @@ describe('Integration | sendReplayEvent', () => {
expect(replay.session?.segmentId).toBe(1);

// events array should be empty
expect(replay.eventBuffer?.length).toBe(0);
expect(replay.eventBuffer?.pendingLength).toBe(0);
});

it('uploads a replay event if 15 seconds have elapsed since the last replay upload', async () => {
Expand Down Expand Up @@ -290,7 +290,7 @@ describe('Integration | sendReplayEvent', () => {
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);
// events array should be empty
expect(replay.eventBuffer?.length).toBe(0);
expect(replay.eventBuffer?.pendingLength).toBe(0);

// Let's make sure it continues to work
mockTransportSend.mockClear();
Expand Down
6 changes: 3 additions & 3 deletions packages/replay/test/integration/stop.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,17 @@ describe('Integration | stop', () => {

it('does not buffer events when stopped', async function () {
WINDOW.dispatchEvent(new Event('blur'));
expect(replay.eventBuffer?.length).toBe(1);
expect(replay.eventBuffer?.pendingLength).toBe(1);

// stop replays
integration.stop();

expect(replay.eventBuffer?.length).toBe(undefined);
expect(replay.eventBuffer?.pendingLength).toBe(undefined);

WINDOW.dispatchEvent(new Event('blur'));
await new Promise(process.nextTick);

expect(replay.eventBuffer?.length).toBe(undefined);
expect(replay.eventBuffer?.pendingLength).toBe(undefined);
expect(replay).not.toHaveLastSentReplay();
});

Expand Down
4 changes: 2 additions & 2 deletions packages/replay/test/unit/eventBuffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ describe('Unit | eventBuffer', () => {
}) as EventBufferCompressionWorker;

buffer.addEvent(TEST_EVENT);
// @ts-ignore make sure it handles invalid data
buffer.addEvent(undefined);
buffer.addEvent(TEST_EVENT);

expect(buffer.pendingEvents).toEqual([TEST_EVENT, TEST_EVENT]);

const result = await buffer.finish();
const restored = pako.inflate(result, { to: 'string' });

Expand Down