Skip to content

Commit 8e9816a

Browse files
committed
ref(replay): Pause recording when replay requests are rate-limited
1 parent 1cf0ae0 commit 8e9816a

File tree

2 files changed

+201
-4
lines changed

2 files changed

+201
-4
lines changed

packages/replay/src/replay.ts

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
/* eslint-disable max-lines */ // TODO: We might want to split this file up
22
import { addGlobalEventProcessor, captureException, getCurrentHub, setContext } from '@sentry/core';
33
import type { Breadcrumb, ReplayEvent, ReplayRecordingMode, TransportMakeRequestResponse } from '@sentry/types';
4-
import { addInstrumentationHandler, logger } from '@sentry/utils';
4+
import {
5+
addInstrumentationHandler,
6+
disabledUntil,
7+
isRateLimited,
8+
logger,
9+
RateLimits,
10+
updateRateLimits,
11+
} from '@sentry/utils';
512
import { EventType, record } from 'rrweb';
613

714
import {
@@ -127,6 +134,11 @@ export class ReplayContainer implements ReplayContainerInterface {
127134
initialUrl: '',
128135
};
129136

137+
/**
138+
* A RateLimits object holding the rate-limit durations in case a sent replay event was rate-limited.
139+
*/
140+
private _rateLimits: RateLimits = {};
141+
130142
constructor({ options, recordingOptions }: { options: ReplayPluginOptions; recordingOptions: RecordingOptions }) {
131143
this._recordingOptions = recordingOptions;
132144
this._options = options;
@@ -984,7 +996,15 @@ export class ReplayContainer implements ReplayContainerInterface {
984996
const envelope = createReplayEnvelope(replayEvent, recordingData, dsn, client.getOptions().tunnel);
985997

986998
try {
987-
return await transport.send(envelope);
999+
const response = await transport.send(envelope);
1000+
// TODO (v8): we can remove this guard once transport.end's type signature doesn't include void anymore
1001+
if (response) {
1002+
this._rateLimits = updateRateLimits(this._rateLimits, response);
1003+
if (isRateLimited(this._rateLimits, 'replay_event') || isRateLimited(this._rateLimits, 'replay_recording')) {
1004+
this._handleRateLimit();
1005+
}
1006+
}
1007+
return response;
9881008
} catch {
9891009
throw new Error(UNABLE_TO_SEND_REPLAY);
9901010
}
@@ -1036,9 +1056,8 @@ export class ReplayContainer implements ReplayContainerInterface {
10361056
throw new Error(`${UNABLE_TO_SEND_REPLAY} - max retries exceeded`);
10371057
}
10381058

1039-
this._retryCount = this._retryCount + 1;
10401059
// will retry in intervals of 5, 10, 30
1041-
this._retryInterval = this._retryCount * this._retryInterval;
1060+
this._retryInterval = ++this._retryCount * this._retryInterval;
10421061

10431062
return await new Promise((resolve, reject) => {
10441063
setTimeout(async () => {
@@ -1065,4 +1084,28 @@ export class ReplayContainer implements ReplayContainerInterface {
10651084
saveSession(this.session);
10661085
}
10671086
}
1087+
1088+
/**
1089+
* Pauses the replay and resumes it after the rate-limit duration is over.
1090+
*/
1091+
private _handleRateLimit(): void {
1092+
const rateLimitEnd = Math.max(
1093+
disabledUntil(this._rateLimits, 'replay_event'),
1094+
disabledUntil(this._rateLimits, 'replay_recording'),
1095+
);
1096+
1097+
const rateLimitDuration = rateLimitEnd - Date.now();
1098+
1099+
if (rateLimitDuration > 0) {
1100+
__DEBUG_BUILD__ && logger.warn('[Replay]', `Rate limit hit, pausing replay for ${rateLimitDuration}ms`);
1101+
this.pause();
1102+
this._debouncedFlush && this._debouncedFlush.cancel();
1103+
1104+
setTimeout(() => {
1105+
__DEBUG_BUILD__ && logger.info('[Replay]', 'Resuming replay after rate limit');
1106+
this.resume();
1107+
this._debouncedFlush();
1108+
}, rateLimitDuration);
1109+
}
1110+
}
10681111
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import { Transport, TransportMakeRequestResponse } from '@sentry/types';
2+
import { ReplayContainer } from '../../src/replay';
3+
import { BASE_TIMESTAMP, mockSdk } from '../index';
4+
import { mockRrweb } from '../mocks/mockRrweb';
5+
import { useFakeTimers } from '../utils/use-fake-timers';
6+
import { getCurrentHub } from '@sentry/core';
7+
import { clearSession } from '../utils/clearSession';
8+
import { DEFAULT_FLUSH_MIN_DELAY, SESSION_IDLE_DURATION } from '../../src/constants';
9+
10+
useFakeTimers();
11+
12+
async function advanceTimers(time: number) {
13+
jest.advanceTimersByTime(time);
14+
await new Promise(process.nextTick);
15+
}
16+
17+
type MockTransportSend = jest.MockedFunction<Transport['send']>;
18+
type MockSendReplayRequest = jest.MockedFunction<ReplayContainer['sendReplayRequest']>;
19+
20+
describe('Integration | rate-limiting behaviour', () => {
21+
let replay: ReplayContainer;
22+
let mockTransportSend: MockTransportSend;
23+
let mockSendReplayRequest: MockSendReplayRequest;
24+
const { record: mockRecord } = mockRrweb();
25+
26+
beforeAll(async () => {
27+
jest.setSystemTime(new Date(BASE_TIMESTAMP));
28+
29+
({ replay } = await mockSdk({
30+
replayOptions: {
31+
stickySession: false,
32+
},
33+
}));
34+
35+
jest.spyOn(replay, 'sendReplayRequest');
36+
37+
jest.runAllTimers();
38+
mockTransportSend = getCurrentHub()?.getClient()?.getTransport()?.send as MockTransportSend;
39+
mockSendReplayRequest = replay.sendReplayRequest as MockSendReplayRequest;
40+
});
41+
42+
beforeEach(() => {
43+
jest.setSystemTime(new Date(BASE_TIMESTAMP));
44+
mockRecord.takeFullSnapshot.mockClear();
45+
mockTransportSend.mockClear();
46+
47+
// Create a new session and clear mocks because a segment (from initial
48+
// checkout) will have already been uploaded by the time the tests run
49+
clearSession(replay);
50+
replay.loadSession({ expiry: 0 });
51+
52+
mockSendReplayRequest.mockClear();
53+
});
54+
55+
afterEach(async () => {
56+
jest.runAllTimers();
57+
await new Promise(process.nextTick);
58+
jest.setSystemTime(new Date(BASE_TIMESTAMP));
59+
clearSession(replay);
60+
replay.loadSession({ expiry: SESSION_IDLE_DURATION });
61+
});
62+
63+
afterAll(() => {
64+
replay && replay.stop();
65+
});
66+
67+
it('pauses recording and flushing a rate limit is hit and resumes both after the rate limit duration is over', async () => {
68+
expect(replay.session?.segmentId).toBe(0);
69+
jest.spyOn(replay, 'sendReplay');
70+
jest.spyOn(replay, 'pause');
71+
jest.spyOn(replay, 'resume');
72+
// @ts-ignore private API
73+
jest.spyOn(replay, '_handleRateLimit');
74+
75+
const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };
76+
77+
mockTransportSend.mockImplementationOnce(() => {
78+
return Promise.resolve({
79+
statusCode: 429,
80+
headers: {
81+
'x-sentry-rate-limits': null,
82+
'retry-after': `30`,
83+
},
84+
} as TransportMakeRequestResponse);
85+
});
86+
87+
mockRecord._emitter(TEST_EVENT);
88+
89+
// T = base + 5
90+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
91+
92+
expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
93+
expect(mockTransportSend).toHaveBeenCalledTimes(1);
94+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) });
95+
96+
expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
97+
// resume() was called once before we even started
98+
expect(replay.resume).not.toHaveBeenCalled();
99+
expect(replay.pause).toHaveBeenCalledTimes(1);
100+
101+
// No user activity to trigger an update
102+
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
103+
expect(replay.session?.segmentId).toBe(1);
104+
105+
// let's simulate the rate-limit time of inactivity (30secs) and check that we don't do anything in the meantime
106+
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
107+
for (let i = 0; i < 5; i++) {
108+
const ev = {
109+
...TEST_EVENT2,
110+
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
111+
};
112+
mockRecord._emitter(ev);
113+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
114+
expect(replay.isPaused()).toBe(true);
115+
expect(replay.sendReplay).toHaveBeenCalledTimes(1);
116+
expect(mockTransportSend).toHaveBeenCalledTimes(1);
117+
}
118+
119+
// T = base + 35
120+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
121+
122+
// now, recording should resume and first, we expect a checkout event to be sent, as resume()
123+
// should trigger a full snapshot
124+
expect(replay.resume).toHaveBeenCalledTimes(1);
125+
expect(replay.isPaused()).toBe(false);
126+
127+
expect(replay.sendReplay).toHaveBeenCalledTimes(2);
128+
expect(replay).toHaveLastSentReplay({
129+
events: '[{"data":{"isCheckout":true},"timestamp":1580598035000,"type":2}]',
130+
});
131+
132+
// and let's also emit a new event and check that it is recorded
133+
const TEST_EVENT3 = {
134+
data: {},
135+
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
136+
type: 3,
137+
};
138+
mockRecord._emitter(TEST_EVENT3);
139+
140+
// T = base + 40
141+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
142+
expect(replay.sendReplay).toHaveBeenCalledTimes(3);
143+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });
144+
145+
// nothing should happen afterwards
146+
// T = base + 60
147+
await advanceTimers(20_000);
148+
expect(replay.sendReplay).toHaveBeenCalledTimes(3);
149+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });
150+
151+
// events array should be empty
152+
expect(replay.eventBuffer?.length).toBe(0);
153+
});
154+
});

0 commit comments

Comments
 (0)