Skip to content

feat: add Flush Policies #703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ The hassle-free way to add Segment analytics to your React-Native app.
- [Adding Plugins](#adding-plugins)
- [Writing your own Plugins](#writing-your-own-plugins)
- [Supported Plugins](#supported-plugins)
- [Controlling Upload With Flush Policies](#controlling-upload-with-flush-policies)
- [Adding or removing policies](#adding-or-removing-policies)
- [Creating your own flush policies](#creating-your-own-flush-policies)
- [Contributing](#contributing)
- [Code of Conduct](#code-of-conduct)
- [License](#license)
Expand Down Expand Up @@ -526,6 +529,98 @@ Refer to the following table for Plugins you can use to meet your tracking needs
| [Android Advertising ID](https://github.com/segmentio/analytics-react-native/tree/master/packages/plugins/plugin-advertising-id) | `@segment/analytics-react-native-plugin-advertising-id` |


## Controlling Upload With Flush Policies

To more granurily control when events are uploaded you can use `FlushPolicies`

A Flush Policy defines the strategy for deciding when to flush, this can be on an interval, on a certain time of day, after receiving a certain number of events or even after receiving a particular event. This gives you even more flexibility on when to send event to Segment.

To make use of flush policies you can set them in the configuration of the client:

```ts
const client = createClient({
// ...
flushPolicies: [
new CountFlushPolicy(5),
new TimerFlushPolicy(500),
new StartupFlushPolicy(),
],
});
```

You can set several policies at a time. Whenever any of them decides it is time for a flush it will trigger an upload of the events. The rest get reset so that their logic restarts after every flush.

That means only the first policy to reach `shouldFlush` gets to trigger a flush at a time. In the example above either the event count gets to 5 or the timer reaches 500ms, whatever comes first will trigger a flush.

We have several standard FlushPolicies:
- `CountFlushPolicy` triggers whenever a certain number of events is reached
- `TimerFlushPolicy` triggers on an interval of milliseconds
- `StartupFlushPolicy` triggers on client startup only

## Adding or removing policies

One of the main advatanges of FlushPolicies is that you can add and remove policies on the fly. This is very powerful when you want to reduce or increase the amount of flushes.

For example you might want to disable flushes if you detect the user has no network:

```ts

import NetInfo from "@react-native-community/netinfo";

const policiesIfNetworkIsUp = [
new CountFlushPolicy(5),
new TimerFlushPolicy(500),
];

// Create our client with our policies by default
const client = createClient({
// ...
flushPolicies: policies,
});

// If we detect the user disconnects from the network remove all flush policies,
// that way we won't keep attempting to send events to segment but we will still
// store them for future upload.
// If the network comes back up we add the policies back
const unsubscribe = NetInfo.addEventListener((state) => {
if (state.isConnected) {
client.addFlushPolicy(...policiesIfNetworkIsUp);
} else {
client.removeFlushPolicy(...policiesIfNetworkIsUp)
}
});

```

### Creating your own flush policies

You can create a custom FlushPolicy special for your application needs by implementing the `FlushPolicy` interface. You can also extend the `FlushPolicyBase` class that already creates and handles the `shouldFlush` value reset.

A `FlushPolicy` only needs to implement 2 methods:
- `start()`: Executed when the flush policy is enabled and added to the client. This is a good place to start background operations, make async calls, configure things before execution
- `onEvent(event: SegmentEvent)`: Gets called on every event tracked by your client
- `reset()`: Called after a flush is triggered (either by your policy, by another policy or manually)

They also have a `shouldFlush` observable boolean value. When this is set to true the client will atempt to upload events. Each policy should reset this value to `false` according to its own logic, although it is pretty common to do it inside the `reset` method.

```ts
export class FlushOnScreenEventsPolicy extends FlushPolicyBase {

onEvent(event: SegmentEvent): void {
// Only flush when a screen even happens
if (event.type === EventType.ScreenEvent) {
this.shouldFlush.value = true;
}
}

reset(): void {
// Superclass will reset the shouldFlush value so that the next screen event triggers a flush again
// But you can also reset the value whenever, say another event comes in or after a timeout
super.reset();
}
}
```


## Contributing

Expand Down
1 change: 0 additions & 1 deletion packages/core/src/__tests__/analytics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ describe('SegmentClient', () => {
segmentClient.cleanup();
// @ts-ignore
expect(segmentClient.destroyed).toBe(true);
expect(clearInterval).toHaveBeenCalledTimes(1);
// @ts-ignore
expect(segmentClient.appStateSubscription.remove).toHaveBeenCalledTimes(
1
Expand Down
5 changes: 0 additions & 5 deletions packages/core/src/__tests__/methods/flush.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import { getMockTimeline } from '../__helpers__/mockTimeline';
import type { DestinationPlugin } from '../../plugin';
import { MockSegmentStore } from '../__helpers__/mockSegmentStore';

jest.spyOn(global, 'setTimeout');

jest.mock('react-native');
jest.mock('../../uuid');

Expand Down Expand Up @@ -66,9 +64,6 @@ describe('methods #flush', () => {

await client.flush();

// We expect the flush interval to be reset by default
expect(setTimeout).toBeCalled();

expect(mockDestinationPlugin.flush).toHaveBeenCalledTimes(1);
});

Expand Down
135 changes: 79 additions & 56 deletions packages/core/src/analytics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//@ts-ignore
import { Unsubscribe } from '@segment/sovran-react-native';
import type { Rule } from '@segment/tsub/dist/store';
import deepmerge from 'deepmerge';
import allSettled from 'promise.allsettled';
import { AppState, AppStateStatus } from 'react-native';
Expand All @@ -13,6 +13,8 @@ import {
createScreenEvent,
createTrackEvent,
} from './events';
import { CountFlushPolicy, TimerFlushPolicy } from './flushPolicies';
import { FlushPolicyExecuter } from './flushPolicies/flush-policy-executer';
import type { DestinationPlugin, PlatformPlugin, Plugin } from './plugin';
import { InjectContext } from './plugins/InjectContext';
import { InjectUserInfo } from './plugins/InjectUserInfo';
Expand All @@ -25,6 +27,7 @@ import {
Watchable,
} from './storage';
import { Timeline } from './timeline';
import type { DestinationFilters, SegmentAPISettings } from './types';
import {
Config,
Context,
Expand All @@ -42,8 +45,7 @@ import {
} from './types';
import { getPluginsWithFlush, getPluginsWithReset } from './util';
import { getUUID } from './uuid';
import type { SegmentAPISettings, DestinationFilters } from './types';
import type { Rule } from '@segment/tsub/dist/store';
import type { FlushPolicy } from './flushPolicies';

type OnContextLoadCallback = (type: UpdateType) => void | Promise<void>;
type OnPluginAddedCallback = (plugin: Plugin) => void;
Expand All @@ -64,12 +66,6 @@ export class SegmentClient {
// logger
public logger: LoggerType;

// internal time to know when to flush, ticks every second
private flushInterval: ReturnType<typeof setTimeout> | null = null;

// unsubscribe watchers for the store
private watchers: Unsubscribe[] = [];

// whether the user has called cleanup
private destroyed: boolean = false;

Expand All @@ -81,8 +77,10 @@ export class SegmentClient {

private pluginsToAdd: Plugin[] = [];

private isInitialized = false;
private flushPolicyExecuter!: FlushPolicyExecuter;

private isInitialized = false;
// TODO: Refactor into Observable<T>
private isContextLoaded = false;

private onContextLoadedObservers: OnContextLoadCallback[] = [];
Expand Down Expand Up @@ -208,7 +206,9 @@ export class SegmentClient {

// Watch for isReady so that we can handle any pending events
// Delays events processing in the timeline until the store is ready to prevent missing data injected from the plugins
this.store.isReady.onChange((value) => this.onStorageReady(value));
this.store.isReady.onChange((value) => {
this.onStorageReady(value);
});

// add segment destination plugin unless
// asked not to via configuration.
Expand All @@ -219,6 +219,9 @@ export class SegmentClient {

// Setup platform specific plugins
this.platformPlugins.forEach((plugin) => this.add({ plugin: plugin }));

// Start flush policies
this.setupFlushPolicies();
}

/**
Expand All @@ -234,11 +237,7 @@ export class SegmentClient {
await this.fetchSettings();

// flush any stored events
this.flush(false);

// set up the timer/subscription for knowing when to flush events
this.setupInterval();
this.setupStorageSubscribers();
this.flushPolicyExecuter.manualFlush();

// set up tracking for lifecycle events
this.setupLifecycleEvents();
Expand Down Expand Up @@ -292,21 +291,6 @@ export class SegmentClient {
}
}

/**
* Clears all subscriptions to the store
*/
private unsubscribeStorageWatchers() {
if (this.watchers.length > 0) {
for (const unsubscribe of this.watchers) {
try {
unsubscribe();
} catch (e) {
this.logger.error(e);
}
}
}
}

/**
* There is no garbage collection in JS, which means that any listeners, timeouts and subscriptions
* would run until the application closes
Expand All @@ -318,32 +302,13 @@ export class SegmentClient {
* it gets approved: https://github.com/tc39/proposal-weakrefs#finalizers
*/
cleanup() {
if (this.flushInterval) {
clearInterval(this.flushInterval);
}

this.unsubscribeStorageWatchers();

this.flushPolicyExecuter.cleanup();
this.appStateSubscription?.remove();

this.destroyed = true;
this.isInitialized = false;
}

private setupInterval() {
if (this.flushInterval !== null && this.flushInterval !== undefined) {
clearInterval(this.flushInterval);
}

this.flushInterval = setTimeout(() => {
this.flush();
}, this.config.flushInterval! * 1000);
}

private setupStorageSubscribers() {
this.unsubscribeStorageWatchers();
}

private setupLifecycleEvents() {
this.appStateSubscription?.remove();

Expand Down Expand Up @@ -412,6 +377,7 @@ export class SegmentClient {
async process(incomingEvent: SegmentEvent) {
const event = applyRawEventData(incomingEvent);
if (this.store.isReady.get() === true) {
this.flushPolicyExecuter.notify(event);
return this.timeline.process(event);
} else {
this.pendingEvents.push(event);
Expand Down Expand Up @@ -475,15 +441,12 @@ export class SegmentClient {
}
}

async flush(debounceInterval: boolean = true): Promise<void> {
async flush(): Promise<void> {
if (this.destroyed) {
return;
}

if (debounceInterval) {
// Reset interval
this.setupInterval();
}
this.flushPolicyExecuter.reset();

const promises: (void | Promise<void>)[] = [];
getPluginsWithFlush(this.timeline).forEach((plugin) => {
Expand Down Expand Up @@ -704,4 +667,64 @@ export class SegmentClient {
private triggerOnPluginLoaded(plugin: Plugin) {
this.onPluginAddedObservers.map((f) => f?.(plugin));
}

/**
* Initializes the flush policies from config and subscribes to updates to
* trigger flush
*/
private setupFlushPolicies() {
let flushPolicies = this.config.flushPolicies ?? [];

// Compatibility with older arguments
if (
this.config.flushAt !== undefined &&
this.config.flushAt !== null &&
this.config.flushAt > 0
) {
flushPolicies.push(new CountFlushPolicy(this.config.flushAt));
}

if (
this.config.flushInterval !== undefined &&
this.config.flushInterval !== null &&
this.config.flushInterval > 0
) {
flushPolicies.push(
new TimerFlushPolicy(this.config.flushInterval * 1000)
);
}

this.flushPolicyExecuter = new FlushPolicyExecuter(flushPolicies, () => {
this.flush();
});
}

/**
* Adds a FlushPolicy to the list
* @param policies policies to add
*/
addFlushPolicy(...policies: FlushPolicy[]) {
for (const policy of policies) {
this.flushPolicyExecuter.add(policy);
}
}

/**
* Removes a FlushPolicy from the execution
*
* @param policies policies to remove
* @returns true if the value was removed, false if not found
*/
removeFlushPolicy(...policies: FlushPolicy[]) {
for (const policy of policies) {
this.flushPolicyExecuter.remove(policy);
}
}

/**
* Returns the current enabled flush policies
*/
getFlushPolicies() {
return this.flushPolicyExecuter.policies;
}
}
Loading