Skip to content

Tree-Shake EventManager #3640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
aa4fc7f
Make View processing logic optional
schmidt-sebastian Aug 5, 2020
8350a84
Merge branch 'master' into mrschmidt/optionalviews
schmidt-sebastian Aug 5, 2020
dd872af
Merge branch 'master' into mrschmidt/optionalviews
schmidt-sebastian Aug 11, 2020
e2be013
Review
schmidt-sebastian Aug 11, 2020
eafa8e9
Make all of SyncEngine tree-shakeable
schmidt-sebastian Aug 14, 2020
03b1ce4
Re-run gendeps
schmidt-sebastian Aug 14, 2020
68f565b
Merge branch 'master' into mrschmidt/optionalviews
schmidt-sebastian Aug 14, 2020
4eccec3
Prettier
schmidt-sebastian Aug 14, 2020
f3efd44
Cleanup
schmidt-sebastian Aug 14, 2020
88a5bc0
Comment
schmidt-sebastian Aug 14, 2020
9f97606
Tree-shake all of LocalStore
schmidt-sebastian Aug 14, 2020
4b5105c
Comments
schmidt-sebastian Aug 14, 2020
9b018db
Merge
schmidt-sebastian Aug 14, 2020
c7d62f4
Merge
schmidt-sebastian Aug 15, 2020
08e425b
Undo EventManager's signature changes, use callbacks
schmidt-sebastian Aug 17, 2020
320d79a
Documentation
schmidt-sebastian Aug 17, 2020
35c8350
Tree-Shake EventManager
schmidt-sebastian Aug 17, 2020
a869f8f
Fix test
schmidt-sebastian Aug 17, 2020
77b4aec
Merge
schmidt-sebastian Aug 17, 2020
d7b5d2a
Lint
schmidt-sebastian Aug 17, 2020
e3239c5
Merge branch 'mrschmidt/optionalviews' into mrschmidt/eventmanager
schmidt-sebastian Aug 17, 2020
3dc51cd
Update event_manager.test.ts
schmidt-sebastian Aug 17, 2020
74ab415
Review
schmidt-sebastian Aug 21, 2020
452e460
Update tests
schmidt-sebastian Aug 21, 2020
1b2d3eb
Review/Fix build
schmidt-sebastian Aug 26, 2020
49bde61
More test fixes
schmidt-sebastian Aug 26, 2020
2674016
Undo async
schmidt-sebastian Aug 27, 2020
7f52b3c
Merge
schmidt-sebastian Aug 27, 2020
84bb297
Merge
schmidt-sebastian Aug 27, 2020
fe721ad
Merge
schmidt-sebastian Aug 27, 2020
44223f9
Merge branch 'mrschmidt/eventmanager' of github.com:firebase/firebase…
schmidt-sebastian Aug 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions packages/firestore/src/core/component_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ import {
SyncEngine
} from './sync_engine';
import { RemoteStore } from '../remote/remote_store';
import { EventManager } from './event_manager';
import {
EventManager,
newEventManager,
eventManagerOnOnlineStateChange,
eventManagerOnWatchChange,
eventManagerOnWatchError
} from './event_manager';
import { AsyncQueue } from '../util/async_queue';
import { DatabaseId, DatabaseInfo } from './database_info';
import { Datastore, newDatastore } from '../remote/datastore';
Expand Down Expand Up @@ -334,7 +340,14 @@ export class OnlineComponentProvider {
this.syncEngine = this.createSyncEngine(cfg);
this.eventManager = this.createEventManager(cfg);

this.syncEngine.subscribe(this.eventManager);
this.syncEngine.subscribe({
onWatchChange: eventManagerOnWatchChange.bind(null, this.eventManager),
onWatchError: eventManagerOnWatchError.bind(null, this.eventManager),
onOnlineStateChange: eventManagerOnOnlineStateChange.bind(
null,
this.eventManager
)
});

this.sharedClientState.onlineStateHandler = onlineState =>
applyOnlineStateChange(
Expand All @@ -353,7 +366,7 @@ export class OnlineComponentProvider {
}

createEventManager(cfg: ComponentConfiguration): EventManager {
return new EventManager();
return newEventManager();
}

createDatastore(cfg: ComponentConfiguration): Datastore {
Expand Down
257 changes: 151 additions & 106 deletions packages/firestore/src/core/event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
* limitations under the License.
*/

import { debugAssert } from '../util/assert';
import { debugAssert, debugCast } from '../util/assert';
import { EventHandler } from '../util/misc';
import { ObjectMap } from '../util/obj_map';
import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query';
import { SyncEngineListener } from './sync_engine';
import { OnlineState } from './types';
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
import { wrapInUserErrorIfRecoverable } from '../util/async_queue';
Expand Down Expand Up @@ -50,150 +49,196 @@ export interface Observer<T> {
* assigned to SyncEngine's `listen()` and `unlisten()` API before usage. This
* allows users to tree-shake the Watch logic.
*/
export class EventManager implements SyncEngineListener {
private queries = new ObjectMap<Query, QueryListenersInfo>(
export interface EventManager {
onListen?: (query: Query) => Promise<ViewSnapshot>;
onUnlisten?: (query: Query) => Promise<void>;
}

export function newEventManager(): EventManager {
return new EventManagerImpl();
}

export class EventManagerImpl implements EventManager {
queries = new ObjectMap<Query, QueryListenersInfo>(
q => canonifyQuery(q),
queryEquals
);

private onlineState = OnlineState.Unknown;
onlineState = OnlineState.Unknown;

private snapshotsInSyncListeners: Set<Observer<void>> = new Set();
snapshotsInSyncListeners: Set<Observer<void>> = new Set();

/** Callback invoked when a Query is first listen to. */
onListen?: (query: Query) => Promise<ViewSnapshot>;
/** Callback invoked once all listeners to a Query are removed. */
onUnlisten?: (query: Query) => Promise<void>;
}

async listen(listener: QueryListener): Promise<void> {
debugAssert(!!this.onListen, 'onListen not set');
const query = listener.query;
let firstListen = false;

let queryInfo = this.queries.get(query);
if (!queryInfo) {
firstListen = true;
queryInfo = new QueryListenersInfo();
}
export async function eventManagerListen(
eventManager: EventManager,
listener: QueryListener
): Promise<void> {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);

if (firstListen) {
try {
queryInfo.viewSnap = await this.onListen(query);
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e,
`Initialization of query '${stringifyQuery(listener.query)}' failed`
);
listener.onError(firestoreError);
return;
}
}
debugAssert(!!eventManagerImpl.onListen, 'onListen not set');
const query = listener.query;
let firstListen = false;

this.queries.set(query, queryInfo);
queryInfo.listeners.push(listener);

// Run global snapshot listeners if a consistent snapshot has been emitted.
const raisedEvent = listener.applyOnlineStateChange(this.onlineState);
debugAssert(
!raisedEvent,
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
);
let queryInfo = eventManagerImpl.queries.get(query);
if (!queryInfo) {
firstListen = true;
queryInfo = new QueryListenersInfo();
}

if (queryInfo.viewSnap) {
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
if (raisedEvent) {
this.raiseSnapshotsInSyncEvent();
}
if (firstListen) {
try {
queryInfo.viewSnap = await eventManagerImpl.onListen(query);
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e,
`Initialization of query '${stringifyQuery(listener.query)}' failed`
);
listener.onError(firestoreError);
return;
}
}

async unlisten(listener: QueryListener): Promise<void> {
debugAssert(!!this.onUnlisten, 'onUnlisten not set');
const query = listener.query;
let lastListen = false;

const queryInfo = this.queries.get(query);
if (queryInfo) {
const i = queryInfo.listeners.indexOf(listener);
if (i >= 0) {
queryInfo.listeners.splice(i, 1);
lastListen = queryInfo.listeners.length === 0;
}
}
eventManagerImpl.queries.set(query, queryInfo);
queryInfo.listeners.push(listener);

if (lastListen) {
this.queries.delete(query);
return this.onUnlisten(query);
}
}
// Run global snapshot listeners if a consistent snapshot has been emitted.
const raisedEvent = listener.applyOnlineStateChange(
eventManagerImpl.onlineState
);
debugAssert(
!raisedEvent,
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
);

onWatchChange(viewSnaps: ViewSnapshot[]): void {
let raisedEvent = false;
for (const viewSnap of viewSnaps) {
const query = viewSnap.query;
const queryInfo = this.queries.get(query);
if (queryInfo) {
for (const listener of queryInfo.listeners) {
if (listener.onViewSnapshot(viewSnap)) {
raisedEvent = true;
}
}
queryInfo.viewSnap = viewSnap;
}
}
if (queryInfo.viewSnap) {
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
if (raisedEvent) {
this.raiseSnapshotsInSyncEvent();
raiseSnapshotsInSyncEvent(eventManagerImpl);
}
}
}

onWatchError(query: Query, error: Error): void {
const queryInfo = this.queries.get(query);
if (queryInfo) {
for (const listener of queryInfo.listeners) {
listener.onError(error);
}
export async function eventManagerUnlisten(
eventManager: EventManager,
listener: QueryListener
): Promise<void> {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);

debugAssert(!!eventManagerImpl.onUnlisten, 'onUnlisten not set');
const query = listener.query;
let lastListen = false;

const queryInfo = eventManagerImpl.queries.get(query);
if (queryInfo) {
const i = queryInfo.listeners.indexOf(listener);
if (i >= 0) {
queryInfo.listeners.splice(i, 1);
lastListen = queryInfo.listeners.length === 0;
}
}

// Remove all listeners. NOTE: We don't need to call syncEngine.unlisten()
// after an error.
this.queries.delete(query);
if (lastListen) {
eventManagerImpl.queries.delete(query);
return eventManagerImpl.onUnlisten(query);
}
}

onOnlineStateChange(onlineState: OnlineState): void {
this.onlineState = onlineState;
let raisedEvent = false;
this.queries.forEach((_, queryInfo) => {
export function eventManagerOnWatchChange(
eventManager: EventManager,
viewSnaps: ViewSnapshot[]
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);

let raisedEvent = false;
for (const viewSnap of viewSnaps) {
const query = viewSnap.query;
const queryInfo = eventManagerImpl.queries.get(query);
if (queryInfo) {
for (const listener of queryInfo.listeners) {
// Run global snapshot listeners if a consistent snapshot has been emitted.
if (listener.applyOnlineStateChange(onlineState)) {
if (listener.onViewSnapshot(viewSnap)) {
raisedEvent = true;
}
}
});
if (raisedEvent) {
this.raiseSnapshotsInSyncEvent();
queryInfo.viewSnap = viewSnap;
}
}

addSnapshotsInSyncListener(observer: Observer<void>): void {
this.snapshotsInSyncListeners.add(observer);
// Immediately fire an initial event, indicating all existing listeners
// are in-sync.
observer.next();
if (raisedEvent) {
raiseSnapshotsInSyncEvent(eventManagerImpl);
}
}

removeSnapshotsInSyncListener(observer: Observer<void>): void {
this.snapshotsInSyncListeners.delete(observer);
export function eventManagerOnWatchError(
eventManager: EventManager,
query: Query,
error: Error
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);

const queryInfo = eventManagerImpl.queries.get(query);
if (queryInfo) {
for (const listener of queryInfo.listeners) {
listener.onError(error);
}
}

// Call all global snapshot listeners that have been set.
private raiseSnapshotsInSyncEvent(): void {
this.snapshotsInSyncListeners.forEach(observer => {
observer.next();
});
// Remove all listeners. NOTE: We don't need to call syncEngine.unlisten()
// after an error.
eventManagerImpl.queries.delete(query);
}

export function eventManagerOnOnlineStateChange(
eventManager: EventManager,
onlineState: OnlineState
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);

eventManagerImpl.onlineState = onlineState;
let raisedEvent = false;
eventManagerImpl.queries.forEach((_, queryInfo) => {
for (const listener of queryInfo.listeners) {
// Run global snapshot listeners if a consistent snapshot has been emitted.
if (listener.applyOnlineStateChange(onlineState)) {
raisedEvent = true;
}
}
});
if (raisedEvent) {
raiseSnapshotsInSyncEvent(eventManagerImpl);
}
}

export function addSnapshotsInSyncListener(
eventManager: EventManager,
observer: Observer<void>
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);

eventManagerImpl.snapshotsInSyncListeners.add(observer);
// Immediately fire an initial event, indicating all existing listeners
// are in-sync.
observer.next();
}

export function removeSnapshotsInSyncListener(
eventManager: EventManager,
observer: Observer<void>
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
eventManagerImpl.snapshotsInSyncListeners.delete(observer);
}

// Call all global snapshot listeners that have been set.
function raiseSnapshotsInSyncEvent(eventManagerImpl: EventManagerImpl): void {
eventManagerImpl.snapshotsInSyncListeners.forEach(observer => {
observer.next();
});
}

export interface ListenOptions {
/** Raise events even when only the metadata changes */
readonly includeMetadataChanges?: boolean;
Expand Down
Loading