Skip to content

Commit b86b756

Browse files
Merging PersistentStream refactor
1 parent a173ba2 commit b86b756

File tree

4 files changed

+55
-80
lines changed

4 files changed

+55
-80
lines changed

packages/firestore/src/remote/datastore.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { WatchStreamListener, WriteStreamListener } from './persistent_stream';
21
/**
32
* Copyright 2017 Google Inc.
43
*
@@ -24,7 +23,7 @@ import { Mutation, MutationResult } from '../model/mutation';
2423
import { assert } from '../util/assert';
2524
import { Code, FirestoreError } from '../util/error';
2625
import { AsyncQueue } from '../util/async_queue';
27-
26+
import { WatchStreamListener, WriteStreamListener } from './persistent_stream';
2827
import { Connection } from './connection';
2928
import {
3029
PersistentListenStream,

packages/firestore/src/remote/remote_store.ts

Lines changed: 29 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@ const MAX_PENDING_WRITES = 10;
7676
* - pulling pending mutations from LocalStore and sending them to Datastore.
7777
* - retrying mutations that failed because of network problems.
7878
* - acking mutations to the SyncEngine once they are accepted or rejected.
79-
*
80-
* RemoteStore always starts out offline. A call to `enableNetwork()`
81-
* initializes the network connection.
8279
*/
8380
export class RemoteStore implements TargetMetadataProvider {
8481
/**
@@ -111,19 +108,25 @@ export class RemoteStore implements TargetMetadataProvider {
111108
*/
112109
private listenTargets: { [targetId: number]: QueryData } = {};
113110

114-
private networkEnabled = false;
115-
116111
private watchStream: PersistentListenStream;
117112
private writeStream: PersistentWriteStream;
118113
private watchChangeAggregator: WatchChangeAggregator = null;
119114

120115
/**
121116
* Set to true by enableNetwork() and false by disableNetwork() and indicates
122-
* the user-preferred network state. A network connection is only established
123-
* if `networkAllowed` is true, the client is primary and there are
124-
* outstanding mutations or active listens.
117+
* the user-preferred network state.
118+
*/
119+
private networkEnabled = true;
120+
121+
/**
122+
* Indicates whether the network can accept requests (as determined by both
123+
* the `isPrimary` flag and the user specified `networkEnabled` flag). If
124+
* the connection is already established, new requests will be send over the
125+
* existing stream. If the stream has not yet been established, it will be
126+
* established if there are outstanding requests.
125127
*/
126-
private networkAllowed = true;
128+
private canUseNetwork = false;
129+
127130
private isPrimary = false;
128131

129132
private onlineStateTracker: OnlineStateTracker;
@@ -165,60 +168,28 @@ export class RemoteStore implements TargetMetadataProvider {
165168
* Starts up the remote store, creating streams, restoring state from
166169
* LocalStore, etc.
167170
*/
168-
<<<<<<< HEAD
169171
start(): Promise<void> {
170172
// Start is a no-op for RemoteStore.
171173
return Promise.resolve();
172174
}
173175

174-
private isNetworkEnabled(): boolean {
175-
assert(
176-
(this.watchStream == null) === (this.writeStream == null),
177-
'WatchStream and WriteStream should both be null or non-null'
178-
);
179-
return this.watchStream != null;
180-
=======
181-
async start(): Promise<void> {
182-
await this.enableNetwork();
183-
>>>>>>> master
184-
}
185-
186176
/** Re-enables the network. Idempotent. */
187177
async enableNetwork(): Promise<void> {
188-
<<<<<<< HEAD
189-
this.networkAllowed = true;
178+
this.networkEnabled = true;
190179

191-
if (this.isPrimary) {
192-
if (this.isNetworkEnabled()) {
193-
return;
194-
}
180+
if (this.isPrimary && !this.canUseNetwork) {
181+
this.canUseNetwork = true;
195182

196-
// Create new streams (but note they're not started yet).
197-
this.watchStream = this.datastore.newPersistentWatchStream();
198-
this.writeStream = this.datastore.newPersistentWriteStream();
199-
=======
200-
if (!this.networkEnabled) {
201-
this.networkEnabled = true;
202183
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
203-
>>>>>>> master
204-
205-
// Load any saved stream token from persistent storage
206-
return this.localStore.getLastStreamToken().then(token => {
207-
this.writeStream.lastStreamToken = token;
208184

209-
<<<<<<< HEAD
210-
if (this.shouldStartWatchStream()) {
211-
this.startWatchStream();
212-
} else {
213-
this.onlineStateTracker.set(OnlineState.Unknown);
214-
}
185+
if (this.shouldStartWatchStream()) {
186+
this.startWatchStream();
187+
} else {
188+
this.onlineStateTracker.set(OnlineState.Unknown);
189+
}
215190

216-
return this.fillWritePipeline(); // This may start the writeStream.
217-
});
218-
=======
219191
// This will start the write stream if necessary.
220192
await this.fillWritePipeline();
221-
>>>>>>> master
222193
}
223194
}
224195

@@ -227,21 +198,16 @@ export class RemoteStore implements TargetMetadataProvider {
227198
* enableNetwork().
228199
*/
229200
async disableNetwork(): Promise<void> {
230-
<<<<<<< HEAD
231-
this.networkAllowed = false;
232-
201+
this.networkEnabled = false;
233202
this.disableNetworkInternal();
234-
=======
235-
await this.disableNetworkInternal();
236203

237-
>>>>>>> master
238204
// Set the OnlineState to Offline so get()s return from cache, etc.
239205
this.onlineStateTracker.set(OnlineState.Offline);
240206
}
241207

242-
private async disableNetworkInternal(): Promise<void> {
243-
if (this.networkEnabled) {
244-
this.networkEnabled = false;
208+
private disableNetworkInternal(): void {
209+
if (this.canUseNetwork) {
210+
this.canUseNetwork = false;
245211

246212
this.writeStream.stop();
247213
this.watchStream.stop();
@@ -348,18 +314,12 @@ export class RemoteStore implements TargetMetadataProvider {
348314
*/
349315
private shouldStartWatchStream(): boolean {
350316
return (
351-
this.canUseNetwork() &&
317+
this.canUseNetwork &&
352318
!this.watchStream.isStarted() &&
353319
!objUtils.isEmpty(this.listenTargets)
354320
);
355321
}
356322

357-
private canUseNetwork(): boolean {
358-
// TODO(mikelehen): This could take into account isPrimary when we merge
359-
// with multitab.
360-
return this.networkEnabled;
361-
}
362-
363323
private cleanUpWatchStreamState(): void {
364324
this.watchChangeAggregator = null;
365325
}
@@ -555,7 +515,7 @@ export class RemoteStore implements TargetMetadataProvider {
555515
*/
556516
private canAddToWritePipeline(): boolean {
557517
return (
558-
this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES
518+
this.canUseNetwork && this.writePipeline.length < MAX_PENDING_WRITES
559519
);
560520
}
561521

@@ -582,7 +542,7 @@ export class RemoteStore implements TargetMetadataProvider {
582542

583543
private shouldStartWriteStream(): boolean {
584544
return (
585-
this.canUseNetwork() &&
545+
this.canUseNetwork &&
586546
!this.writeStream.isStarted() &&
587547
this.writePipeline.length > 0
588548
);
@@ -755,9 +715,9 @@ export class RemoteStore implements TargetMetadataProvider {
755715
async applyPrimaryState(isPrimary: boolean): Promise<void> {
756716
this.isPrimary = isPrimary;
757717

758-
if (isPrimary && this.networkAllowed) {
718+
if (isPrimary && this.networkEnabled) {
759719
await this.enableNetwork();
760-
} else if (!isPrimary && this.isNetworkEnabled()) {
720+
} else if (!isPrimary) {
761721
this.disableNetworkInternal();
762722
this.onlineStateTracker.set(OnlineState.Unknown);
763723
}

packages/firestore/test/unit/specs/spec_test_runner.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -494,17 +494,11 @@ abstract class TestRunner {
494494
}
495495

496496
async shutdown(): Promise<void> {
497-
<<<<<<< HEAD
498-
if (this.started) {
499-
await this.doShutdown();
500-
}
501-
=======
502497
await this.queue.enqueue(async () => {
503-
await this.remoteStore.shutdown();
504-
await this.persistence.shutdown(/* deleteData= */ true);
505-
await this.destroyPersistence();
498+
if (this.started) {
499+
await this.doShutdown();
500+
}
506501
});
507-
>>>>>>> master
508502
}
509503

510504
/** Runs a single SpecStep on this runner. */

packages/firestore/test/unit/specs/write_spec.test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,4 +1245,26 @@ describeSpec('Writes:', [], () => {
12451245
);
12461246
}
12471247
);
1248+
1249+
specTest('Mutation are not sent twice after primary failover', ['multi-client'], () => {
1250+
const query = Query.atPath(path('collection'));
1251+
const docA = doc('collection/a', 0, { k: 'a' });
1252+
const docB = doc('collection/b', 0, { k: 'b' });
1253+
1254+
return client(0)
1255+
.expectPrimaryState(true)
1256+
.userSets('collection/a', { k: 'a' })
1257+
.userSets('collection/b', { k: 'b' })
1258+
.client(1)
1259+
.stealPrimaryLease()
1260+
.writeAcks('collection/a', 1000, { expectUserCallback: false })
1261+
.client(0)
1262+
.expectUserCallbacks({
1263+
acknowledged: ['collection/a']
1264+
})
1265+
.stealPrimaryLease()
1266+
.writeAcks('collection/b', 2000)
1267+
.userListens(query)
1268+
.expectEvents(query, { added: [docA, docB], fromCache:true} )
1269+
});
12481270
});

0 commit comments

Comments
 (0)