Skip to content

Commit 4f7e6d5

Browse files
committed
ref(replay): Extract worker code into class
1 parent 0a89328 commit 4f7e6d5

File tree

2 files changed

+120
-90
lines changed

2 files changed

+120
-90
lines changed
Lines changed: 14 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { ReplayRecordingData } from '@sentry/types';
2-
import { logger } from '@sentry/utils';
32

4-
import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest, WorkerResponse } from '../types';
3+
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
4+
import { WorkerHandler } from './WorkerHandler';
55

66
/**
77
* Event buffer that uses a web worker to compress events.
@@ -11,57 +11,26 @@ export class EventBufferCompressionWorker implements EventBuffer {
1111
/** @inheritdoc */
1212
public hasEvents: boolean;
1313

14-
private _worker: Worker;
15-
private _id: number;
16-
private _ensureReadyPromise?: Promise<void>;
14+
private _worker: WorkerHandler;
1715

1816
public constructor(worker: Worker) {
19-
this._worker = worker;
17+
this._worker = new WorkerHandler(worker);
2018
this.hasEvents = false;
21-
this._id = 0;
2219
}
2320

2421
/**
2522
* Ensure the worker is ready (or not).
2623
* This will either resolve when the worker is ready, or reject if an error occured.
2724
*/
2825
public ensureReady(): Promise<void> {
29-
// Ensure we only check once
30-
if (this._ensureReadyPromise) {
31-
return this._ensureReadyPromise;
32-
}
33-
34-
this._ensureReadyPromise = new Promise((resolve, reject) => {
35-
this._worker.addEventListener(
36-
'message',
37-
({ data }: MessageEvent) => {
38-
if ((data as WorkerResponse).success) {
39-
resolve();
40-
} else {
41-
reject();
42-
}
43-
},
44-
{ once: true },
45-
);
46-
47-
this._worker.addEventListener(
48-
'error',
49-
error => {
50-
reject(error);
51-
},
52-
{ once: true },
53-
);
54-
});
55-
56-
return this._ensureReadyPromise;
26+
return this._worker.ensureReady();
5727
}
5828

5929
/**
6030
* Destroy the event buffer.
6131
*/
6232
public destroy(): void {
63-
__DEBUG_BUILD__ && logger.log('[Replay] Destroying compression worker');
64-
this._worker.terminate();
33+
this._worker.destroy();
6534
}
6635

6736
/**
@@ -75,10 +44,7 @@ export class EventBufferCompressionWorker implements EventBuffer {
7544
if (isCheckout) {
7645
// This event is a checkout, make sure worker buffer is cleared before
7746
// proceeding.
78-
await this._postMessage({
79-
id: this._getAndIncrementId(),
80-
method: 'clear',
81-
});
47+
await this._clear();
8248
}
8349

8450
return this._sendEventToWorker(event);
@@ -88,71 +54,29 @@ export class EventBufferCompressionWorker implements EventBuffer {
8854
* Finish the event buffer and return the compressed data.
8955
*/
9056
public finish(): Promise<ReplayRecordingData> {
91-
return this._finishRequest(this._getAndIncrementId());
92-
}
93-
94-
/**
95-
* Post message to worker and wait for response before resolving promise.
96-
*/
97-
private _postMessage<T>({ id, method, arg }: WorkerRequest): Promise<T> {
98-
return new Promise((resolve, reject) => {
99-
const listener = ({ data }: MessageEvent): void => {
100-
const response = data as WorkerResponse;
101-
if (response.method !== method) {
102-
return;
103-
}
104-
105-
// There can be multiple listeners for a single method, the id ensures
106-
// that the response matches the caller.
107-
if (response.id !== id) {
108-
return;
109-
}
110-
111-
// At this point, we'll always want to remove listener regardless of result status
112-
this._worker.removeEventListener('message', listener);
113-
114-
if (!response.success) {
115-
// TODO: Do some error handling, not sure what
116-
__DEBUG_BUILD__ && logger.error('[Replay]', response.response);
117-
118-
reject(new Error('Error in compression worker'));
119-
return;
120-
}
121-
122-
resolve(response.response as T);
123-
};
124-
125-
// Note: we can't use `once` option because it's possible it needs to
126-
// listen to multiple messages
127-
this._worker.addEventListener('message', listener);
128-
this._worker.postMessage({ id, method, arg });
129-
});
57+
return this._finishRequest();
13058
}
13159

13260
/**
13361
* Send the event to the worker.
13462
*/
13563
private _sendEventToWorker(event: RecordingEvent): Promise<AddEventResult> {
136-
return this._postMessage<void>({
137-
id: this._getAndIncrementId(),
138-
method: 'addEvent',
139-
arg: JSON.stringify(event),
140-
});
64+
return this._worker.postMessage<void>('addEvent', JSON.stringify(event));
14165
}
14266

14367
/**
14468
* Finish the request and return the compressed data from the worker.
14569
*/
146-
private async _finishRequest(id: number): Promise<Uint8Array> {
147-
const response = await this._postMessage<Uint8Array>({ id, method: 'finish' });
70+
private async _finishRequest(): Promise<Uint8Array> {
71+
const response = await this._worker.postMessage<Uint8Array>('finish');
14872

14973
this.hasEvents = false;
15074

15175
return response;
15276
}
15377

154-
/** Get the current ID and increment it for the next call. */
155-
private _getAndIncrementId(): number {
156-
return this._id++;
78+
/** Clear any pending events from the worker. */
79+
private _clear(): Promise<void> {
80+
return this._worker.postMessage('clear');
15781
}
15882
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { logger } from '@sentry/utils';
2+
3+
import type { WorkerRequest, WorkerResponse } from '../types';
4+
5+
/**
6+
* Event buffer that uses a web worker to compress events.
7+
* Exported only for testing.
8+
*/
9+
export class WorkerHandler {
10+
private _worker: Worker;
11+
private _id: number;
12+
private _ensureReadyPromise?: Promise<void>;
13+
14+
public constructor(worker: Worker) {
15+
this._worker = worker;
16+
this._id = 0;
17+
}
18+
19+
/**
20+
* Ensure the worker is ready (or not).
21+
* This will either resolve when the worker is ready, or reject if an error occured.
22+
*/
23+
public ensureReady(): Promise<void> {
24+
// Ensure we only check once
25+
if (this._ensureReadyPromise) {
26+
return this._ensureReadyPromise;
27+
}
28+
29+
this._ensureReadyPromise = new Promise((resolve, reject) => {
30+
this._worker.addEventListener(
31+
'message',
32+
({ data }: MessageEvent) => {
33+
if ((data as WorkerResponse).success) {
34+
resolve();
35+
} else {
36+
reject();
37+
}
38+
},
39+
{ once: true },
40+
);
41+
42+
this._worker.addEventListener(
43+
'error',
44+
error => {
45+
reject(error);
46+
},
47+
{ once: true },
48+
);
49+
});
50+
51+
return this._ensureReadyPromise;
52+
}
53+
54+
/**
55+
* Destroy the worker.
56+
*/
57+
public destroy(): void {
58+
__DEBUG_BUILD__ && logger.log('[Replay] Destroying compression worker');
59+
this._worker.terminate();
60+
}
61+
62+
/**
63+
* Post message to worker and wait for response before resolving promise.
64+
*/
65+
public postMessage<T>(method: WorkerRequest['method'], arg?: WorkerRequest['arg']): Promise<T> {
66+
const id = this._getAndIncrementId();
67+
68+
return new Promise((resolve, reject) => {
69+
const listener = ({ data }: MessageEvent): void => {
70+
const response = data as WorkerResponse;
71+
if (response.method !== method) {
72+
return;
73+
}
74+
75+
// There can be multiple listeners for a single method, the id ensures
76+
// that the response matches the caller.
77+
if (response.id !== id) {
78+
return;
79+
}
80+
81+
// At this point, we'll always want to remove listener regardless of result status
82+
this._worker.removeEventListener('message', listener);
83+
84+
if (!response.success) {
85+
// TODO: Do some error handling, not sure what
86+
__DEBUG_BUILD__ && logger.error('[Replay]', response.response);
87+
88+
reject(new Error('Error in compression worker'));
89+
return;
90+
}
91+
92+
resolve(response.response as T);
93+
};
94+
95+
// Note: we can't use `once` option because it's possible it needs to
96+
// listen to multiple messages
97+
this._worker.addEventListener('message', listener);
98+
this._worker.postMessage({ id, method, arg });
99+
});
100+
}
101+
102+
/** Get the current ID and increment it for the next call. */
103+
private _getAndIncrementId(): number {
104+
return this._id++;
105+
}
106+
}

0 commit comments

Comments
 (0)