Skip to content

feat(replay): Keep min. 30s of data in buffer & worker mode #7992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions packages/replay-worker/src/Compressor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { constants, Deflate, deflate } from 'pako';
import { constants, Deflate } from 'pako';

/**
* A stateful compressor that can be used to batch compress events.
Expand All @@ -7,7 +7,7 @@ export class Compressor {
/**
* pako deflator instance
*/
public deflate: Deflate;
private _deflate: Deflate;

/**
* If any events have been added.
Expand Down Expand Up @@ -38,7 +38,7 @@ export class Compressor {
// TODO: We may want Z_SYNC_FLUSH or Z_FULL_FLUSH (not sure the difference)
// Using NO_FLUSH here for now as we can create many attachments that our
// web UI will get API rate limited.
this.deflate.push(prefix + data, constants.Z_SYNC_FLUSH);
this._deflate.push(prefix + data, constants.Z_SYNC_FLUSH);

this._hasEvents = true;
}
Expand All @@ -48,15 +48,15 @@ export class Compressor {
*/
public finish(): Uint8Array {
// We should always have a list, it can be empty
this.deflate.push(']', constants.Z_FINISH);
this._deflate.push(']', constants.Z_FINISH);

if (this.deflate.err) {
throw this.deflate.err;
if (this._deflate.err) {
throw this._deflate.err;
}

// Copy result before we create a new deflator and return the compressed
// result
const result = this.deflate.result;
const result = this._deflate.result;

this._init();

Expand All @@ -68,16 +68,9 @@ export class Compressor {
*/
private _init(): void {
this._hasEvents = false;
this.deflate = new Deflate();
this._deflate = new Deflate();

// Fake an array by adding a `[`
this.deflate.push('[', constants.Z_NO_FLUSH);
this._deflate.push('[', constants.Z_NO_FLUSH);
}
}

/**
* Compress a string.
*/
export function compress(data: string): Uint8Array {
return deflate(data);
}
103 changes: 72 additions & 31 deletions packages/replay-worker/src/handleMessage.ts
Original file line number Diff line number Diff line change
@@ -1,60 +1,101 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { compress, Compressor } from './Compressor';

const compressor = new Compressor();
import { Compressor } from './Compressor';

interface Handlers {
clear: () => void;
clear: (mode?: string) => void;
addEvent: (data: string) => void;
finish: () => Uint8Array;
compress: (data: string) => Uint8Array;
}

const handlers: Handlers = {
clear: () => {
compressor.clear();
},
class CompressionHandler implements Handlers {
private _compressor: Compressor;
private _bufferCompressor?: Compressor;

public constructor() {
this._compressor = new Compressor();
}

public clear(mode?: string): void {
/*
In buffer mode, we want to make sure to always keep the last round of events around.
So when the time comes and we finish the buffer, we can ensure that we have at least one set of events.
Without this change, it can happen that you finish right after the last checkout (=clear),
and thus have no (or very few) events buffered.

Now, in buffer mode, we basically have to compressors, which are swapped and reset on clear:
* On first `clear` in buffer mode, we initialize the buffer compressor.
The regular compressor keeps running as the "current" one
* On consequitive `clear` calls, we swap the buffer compressor in as the "current" one, and initialize a new buffer compressor
This will clear any events that were buffered before the _last_ clear call.

This sadly means we need to keep the buffer twice in memory. But it's a tradeoff we have to make.
*/
if (mode === 'buffer') {
// This means it is the first clear in buffer mode - just initialize a new compressor for the alternate compressor
if (!this._bufferCompressor) {
this._bufferCompressor = new Compressor();
} else {
// Else, swap the alternative compressor in as "normal" compressor, and initialize a new alterntive compressor
this._compressor = this._bufferCompressor;
this._bufferCompressor = new Compressor();
}
return;
}

/*
In non-buffer mode, we just clear the current compressor (and make sure an eventual buffer compressor is reset)
*/
this._bufferCompressor = undefined;

this._compressor.clear();
}

addEvent: (data: string) => {
return compressor.addEvent(data);
},
public addEvent(data: string): void {
if (this._bufferCompressor) {
this._bufferCompressor.addEvent(data);
}

finish: () => {
return compressor.finish();
},
return this._compressor.addEvent(data);
}

compress: (data: string) => {
return compress(data);
},
};
public finish(): Uint8Array {
if (this._bufferCompressor) {
this._bufferCompressor.clear();
this._bufferCompressor = undefined;
}

return this._compressor.finish();
}
}

const handlers = new CompressionHandler();

/**
* Handler for worker messages.
*/
export function handleMessage(e: MessageEvent): void {
const method = e.data.method as string;
const id = e.data.id as number;
const data = e.data.arg as string;
export function handleMessage(event: MessageEvent): void {
const data = event.data as {
method: keyof Handlers;
id: number;
arg: string;
};

const { method, id, arg } = data;

// @ts-ignore this syntax is actually fine
if (method in handlers && typeof handlers[method] === 'function') {
if (typeof handlers[method] === 'function') {
try {
// @ts-ignore this syntax is actually fine
const response = handlers[method](data);
// @ts-ignore this syntax is actually fine
const response = handlers[method](arg);
postMessage({
id,
method,
success: true,
response,
});
} catch (err) {
// @ts-ignore this syntax is actually fine
postMessage({
id,
method,
success: false,
response: err.message,
response: (err as Error).message,
});

// eslint-disable-next-line no-console
Expand Down
44 changes: 41 additions & 3 deletions packages/replay/jest.setup.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { getCurrentHub } from '@sentry/core';
import type { ReplayRecordingData, Transport } from '@sentry/types';
import { TextEncoder } from 'util';
import pako from 'pako';
import { TextDecoder, TextEncoder } from 'util';

import type { ReplayContainer, Session } from './src/types';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
(global as any).TextEncoder = TextEncoder;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(global as any).TextDecoder = TextDecoder;

type MockTransport = jest.MockedFunction<Transport['send']>;

Expand Down Expand Up @@ -71,6 +74,40 @@ type Call = [
];
type CheckCallForSentReplayResult = { pass: boolean; call: Call | undefined; results: Result[] };

function parseRecordingData(recordingPayload: undefined | string | Uint8Array): string {
if (!recordingPayload) {
return '';
}

if (typeof recordingPayload === 'string') {
return recordingPayload;
}

if (recordingPayload instanceof Uint8Array) {
// We look up the place where the zlib compression header(0x78 0x9c) starts
// As the payload consists of two UInt8Arrays joined together, where the first part is a TextEncoder encoded string,
// and the second part a pako-compressed one
for (let i = 0; i < recordingPayload.length; i++) {
if (recordingPayload[i] === 0x78 && recordingPayload[i + 1] === 0x9c) {
try {
// We found a zlib-compressed payload - let's decompress it
const header = recordingPayload.slice(0, i);
const payload = recordingPayload.slice(i);
// now we return the decompressed payload as JSON
const decompressedPayload = pako.inflate(payload, { to: 'string' });
const decompressedHeader = new TextDecoder().decode(header);

return `${decompressedHeader}${decompressedPayload}`;
} catch (error) {
throw new Error(`Could not parse UInt8Array payload: ${error}`);
}
}
}
}

throw new Error(`Invalid recording payload: ${recordingPayload}`);
}

function checkCallForSentReplay(
call: Call | undefined,
expected?: SentReplayExpected | { sample: SentReplayExpected; inverse: boolean },
Expand All @@ -79,8 +116,9 @@ function checkCallForSentReplay(
const envelopeItems = call?.[1] || [[], []];
const [[replayEventHeader, replayEventPayload], [recordingHeader, recordingPayload] = []] = envelopeItems;

// @ts-ignore recordingPayload is always a string in our tests
const [recordingPayloadHeader, recordingData] = recordingPayload?.split('\n') || [];
const recordingStr = parseRecordingData(recordingPayload as unknown as string | Uint8Array);

const [recordingPayloadHeader, recordingData] = recordingStr?.split('\n') || [];

const actualObj: Required<SentReplayExpected> = {
// @ts-ignore Custom envelope
Expand Down
4 changes: 2 additions & 2 deletions packages/replay/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export const DEFAULT_FLUSH_MIN_DELAY = 5_000;
// was the same as `wait`
export const DEFAULT_FLUSH_MAX_DELAY = 5_500;

/* How long to wait for error checkouts */
export const BUFFER_CHECKOUT_TIME = 60_000;
/* How long to wait for buffer checkouts. */
export const BUFFER_CHECKOUT_TIME = 30_000;

export const RETRY_BASE_INTERVAL = 5000;
export const RETRY_MAX_COUNT = 3;
Expand Down
36 changes: 32 additions & 4 deletions packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ReplayRecordingData } from '@sentry/types';
import type { ReplayRecordingData, ReplayRecordingMode } from '@sentry/types';

import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
import { timestampToMs } from '../util/timestampToMs';
Expand All @@ -11,10 +11,12 @@ import { WorkerHandler } from './WorkerHandler';
export class EventBufferCompressionWorker implements EventBuffer {
private _worker: WorkerHandler;
private _earliestTimestamp: number | null;
private _bufferEarliestTimestamp: number | null;

public constructor(worker: Worker) {
this._worker = new WorkerHandler(worker);
this._earliestTimestamp = null;
this._bufferEarliestTimestamp = null;
}

/** @inheritdoc */
Expand Down Expand Up @@ -48,6 +50,15 @@ export class EventBufferCompressionWorker implements EventBuffer {
this._earliestTimestamp = timestamp;
}

/*
We also update this in parallel, in case we need it.
At this point we don't really know if this is a buffer recording,
so just always keeping this is the safest solution.
*/
if (!this._bufferEarliestTimestamp || timestamp < this._bufferEarliestTimestamp) {
this._bufferEarliestTimestamp = timestamp;
}

return this._sendEventToWorker(event);
}

Expand All @@ -59,10 +70,26 @@ export class EventBufferCompressionWorker implements EventBuffer {
}

/** @inheritdoc */
public clear(): void {
this._earliestTimestamp = null;
public clear(recordingMode: ReplayRecordingMode): void {
if (recordingMode === 'buffer') {
/*
In buffer mode, we want to make sure to always keep the last round of events around.
So when the time comes and we finish the buffer, we can ensure that we have at least one set of events.
Without this change, it can happen that you finish right after the last checkout (=clear),
and thus have no (or very few) events buffered.

Because of this, we keep track of the previous earliest timestamp as well.
When the next clear comes, we set the current earliest timestamp to the previous one.
*/
this._earliestTimestamp = this._bufferEarliestTimestamp;
this._bufferEarliestTimestamp = null;
} else {
this._earliestTimestamp = null;
this._bufferEarliestTimestamp = null;
}

// We do not wait on this, as we assume the order of messages is consistent for the worker
void this._worker.postMessage('clear');
void this._worker.postMessage('clear', recordingMode);
}

/** @inheritdoc */
Expand All @@ -84,6 +111,7 @@ export class EventBufferCompressionWorker implements EventBuffer {
const response = await this._worker.postMessage<Uint8Array>('finish');

this._earliestTimestamp = null;
this._bufferEarliestTimestamp = null;

return response;
}
Expand Down
6 changes: 3 additions & 3 deletions packages/replay/src/eventBuffer/EventBufferProxy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ReplayRecordingData } from '@sentry/types';
import type { ReplayRecordingData, ReplayRecordingMode } from '@sentry/types';
import { logger } from '@sentry/utils';

import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
Expand Down Expand Up @@ -36,8 +36,8 @@ export class EventBufferProxy implements EventBuffer {
}

/** @inheritdoc */
public clear(): void {
return this._used.clear();
public clear(recordingMode: ReplayRecordingMode): void {
return this._used.clear(recordingMode);
}

/** @inheritdoc */
Expand Down
3 changes: 3 additions & 0 deletions packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export class ReplayContainer implements ReplayContainerInterface {
// If neither sample rate is > 0, then do nothing - user will need to call one of
// `start()` or `startBuffering` themselves.
if (errorSampleRate <= 0 && sessionSampleRate <= 0) {
__DEBUG_BUILD__ && logger.log('[Replay] No sample rate set, not initializing.');
return;
}

Expand Down Expand Up @@ -194,6 +195,8 @@ export class ReplayContainer implements ReplayContainerInterface {
this.recordingMode = 'buffer';
}

__DEBUG_BUILD__ && logger.log(`[Replay] Session initialized in mode ${this.recordingMode}`);

this._initializeRecording();
}

Expand Down
2 changes: 1 addition & 1 deletion packages/replay/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ export interface EventBuffer {
/**
* Clear the event buffer.
*/
clear(): void;
clear(recordingMode: ReplayRecordingMode): void;

/**
* Add an event to the event buffer.
Expand Down
Loading