Skip to content

feat(replay): Add non-async flush for page unloads #6854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions packages/replay/src/eventBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,17 @@ class EventBufferArray implements EventBuffer {
return;
}

public finish(): Promise<string> {
return new Promise<string>(resolve => {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const eventsRet = this._events;
this._events = [];
resolve(JSON.stringify(eventsRet));
});
public async finish(): Promise<string> {
return this.finishSync();
}

public finishSync(): string {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const events = this._events;
this._events = [];
return JSON.stringify(events);
}
}

Expand Down Expand Up @@ -158,6 +160,18 @@ export class EventBufferCompressionWorker implements EventBuffer {
return this._finishRequest(this._getAndIncrementId());
}

/**
* Finish the event buffer and return the pending events.
*/
public finishSync(): string {
const events = this._pendingEvents;

// Ensure worker is still in a good state and disregard the result
void this._finishRequest(this._getAndIncrementId());

return JSON.stringify(events);
}

/**
* Post message to worker and wait for response before resolving promise.
*/
Expand Down
165 changes: 128 additions & 37 deletions packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {
AddUpdateCallback,
AllPerformanceEntry,
EventBuffer,
FlushOptions,
InstrumentationTypeBreadcrumb,
InternalEventContext,
PopEventContext,
Expand Down Expand Up @@ -324,7 +325,6 @@ export class ReplayContainer implements ReplayContainerInterface {
}

/**
*
* Always flush via `_debouncedFlush` so that we do not have flushes triggered
* from calling both `flush` and `_debouncedFlush`. Otherwise, there could be
* cases of mulitple flushes happening closely together.
Expand All @@ -335,7 +335,7 @@ export class ReplayContainer implements ReplayContainerInterface {
return this._debouncedFlush.flush() as Promise<void>;
}

/** Get the current sesion (=replay) ID */
/** Get the current session (=replay) ID */
public getSessionId(): string | undefined {
return this.session && this.session.id;
}
Expand Down Expand Up @@ -625,7 +625,7 @@ export class ReplayContainer implements ReplayContainerInterface {
// Send replay when the page/tab becomes hidden. There is no reason to send
// replay if it becomes visible, since no actions we care about were done
// while it was hidden
this._conditionalFlush();
this._conditionalFlush({ sync: true });
}

/**
Expand Down Expand Up @@ -747,11 +747,20 @@ export class ReplayContainer implements ReplayContainerInterface {
/**
* Only flush if `this.recordingMode === 'session'`
*/
private _conditionalFlush(): void {
private _conditionalFlush(options: FlushOptions = {}): void {
if (this.recordingMode === 'error') {
return;
}

/**
* Page is likely to unload so need to bypass debounce completely and
* synchronously retrieve pending events from buffer and send request asap.
*/
if (options.sync) {
this._flushSync();
return;
}

void this.flushImmediate();
}

Expand Down Expand Up @@ -796,60 +805,142 @@ export class ReplayContainer implements ReplayContainerInterface {
* Should never be called directly, only by `flush`
*/
private async _runFlush(): Promise<void> {
if (!this.session || !this.eventBuffer) {
__DEBUG_BUILD__ && logger.error('[Replay] No session or eventBuffer found to flush.');
return;
}
try {
const flushData = this._prepareFlush();

await this._addPerformanceEntries();
if (!flushData) {
return;
}

// Check eventBuffer again, as it could have been stopped in the meanwhile
if (!this.eventBuffer || !this.eventBuffer.pendingLength) {
return;
}
const { promises, replayId, segmentId, eventContext, eventBuffer, session } = flushData;

// Only attach memory event if eventBuffer is not empty
await addMemoryEntry(this);
// NOTE: Be mindful that nothing after this point (the first `await`)
// will run after when the page is unloaded.
await Promise.all(promises);

// Check eventBuffer again, as it could have been stopped in the meanwhile
if (!this.eventBuffer) {
return;
}
// This can be empty due to blur events calling `runFlush` directly. In
// the case where we have a snapshot checkout and a blur event
// happening near the same time, the blur event can end up emptying the
// buffer even if snapshot happens first.
if (!eventBuffer.pendingLength) {
return;
}

try {
// Note this empties the event buffer regardless of outcome of sending replay
const recordingData = await this.eventBuffer.finish();

// NOTE: Copy values from instance members, as it's possible they could
// change before the flush finishes.
const replayId = this.session.id;
const eventContext = this._popEventContext();
// Always increment segmentId regardless of outcome of sending replay
const segmentId = this.session.segmentId++;
this._maybeSaveSession();
// This empties the event buffer regardless of outcome of sending replay
const recordingData = await eventBuffer.finish();

await sendReplay({
replayId,
recordingData,
segmentId,
includeReplayStartTimestamp: segmentId === 0,
eventContext,
session: this.session,
session,
options: this.getOptions(),
timestamp: new Date().getTime(),
});
} catch (err) {
this._handleException(err);
this._handleSendError(err);
}
}

if (err instanceof RateLimitError) {
this._handleRateLimit(err.rateLimits);
/**
* Flush event buffer synchonously.
* This is necessary e.g. when running flush on page unload or similar.
*/
private _flushSync(): void {
try {
const flushData = this._prepareFlush();

if (!flushData) {
return;
}

// This means we retried 3 times, and all of them failed
// In this case, we want to completely stop the replay - otherwise, we may get inconsistent segments
this.stop();
const { replayId, segmentId, eventContext, eventBuffer, session } = flushData;

const recordingData = eventBuffer.finishSync();

sendReplay({
replayId,
recordingData,
segmentId,
includeReplayStartTimestamp: segmentId === 0,
eventContext,
session,
options: this.getOptions(),
timestamp: new Date().getTime(),
}).catch(err => {
this._handleSendError(err);
});
} catch (err) {
this._handleSendError(err);
}
}

/** Prepare flush data */
private _prepareFlush():
| {
replayId: string;
eventContext: PopEventContext;
segmentId: number;
promises: Promise<unknown>[];
eventBuffer: EventBuffer;
session: Session;
}
| undefined {
if (!this.session || !this.eventBuffer) {
__DEBUG_BUILD__ && logger.error('[Replay] No session or eventBuffer found to flush.');
return;
}

this._debouncedFlush.cancel();

const promises: Promise<unknown>[] = [];

promises.push(this._addPerformanceEntries());

// Do not continue if there are no pending events in buffer
if (!this.eventBuffer || !this.eventBuffer.pendingLength) {
return;
}

// Only attach memory entry if eventBuffer is not empty
promises.push(addMemoryEntry(this));

// NOTE: Copy values from instance members, as it's possible they could
// change before the flush finishes.
const replayId = this.session.id;
const eventContext = this._popEventContext();
// Always increment segmentId regardless of outcome of sending replay
const segmentId = this.session.segmentId++;

// Save session (new segment id) after we save flush data assuming either
// 1) request succeeds or 2) it fails or never happens, in which case we
// need to retry this segment.
this._maybeSaveSession();

return {
replayId,
eventContext,
segmentId,
promises,
eventBuffer: this.eventBuffer,
session: this.session,
};
}

/** Handle an error when sending a replay. */
private _handleSendError(error: unknown): void {
this._handleException(error);

if (error instanceof RateLimitError) {
this._handleRateLimit(error.rateLimits);
return;
}

// This means we retried 3 times, and all of them failed
// In this case, we want to completely stop the replay - otherwise, we may get inconsistent segments
this.stop();
}

/**
Expand Down
18 changes: 18 additions & 0 deletions packages/replay/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ export type RecordingOptions = recordOptions;

export type AllPerformanceEntry = PerformancePaintTiming | PerformanceResourceTiming | PerformanceNavigationTiming;

export interface FlushOptions {
/**
* Attempt to finish the flush immediately without any asynchronous operations
* (e.g. worker calls). This is not directly related to `flushImmediate` which
* skips the debounced flush.
*/
sync?: boolean;
}

export interface SendReplayData {
recordingData: ReplayRecordingData;
replayId: string;
Expand All @@ -18,6 +27,10 @@ export interface SendReplayData {
options: ReplayPluginOptions;
}

export type PendingReplayData = Omit<SendReplayData, 'recordingData' | 'session' | 'options'> & {
recordingData: RecordingEvent[];
};

export type InstrumentationTypeBreadcrumb = 'dom' | 'scope';

/**
Expand Down Expand Up @@ -237,6 +250,11 @@ export interface EventBuffer {
* Clears and returns the contents of the buffer.
*/
finish(): Promise<ReplayRecordingData>;

/**
* Clears and synchronously returns the pending contents of the buffer. This means no compression.
*/
finishSync(): string;
}

export type AddUpdateCallback = () => boolean | void;
Expand Down
Loading