@@ -54,9 +54,6 @@ export class SegmentClient {
54
54
// internal time to know when to flush, ticks every second
55
55
private flushInterval : ReturnType < typeof setInterval > | null = null ;
56
56
57
- // Watcher for isReady updates to the storage
58
- private readinessWatcher ?: Unsubscribe = undefined ;
59
-
60
57
// unsubscribe watchers for the store
61
58
private watchers : Unsubscribe [ ] = [ ] ;
62
59
@@ -70,8 +67,8 @@ export class SegmentClient {
70
67
71
68
private timeline : Timeline ;
72
69
73
- // mechanism to prevent adding plugins before we are fully initalised
74
- private isStorageReady = false ;
70
+ private pendingEvents : SegmentEvent [ ] = [ ] ;
71
+
75
72
private pluginsToAdd : Plugin [ ] = [ ] ;
76
73
77
74
private isInitialized = false ;
@@ -167,9 +164,6 @@ export class SegmentClient {
167
164
this . add ( { plugin : segmentDestination } ) ;
168
165
}
169
166
170
- // Setup platform specific plugins
171
- this . platformPlugins . forEach ( ( plugin ) => this . add ( { plugin : plugin } ) ) ;
172
-
173
167
// Initialize the watchables
174
168
this . context = {
175
169
get : this . store . context . get ,
@@ -199,6 +193,13 @@ export class SegmentClient {
199
193
get : this . store . events . get ,
200
194
onChange : this . store . events . onChange ,
201
195
} ;
196
+
197
+ // Watch for isReady so that we can handle any pending events
198
+ // Delays events processing in the timeline until the store is ready to prevent missing data injected from the plugins
199
+ this . store . isReady . onChange ( ( value ) => this . onStorageReady ( value ) ) ;
200
+
201
+ // Setup platform specific plugins
202
+ this . platformPlugins . forEach ( ( plugin ) => this . add ( { plugin : plugin } ) ) ;
202
203
}
203
204
204
205
/**
@@ -211,12 +212,6 @@ export class SegmentClient {
211
212
return ;
212
213
}
213
214
214
- // Plugin interval check
215
- if ( this . store . isReady . get ( ) ) {
216
- this . onStorageReady ( true ) ;
217
- } else {
218
- this . store . isReady . onChange ( ( value ) => this . onStorageReady ( value ) ) ;
219
- }
220
215
await this . fetchSettings ( ) ;
221
216
222
217
// flush any stored events
@@ -290,7 +285,6 @@ export class SegmentClient {
290
285
clearInterval ( this . flushInterval ) ;
291
286
}
292
287
293
- this . unsubscribeReadinessWatcher ( ) ;
294
288
this . unsubscribeStorageWatchers ( ) ;
295
289
296
290
this . appStateSubscription ?. remove ( ) ;
@@ -358,7 +352,7 @@ export class SegmentClient {
358
352
this . store . settings . add ( ( plugin as DestinationPlugin ) . key , settings ) ;
359
353
}
360
354
361
- if ( ! this . isStorageReady ) {
355
+ if ( ! this . store . isReady . get ( ) ) {
362
356
this . pluginsToAdd . push ( plugin ) ;
363
357
} else {
364
358
this . addPlugin ( plugin ) ;
@@ -381,7 +375,11 @@ export class SegmentClient {
381
375
382
376
process ( incomingEvent : SegmentEvent ) {
383
377
const event = applyRawEventData ( incomingEvent , this . store . userInfo . get ( ) ) ;
384
- this . timeline . process ( event ) ;
378
+ if ( this . store . isReady . get ( ) === true ) {
379
+ this . timeline . process ( event ) ;
380
+ } else {
381
+ this . pendingEvents . push ( event ) ;
382
+ }
385
383
}
386
384
387
385
private async trackDeepLinks ( ) {
@@ -399,29 +397,34 @@ export class SegmentClient {
399
397
}
400
398
}
401
399
402
- private unsubscribeReadinessWatcher ( ) {
403
- this . readinessWatcher ?. ( ) ;
404
- }
405
-
400
+ /**
401
+ * Executes when the state store is initialized.
402
+ * @param isReady
403
+ */
406
404
private onStorageReady ( isReady : boolean ) {
407
- if ( isReady && this . pluginsToAdd . length > 0 && ! this . isAddingPlugins ) {
408
- this . isAddingPlugins = true ;
409
- try {
410
- // start by adding the plugins
411
- this . pluginsToAdd . forEach ( ( plugin ) => {
412
- this . addPlugin ( plugin ) ;
413
- } ) ;
414
-
415
- // now that they're all added, clear the cache
416
- // this prevents this block running for every update
417
- this . pluginsToAdd = [ ] ;
405
+ if ( isReady ) {
406
+ // Add all plugins awaiting store
407
+ if ( this . pluginsToAdd . length > 0 && ! this . isAddingPlugins ) {
408
+ this . isAddingPlugins = true ;
409
+ try {
410
+ // start by adding the plugins
411
+ this . pluginsToAdd . forEach ( ( plugin ) => {
412
+ this . addPlugin ( plugin ) ;
413
+ } ) ;
414
+
415
+ // now that they're all added, clear the cache
416
+ // this prevents this block running for every update
417
+ this . pluginsToAdd = [ ] ;
418
+ } finally {
419
+ this . isAddingPlugins = false ;
420
+ }
421
+ }
418
422
419
- // finally set the flag which means plugins will be added + registered immediately in future
420
- this . isStorageReady = true ;
421
- this . unsubscribeReadinessWatcher ( ) ;
422
- } finally {
423
- this . isAddingPlugins = false ;
423
+ // Send all events in the queue
424
+ for ( const e of this . pendingEvents ) {
425
+ this . timeline . process ( e ) ;
424
426
}
427
+ this . pendingEvents = [ ] ;
425
428
}
426
429
}
427
430
0 commit comments