Skip to content

Commit f28e510

Browse files
Review
1 parent 45d44ff commit f28e510

File tree

4 files changed

+50
-55
lines changed

4 files changed

+50
-55
lines changed

packages/firestore/src/core/event_manager.ts

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,15 @@ import { EventHandler } from '../util/misc';
2020
import { ObjectMap } from '../util/obj_map';
2121
import { Query } from './query';
2222
import { SyncEngine, SyncEngineListener } from './sync_engine';
23-
import { OnlineState, TargetId } from './types';
23+
import { OnlineState } from './types';
2424
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
25-
import { logError } from '../util/log';
26-
import { Code, FirestoreError } from '../util/error';
27-
28-
const LOG_TAG = 'EventManager';
2925

3026
/**
3127
* Holds the listeners and the last received ViewSnapshot for a query being
3228
* tracked by EventManager.
3329
*/
3430
class QueryListenersInfo {
3531
viewSnap: ViewSnapshot | undefined = undefined;
36-
targetId: TargetId = 0;
3732
listeners: QueryListener[] = [];
3833
}
3934

@@ -74,23 +69,7 @@ export class EventManager implements SyncEngineListener {
7469
}
7570

7671
if (firstListen) {
77-
try {
78-
const { targetId, snapshot } = await this.syncEngine.listen(query);
79-
queryInfo.targetId = targetId;
80-
queryInfo.viewSnap = snapshot;
81-
} catch (e) {
82-
if (e.name === 'IndexedDbTransactionError') {
83-
logError(LOG_TAG, `Failed to register query: ${e}`);
84-
listener.onError(
85-
new FirestoreError(
86-
Code.UNAVAILABLE,
87-
`Failed to register query: ${e}`
88-
)
89-
);
90-
return;
91-
}
92-
throw e;
93-
}
72+
queryInfo.viewSnap = await this.syncEngine.listen(query);
9473
}
9574

9675
this.queries.set(query, queryInfo!);

packages/firestore/src/core/firestore_client.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { Datastore } from '../remote/datastore';
2727
import { RemoteStore } from '../remote/remote_store';
2828
import { AsyncQueue } from '../util/async_queue';
2929
import { Code, FirestoreError } from '../util/error';
30-
import { logDebug } from '../util/log';
30+
import { logDebug, logError } from '../util/log';
3131
import { Deferred } from '../util/promise';
3232
import {
3333
EventManager,
@@ -398,8 +398,22 @@ export class FirestoreClient {
398398
): QueryListener {
399399
this.verifyNotTerminated();
400400
const listener = new QueryListener(query, observer, options);
401-
this.asyncQueue.enqueueAndForget(() => {
402-
return this.eventMgr.listen(listener);
401+
this.asyncQueue.enqueueAndForget(async () => {
402+
try {
403+
await this.eventMgr.listen(listener);
404+
} catch (e) {
405+
logError(LOG_TAG, `Failed to register query: ${e}`);
406+
if (e.name === 'IndexedDbTransactionError') {
407+
listener.onError(
408+
new FirestoreError(
409+
Code.UNAVAILABLE,
410+
`Failed to register query: ${e}`
411+
)
412+
);
413+
} else {
414+
throw e;
415+
}
416+
}
403417
});
404418
return listener;
405419
}

packages/firestore/src/core/sync_engine.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,6 @@ class QueryView {
102102
) {}
103103
}
104104

105-
/** Result type returned during initial query registration. */
106-
export interface QueryRegistration {
107-
targetId: TargetId;
108-
snapshot?: ViewSnapshot;
109-
}
110-
111105
/** Tracks a limbo resolution. */
112106
class LimboResolution {
113107
constructor(public key: DocumentKey) {}
@@ -220,9 +214,9 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
220214
/**
221215
* Initiates the new listen, resolves promise when listen enqueued to the
222216
* server. All the subsequent view snapshots or errors are sent to the
223-
* subscribed handlers. Returns the targetId of the query.
217+
* subscribed handlers. Returns the initial snapshot.
224218
*/
225-
async listen(query: Query): Promise<QueryRegistration> {
219+
async listen(query: Query): Promise<ViewSnapshot> {
226220
this.assertSubscribed('listen()');
227221

228222
let targetId;
@@ -256,7 +250,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
256250
}
257251
}
258252

259-
return { targetId, snapshot: viewSnapshot };
253+
return viewSnapshot;
260254
}
261255

262256
/**

packages/firestore/test/unit/specs/spec_test_runner.ts

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ import {
7070
} from '../../../src/remote/watch_change';
7171
import { debugAssert, fail } from '../../../src/util/assert';
7272
import { AsyncQueue, TimerId } from '../../../src/util/async_queue';
73-
import { FirestoreError } from '../../../src/util/error';
73+
import { Code, FirestoreError } from '../../../src/util/error';
7474
import { primitiveComparator } from '../../../src/util/misc';
7575
import { forEach, objectSize } from '../../../src/util/obj';
7676
import { ObjectMap } from '../../../src/util/obj_map';
@@ -608,28 +608,36 @@ abstract class TestRunner {
608608
const queryListener = new QueryListener(query, aggregator, options);
609609
this.queryListeners.set(query, queryListener);
610610

611-
await this.queue.enqueue(() => {
612-
return this.eventManager.listen(queryListener);
611+
const targetAdded = await this.queue.enqueue(async () => {
612+
try {
613+
await this.eventManager.listen(queryListener);
614+
return true;
615+
} catch (e) {
616+
expect(this.persistence.injectFailures).to.be.true;
617+
queryListener.onError(
618+
new FirestoreError(Code.UNAVAILABLE, e.message)
619+
);
620+
return false;
621+
}
613622
});
614623

615-
if (this.persistence.injectFailures) {
616-
// The Watch stream won't open if we are injecting failures.
617-
return;
618-
}
619-
620-
// Skip the backoff that may have been triggered by a previous call to
621-
// `watchStreamCloses()`.
622-
if (
623-
this.queue.containsDelayedOperation(TimerId.ListenStreamConnectionBackoff)
624-
) {
625-
await this.queue.runDelayedOperationsEarly(
626-
TimerId.ListenStreamConnectionBackoff
627-
);
628-
}
624+
if (targetAdded) {
625+
// Skip the backoff that may have been triggered by a previous call to
626+
// `watchStreamCloses()`.
627+
if (
628+
this.queue.containsDelayedOperation(
629+
TimerId.ListenStreamConnectionBackoff
630+
)
631+
) {
632+
await this.queue.runDelayedOperationsEarly(
633+
TimerId.ListenStreamConnectionBackoff
634+
);
635+
}
629636

630-
if (this.isPrimaryClient && this.networkEnabled) {
631-
// Open should always have happened after a listen
632-
await this.connection.waitForWatchOpen();
637+
if (this.isPrimaryClient && this.networkEnabled) {
638+
// Open should always have happened after a listen
639+
await this.connection.waitForWatchOpen();
640+
}
633641
}
634642
}
635643

0 commit comments

Comments
 (0)