-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(core): Add Offline Transport wrapper #6884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3ba2356
ef57767
26e5b0c
9a252ca
b93b02b
5f1af25
f86f4e6
c0938e4
8070337
fd9bb07
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
import type { Envelope, InternalBaseTransportOptions, Transport, TransportMakeRequestResponse } from '@sentry/types'; | ||
import { forEachEnvelopeItem, logger, parseRetryAfterHeader } from '@sentry/utils'; | ||
|
||
export const MIN_DELAY = 100; // 100 ms | ||
export const START_DELAY = 5_000; // 5 seconds | ||
const MAX_DELAY = 3.6e6; // 1 hour | ||
|
||
function isReplayEnvelope(envelope: Envelope): boolean { | ||
let isReplay = false; | ||
|
||
forEachEnvelopeItem(envelope, (_, type) => { | ||
if (type === 'replay_event') { | ||
isReplay = true; | ||
} | ||
}); | ||
|
||
return isReplay; | ||
} | ||
|
||
function log(msg: string, error?: Error): void { | ||
__DEBUG_BUILD__ && logger.info(`[Offline]: ${msg}`, error); | ||
} | ||
|
||
export interface OfflineStore { | ||
insert(env: Envelope): Promise<void>; | ||
pop(): Promise<Envelope | undefined>; | ||
} | ||
|
||
export type CreateOfflineStore = (options: OfflineTransportOptions) => OfflineStore; | ||
|
||
export interface OfflineTransportOptions extends InternalBaseTransportOptions { | ||
/** | ||
* A function that creates the offline store instance. | ||
*/ | ||
createStore?: CreateOfflineStore; | ||
|
||
/** | ||
* Flush the offline store shortly after startup. | ||
* | ||
* Defaults: false | ||
*/ | ||
flushAtStartup?: boolean; | ||
|
||
/** | ||
* Called before an event is stored. | ||
* | ||
* Return false to drop the envelope rather than store it. | ||
* | ||
* @param envelope The envelope that failed to send. | ||
* @param error The error that occurred. | ||
* @param retryDelay The current retry delay in milliseconds. | ||
*/ | ||
shouldStore?: (envelope: Envelope, error: Error, retryDelay: number) => boolean | Promise<boolean>; | ||
} | ||
|
||
type Timer = number | { unref?: () => void }; | ||
|
||
/** | ||
* Wraps a transport and stores and retries events when they fail to send. | ||
* | ||
* @param createTransport The transport to wrap. | ||
*/ | ||
export function makeOfflineTransport<TO>( | ||
createTransport: (options: TO) => Transport, | ||
): (options: TO & OfflineTransportOptions) => Transport { | ||
return options => { | ||
const transport = createTransport(options); | ||
const store = options.createStore ? options.createStore(options) : undefined; | ||
|
||
let retryDelay = START_DELAY; | ||
let flushTimer: Timer | undefined; | ||
|
||
function shouldQueue(env: Envelope, error: Error, retryDelay: number): boolean | Promise<boolean> { | ||
// We don't queue Session Replay envelopes because they are: | ||
// - Ordered and Replay relies on the response status to know when they're successfully sent. | ||
// - Likely to fill the queue quickly and block other events from being sent. | ||
if (isReplayEnvelope(env)) { | ||
return false; | ||
timfish marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
if (options.shouldStore) { | ||
return options.shouldStore(env, error, retryDelay); | ||
} | ||
|
||
return true; | ||
} | ||
|
||
function flushIn(delay: number): void { | ||
if (!store) { | ||
return; | ||
} | ||
|
||
if (flushTimer) { | ||
clearTimeout(flushTimer as ReturnType<typeof setTimeout>); | ||
} | ||
|
||
flushTimer = setTimeout(async () => { | ||
flushTimer = undefined; | ||
|
||
const found = await store.pop(); | ||
if (found) { | ||
log('Attempting to send previously queued event'); | ||
void send(found).catch(e => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to clarify: So if we try to re-send an envelope, we pop it from the store. but if it fails, we do not re-add it? So we would discard this envelope? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No it will be re-added to the queued and we will continue to retry the envelope later. This means when offline, an envelope will be popped and re-inserted multiple times. The initial retry delay is only 5 seconds so if we don't retry, this is more of a "retry once" wrapper than offline. I considered having a |
||
log('Failed to retry sending', e); | ||
}); | ||
} | ||
}, delay) as Timer; | ||
|
||
// We need to unref the timer in node.js, otherwise the node process never exit. | ||
if (typeof flushTimer !== 'number' && flushTimer.unref) { | ||
flushTimer.unref(); | ||
} | ||
} | ||
|
||
function flushWithBackOff(): void { | ||
if (flushTimer) { | ||
return; | ||
} | ||
|
||
flushIn(retryDelay); | ||
|
||
retryDelay = Math.min(retryDelay * 2, MAX_DELAY); | ||
} | ||
|
||
async function send(envelope: Envelope): Promise<void | TransportMakeRequestResponse> { | ||
try { | ||
const result = await transport.send(envelope); | ||
|
||
let delay = MIN_DELAY; | ||
|
||
if (result) { | ||
// If there's a retry-after header, use that as the next delay. | ||
if (result.headers && result.headers['retry-after']) { | ||
delay = parseRetryAfterHeader(result.headers['retry-after']); | ||
} // If we have a server error, return now so we don't flush the queue. | ||
else if ((result.statusCode || 0) >= 400) { | ||
return result; | ||
} | ||
} | ||
|
||
flushIn(delay); | ||
retryDelay = START_DELAY; | ||
return result; | ||
} catch (e) { | ||
if (store && (await shouldQueue(envelope, e, retryDelay))) { | ||
await store.insert(envelope); | ||
flushWithBackOff(); | ||
log('Error sending. Event queued', e); | ||
return {}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. l: Do we need to return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, right, makes sense! just an idea, maybe we could return some content in that object (?) that makes it clear this is a failed resend, not sure. Prob. not a big deal but could help in the future with debugging or so ("Why am I getting an empty object back here???")? |
||
} else { | ||
throw e; | ||
} | ||
} | ||
} | ||
|
||
if (options.flushAtStartup) { | ||
flushWithBackOff(); | ||
} | ||
|
||
return { | ||
send, | ||
flush: t => transport.flush(t), | ||
}; | ||
}; | ||
} |
Uh oh!
There was an error while loading. Please reload this page.