Skip to content

Commit b921631

Browse files
authored
ref(replay): Pause recording when replay requests are rate-limited (#6733)
Add the following rate-limiting strategy to the Replay integration: If we receive a rate limit response from the server after sending a replay event, we pause recording and flushing for the received rate limit period. After this period, we resume recording the same session, which triggers a full snapshot and hence should give us working replays. This should reduce segment-loss due to rate limiting, as we previously ignored handling rate limits specifically for replay (other than the transport's internal rate-limiting).
1 parent 24cbc4c commit b921631

File tree

3 files changed

+344
-5
lines changed

3 files changed

+344
-5
lines changed

packages/integration-tests/suites/replay/captureReplay/test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ sentryTest('captureReplay', async ({ getLocalTestPath, page }) => {
2323
await page.goto(url);
2424

2525
await page.click('button');
26-
await page.waitForTimeout(200);
26+
await page.waitForTimeout(300);
2727

2828
const replayEvent = await getFirstSentryEnvelopeRequest<Event>(page, url);
2929

packages/replay/src/replay.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
/* eslint-disable max-lines */ // TODO: We might want to split this file up
22
import { addGlobalEventProcessor, captureException, getCurrentHub, setContext } from '@sentry/core';
33
import type { Breadcrumb, ReplayEvent, ReplayRecordingMode, TransportMakeRequestResponse } from '@sentry/types';
4-
import { addInstrumentationHandler, logger } from '@sentry/utils';
4+
import type { RateLimits } from '@sentry/utils';
5+
import { addInstrumentationHandler, disabledUntil, isRateLimited, logger, updateRateLimits } from '@sentry/utils';
56
import { EventType, record } from 'rrweb';
67

78
import {
@@ -128,6 +129,11 @@ export class ReplayContainer implements ReplayContainerInterface {
128129
initialUrl: '',
129130
};
130131

132+
/**
133+
* A RateLimits object holding the rate-limit durations in case a sent replay event was rate-limited.
134+
*/
135+
private _rateLimits: RateLimits = {};
136+
131137
public constructor({
132138
options,
133139
recordingOptions,
@@ -988,7 +994,15 @@ export class ReplayContainer implements ReplayContainerInterface {
988994
const envelope = createReplayEnvelope(replayEvent, recordingData, dsn, client.getOptions().tunnel);
989995

990996
try {
991-
return await transport.send(envelope);
997+
const response = await transport.send(envelope);
998+
// TODO (v8): we can remove this guard once transport.send's type signature doesn't include void anymore
999+
if (response) {
1000+
this._rateLimits = updateRateLimits(this._rateLimits, response);
1001+
if (isRateLimited(this._rateLimits, 'replay')) {
1002+
this._handleRateLimit();
1003+
}
1004+
}
1005+
return response;
9921006
} catch {
9931007
throw new Error(UNABLE_TO_SEND_REPLAY);
9941008
}
@@ -1040,9 +1054,8 @@ export class ReplayContainer implements ReplayContainerInterface {
10401054
throw new Error(`${UNABLE_TO_SEND_REPLAY} - max retries exceeded`);
10411055
}
10421056

1043-
this._retryCount = this._retryCount + 1;
10441057
// will retry in intervals of 5, 10, 30
1045-
this._retryInterval = this._retryCount * this._retryInterval;
1058+
this._retryInterval = ++this._retryCount * this._retryInterval;
10461059

10471060
return await new Promise((resolve, reject) => {
10481061
setTimeout(async () => {
@@ -1069,4 +1082,29 @@ export class ReplayContainer implements ReplayContainerInterface {
10691082
saveSession(this.session);
10701083
}
10711084
}
1085+
1086+
/**
1087+
* Pauses the replay and resumes it after the rate-limit duration is over.
1088+
*/
1089+
private _handleRateLimit(): void {
1090+
// in case recording is already paused, we don't need to do anything, as we might have already paused because of a
1091+
// rate limit
1092+
if (this.isPaused()) {
1093+
return;
1094+
}
1095+
1096+
const rateLimitEnd = disabledUntil(this._rateLimits, 'replay');
1097+
const rateLimitDuration = rateLimitEnd - Date.now();
1098+
1099+
if (rateLimitDuration > 0) {
1100+
__DEBUG_BUILD__ && logger.warn('[Replay]', `Rate limit hit, pausing replay for ${rateLimitDuration}ms`);
1101+
this.pause();
1102+
this._debouncedFlush && this._debouncedFlush.cancel();
1103+
1104+
setTimeout(() => {
1105+
__DEBUG_BUILD__ && logger.info('[Replay]', 'Resuming replay after rate limit');
1106+
this.resume();
1107+
}, rateLimitDuration);
1108+
}
1109+
}
10721110
}
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
import { getCurrentHub } from '@sentry/core';
2+
import type { Transport, TransportMakeRequestResponse } from '@sentry/types';
3+
4+
import { DEFAULT_FLUSH_MIN_DELAY, SESSION_IDLE_DURATION } from '../../src/constants';
5+
import type { ReplayContainer } from '../../src/replay';
6+
import { BASE_TIMESTAMP, mockSdk } from '../index';
7+
import { mockRrweb } from '../mocks/mockRrweb';
8+
import { clearSession } from '../utils/clearSession';
9+
import { useFakeTimers } from '../utils/use-fake-timers';
10+
11+
useFakeTimers();
12+
13+
async function advanceTimers(time: number) {
14+
jest.advanceTimersByTime(time);
15+
await new Promise(process.nextTick);
16+
}
17+
18+
type MockTransportSend = jest.MockedFunction<Transport['send']>;
19+
type MockSendReplayRequest = jest.MockedFunction<ReplayContainer['_sendReplayRequest']>;
20+
21+
describe('Integration | rate-limiting behaviour', () => {
22+
let replay: ReplayContainer;
23+
let mockTransportSend: MockTransportSend;
24+
let mockSendReplayRequest: MockSendReplayRequest;
25+
const { record: mockRecord } = mockRrweb();
26+
27+
beforeAll(async () => {
28+
jest.setSystemTime(new Date(BASE_TIMESTAMP));
29+
30+
({ replay } = await mockSdk({
31+
replayOptions: {
32+
stickySession: false,
33+
},
34+
}));
35+
36+
// @ts-ignore private API
37+
jest.spyOn(replay, '_sendReplayRequest');
38+
39+
jest.runAllTimers();
40+
mockTransportSend = getCurrentHub()?.getClient()?.getTransport()?.send as MockTransportSend;
41+
mockSendReplayRequest = replay['_sendReplayRequest'] as MockSendReplayRequest;
42+
});
43+
44+
beforeEach(() => {
45+
jest.setSystemTime(new Date(BASE_TIMESTAMP));
46+
mockRecord.takeFullSnapshot.mockClear();
47+
mockTransportSend.mockClear();
48+
49+
// Create a new session and clear mocks because a segment (from initial
50+
// checkout) will have already been uploaded by the time the tests run
51+
clearSession(replay);
52+
replay['_loadSession']({ expiry: 0 });
53+
54+
mockSendReplayRequest.mockClear();
55+
56+
replay['_rateLimits'] = {};
57+
});
58+
59+
afterEach(async () => {
60+
jest.runAllTimers();
61+
await new Promise(process.nextTick);
62+
jest.setSystemTime(new Date(BASE_TIMESTAMP));
63+
clearSession(replay);
64+
jest.clearAllMocks();
65+
replay['_loadSession']({ expiry: SESSION_IDLE_DURATION });
66+
});
67+
68+
afterAll(() => {
69+
replay && replay.stop();
70+
});
71+
72+
it.each([
73+
{
74+
statusCode: 429,
75+
headers: {
76+
'x-sentry-rate-limits': '30',
77+
'retry-after': null,
78+
},
79+
},
80+
{
81+
statusCode: 429,
82+
headers: {
83+
'x-sentry-rate-limits': '30:replay',
84+
'retry-after': null,
85+
},
86+
},
87+
{
88+
statusCode: 429,
89+
headers: {
90+
'x-sentry-rate-limits': null,
91+
'retry-after': '30',
92+
},
93+
},
94+
] as TransportMakeRequestResponse[])(
95+
'pauses recording and flushing a rate limit is hit and resumes both after the rate limit duration is over',
96+
async rateLimitResponse => {
97+
expect(replay.session?.segmentId).toBe(0);
98+
jest.spyOn(replay, 'pause');
99+
jest.spyOn(replay, 'resume');
100+
// @ts-ignore private API
101+
jest.spyOn(replay, '_handleRateLimit');
102+
// @ts-ignore private API
103+
jest.spyOn(replay, '_sendReplay');
104+
105+
const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };
106+
107+
mockTransportSend.mockImplementationOnce(() => {
108+
return Promise.resolve(rateLimitResponse);
109+
});
110+
111+
mockRecord._emitter(TEST_EVENT);
112+
113+
// T = base + 5
114+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
115+
116+
expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
117+
expect(mockTransportSend).toHaveBeenCalledTimes(1);
118+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) });
119+
120+
expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
121+
// resume() was called once before we even started
122+
expect(replay.resume).not.toHaveBeenCalled();
123+
expect(replay.pause).toHaveBeenCalledTimes(1);
124+
125+
// No user activity to trigger an update
126+
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
127+
expect(replay.session?.segmentId).toBe(1);
128+
129+
// let's simulate the rate-limit time of inactivity (30secs) and check that we don't do anything in the meantime
130+
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
131+
for (let i = 0; i < 5; i++) {
132+
const ev = {
133+
...TEST_EVENT2,
134+
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
135+
};
136+
mockRecord._emitter(ev);
137+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
138+
expect(replay.isPaused()).toBe(true);
139+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(1);
140+
expect(mockTransportSend).toHaveBeenCalledTimes(1);
141+
}
142+
143+
// T = base + 35
144+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
145+
146+
// now, recording should resume and first, we expect a checkout event to be sent, as resume()
147+
// should trigger a full snapshot
148+
expect(replay.resume).toHaveBeenCalledTimes(1);
149+
expect(replay.isPaused()).toBe(false);
150+
151+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(2);
152+
expect(replay).toHaveLastSentReplay({
153+
events: JSON.stringify([
154+
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 7, type: 2 },
155+
]),
156+
});
157+
158+
// and let's also emit a new event and check that it is recorded
159+
const TEST_EVENT3 = {
160+
data: {},
161+
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
162+
type: 3,
163+
};
164+
mockRecord._emitter(TEST_EVENT3);
165+
166+
// T = base + 40
167+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
168+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
169+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });
170+
171+
// nothing should happen afterwards
172+
// T = base + 60
173+
await advanceTimers(20_000);
174+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
175+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });
176+
177+
// events array should be empty
178+
expect(replay.eventBuffer?.pendingLength).toBe(0);
179+
},
180+
);
181+
182+
it('handles rate-limits from a plain 429 response without any retry time', async () => {
183+
expect(replay.session?.segmentId).toBe(0);
184+
jest.spyOn(replay, 'pause');
185+
jest.spyOn(replay, 'resume');
186+
// @ts-ignore private API
187+
jest.spyOn(replay, '_handleRateLimit');
188+
// @ts-ignore private API
189+
jest.spyOn(replay, '_sendReplay');
190+
191+
const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };
192+
193+
mockTransportSend.mockImplementationOnce(() => {
194+
return Promise.resolve({ statusCode: 429 });
195+
});
196+
197+
mockRecord._emitter(TEST_EVENT);
198+
199+
// T = base + 5
200+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
201+
202+
expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
203+
expect(mockTransportSend).toHaveBeenCalledTimes(1);
204+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) });
205+
206+
expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
207+
// resume() was called once before we even started
208+
expect(replay.resume).not.toHaveBeenCalled();
209+
expect(replay.pause).toHaveBeenCalledTimes(1);
210+
211+
// No user activity to trigger an update
212+
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
213+
expect(replay.session?.segmentId).toBe(1);
214+
215+
// let's simulate the rate-limit time of inactivity (60secs) and check that we don't do anything in the meantime
216+
// 60secs are the default we fall back to in the plain 429 case in updateRateLimits()
217+
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
218+
for (let i = 0; i < 11; i++) {
219+
const ev = {
220+
...TEST_EVENT2,
221+
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
222+
};
223+
mockRecord._emitter(ev);
224+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
225+
expect(replay.isPaused()).toBe(true);
226+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(1);
227+
expect(mockTransportSend).toHaveBeenCalledTimes(1);
228+
}
229+
230+
// T = base + 60
231+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
232+
233+
// now, recording should resume and first, we expect a checkout event to be sent, as resume()
234+
// should trigger a full snapshot
235+
expect(replay.resume).toHaveBeenCalledTimes(1);
236+
expect(replay.isPaused()).toBe(false);
237+
238+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(2);
239+
expect(replay).toHaveLastSentReplay({
240+
events: JSON.stringify([
241+
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 13, type: 2 },
242+
]),
243+
});
244+
245+
// and let's also emit a new event and check that it is recorded
246+
const TEST_EVENT3 = {
247+
data: {},
248+
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
249+
type: 3,
250+
};
251+
mockRecord._emitter(TEST_EVENT3);
252+
253+
// T = base + 65
254+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
255+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
256+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });
257+
258+
// nothing should happen afterwards
259+
// T = base + 85
260+
await advanceTimers(20_000);
261+
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
262+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });
263+
264+
// events array should be empty
265+
expect(replay.eventBuffer?.pendingLength).toBe(0);
266+
});
267+
268+
it("doesn't do anything, if a rate limit is hit and recording is already paused", async () => {
269+
let paused = false;
270+
expect(replay.session?.segmentId).toBe(0);
271+
jest.spyOn(replay, 'isPaused').mockImplementation(() => {
272+
return paused;
273+
});
274+
jest.spyOn(replay, 'pause');
275+
jest.spyOn(replay, 'resume');
276+
// @ts-ignore private API
277+
jest.spyOn(replay, '_handleRateLimit');
278+
279+
const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };
280+
281+
mockTransportSend.mockImplementationOnce(() => {
282+
return Promise.resolve({ statusCode: 429 });
283+
});
284+
285+
mockRecord._emitter(TEST_EVENT);
286+
paused = true;
287+
288+
// T = base + 5
289+
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
290+
291+
expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
292+
expect(mockTransportSend).toHaveBeenCalledTimes(1);
293+
294+
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) });
295+
296+
expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
297+
expect(replay.resume).not.toHaveBeenCalled();
298+
expect(replay.isPaused).toHaveBeenCalledTimes(2);
299+
expect(replay.pause).not.toHaveBeenCalled();
300+
});
301+
});

0 commit comments

Comments
 (0)