Skip to content

Commit f623c0e

Browse files
Removing TestFirestore and TestQueue in favor of drain()
1 parent 4dd1c27 commit f623c0e

File tree

6 files changed

+96
-98
lines changed

6 files changed

+96
-98
lines changed

packages/firestore/src/api/database.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ export class Firestore implements firestore.Firestore, FirebaseService {
188188
// Operations on the _firestoreClient don't block on _firestoreReady. Those
189189
// are already set to synchronize on the async queue.
190190
private _firestoreClient: FirestoreClient | undefined;
191+
private _queue = new AsyncQueue();
191192
public _dataConverter: UserDataConverter;
192193

193194
constructor(databaseIdOrApp: FirestoreDatabase | FirebaseApp) {
@@ -310,7 +311,7 @@ export class Firestore implements firestore.Firestore, FirebaseService {
310311
PlatformSupport.getPlatform(),
311312
databaseInfo,
312313
this._config.credentials,
313-
this.initializeAsyncQueue()
314+
this._queue,
314315
);
315316
return this._firestoreClient.start(persistence);
316317
}
@@ -374,7 +375,8 @@ export class Firestore implements firestore.Firestore, FirebaseService {
374375
},
375376
// Exposed via INTERNAL for use in tests.
376377
disableNetwork: () => this._firestoreClient.disableNetwork(),
377-
enableNetwork: () => this._firestoreClient.enableNetwork()
378+
enableNetwork: () => this._firestoreClient.enableNetwork(),
379+
queue : this._queue,
378380
};
379381

380382
collection(pathString: string): firestore.CollectionReference {
@@ -455,11 +457,6 @@ export class Firestore implements firestore.Firestore, FirebaseService {
455457
);
456458
}
457459
}
458-
459-
/** Creates a new AsyncQueue. Can be overridden to provide a custom queue. */
460-
protected initializeAsyncQueue(): AsyncQueue {
461-
return new AsyncQueue();
462-
}
463460
}
464461

465462
/**

packages/firestore/src/util/async_queue.ts

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ import { AnyDuringMigration, AnyJs } from './misc';
2020
import { Deferred } from './promise';
2121
import { Code, FirestoreError } from './error';
2222

23+
type DelayedOperation <T> = {
24+
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
25+
handle: any;
26+
op:() => Promise<T>,
27+
deferred: Deferred<T>;
28+
}
29+
2330
export class AsyncQueue {
2431
// The last promise in the queue.
2532
private tail: Promise<AnyJs | void> = Promise.resolve();
@@ -29,16 +36,16 @@ export class AsyncQueue {
2936
// has a delay that has not yet elapsed). Prior to cleanup, this list may also
3037
// contain entries that have already been run (in which case `handle` is
3138
// null).
32-
private delayedOperations: {
33-
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
34-
handle: any;
35-
deferred: Deferred<AnyJs | void>;
36-
}[] = [];
39+
//
40+
// tslint:disable-next-line:no-any Accept any type of delayed operation.
41+
private delayedOperations: DelayedOperation<any>[] = [];
3742

3843
// The number of operations that are queued to be run in the future (i.e. they
3944
// have a delay that has not yet elapsed). Unlike `delayedOperations`, this
4045
// is guaranteed to only contain operations that have not yet been run.
41-
private delayedOperationsCount = 0;
46+
//
47+
// Visible for testing.
48+
delayedOperationsCount = 0;
4249

4350
// visible for testing
4451
failure: Error;
@@ -61,23 +68,27 @@ export class AsyncQueue {
6168

6269
if ((delay || 0) > 0) {
6370
this.delayedOperationsCount++;
64-
const deferred = new Deferred<T>();
65-
const opIndex = this.delayedOperations.length;
66-
const handle = setTimeout(() => {
71+
const nextIndex = this.delayedOperations.length;
72+
const delayedOp : DelayedOperation<T> = {
73+
handle: null,
74+
op: op,
75+
deferred: new Deferred<T>()
76+
};
77+
delayedOp.handle = setTimeout(() => {
6778
this.scheduleInternal(() => {
68-
return op().then(result => {
69-
deferred.resolve(result);
79+
return delayedOp.op().then(result => {
80+
delayedOp.deferred.resolve(result);
7081
});
7182
});
7283
this.delayedOperationsCount--;
7384
if (this.delayedOperationsCount > 0) {
74-
this.delayedOperations[opIndex].handle = null;
85+
delayedOp.handle = null;
7586
} else {
7687
this.delayedOperations = [];
7788
}
7889
}, delay);
79-
this.delayedOperations[opIndex] = { handle, deferred };
80-
return deferred.promise;
90+
this.delayedOperations[nextIndex] = delayedOp;
91+
return delayedOp.deferred.promise;
8192
} else {
8293
return this.scheduleInternal(op);
8394
}
@@ -114,16 +125,22 @@ export class AsyncQueue {
114125
}
115126

116127
/**
117-
* Waits for tasks that are scheduled for immediate execution and rejects any
118-
* tasks that are pending with a non-zero delay.
128+
* Waits until all currently scheduled tasks are finished executing. Tasks
129+
* schedule with a delay can be rejected or queued for immediate execution.
130+
*
131+
* @param executeDelayedTasks
119132
*/
120-
drain(): Promise<void> {
133+
drain(executeDelayedTasks: boolean): Promise<void> {
121134
this.delayedOperations.forEach(entry => {
122135
if (entry.handle) {
123136
clearTimeout(entry.handle);
124-
entry.deferred.reject(
125-
new FirestoreError(Code.CANCELLED, 'Operation cancelled by shutdown')
126-
);
137+
if (executeDelayedTasks) {
138+
this.scheduleInternal(entry.op).then(entry.deferred.resolve, entry.deferred.reject);
139+
} else {
140+
entry.deferred.reject(
141+
new FirestoreError(Code.CANCELLED, 'Operation cancelled by shutdown')
142+
);
143+
}
127144
}
128145
});
129146
this.delayedOperations = [];

packages/firestore/test/integration/api/database.test.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { expect } from 'chai';
1818
import * as firestore from 'firestore';
1919

2020
import { Deferred } from '../../../src/util/promise';
21-
import { asyncIt } from '../../util/helpers';
21+
import {asyncIt, fasyncIt} from '../../util/helpers';
2222
import firebase from '../util/firebase_export';
2323
import {
2424
apiDescribe,
@@ -629,11 +629,11 @@ apiDescribe('Database', persistence => {
629629
asyncIt('can write document after idle timeout', () => {
630630
return withTestDb(persistence, (db, queue) => {
631631
const docRef = db.collection('test-collection').doc();
632-
return queue
633-
.awaitIdleTimeout(() => {
634-
return docRef.set({ foo: 'bar' });
635-
})
636-
.then(() => docRef.set({ foo: 'bar' }));
632+
return docRef.set({ foo: 'bar' }).then(() => {
633+
console.log(JSON.stringify(queue));
634+
expect(queue.delayedOperationsCount).to.be.equal(1);
635+
return queue.drain(/* executeDelayedTasks= */ true);
636+
}).then(() => docRef.set({ foo: 'bar' }));
637637
});
638638
});
639639

@@ -653,11 +653,10 @@ apiDescribe('Database', persistence => {
653653
return deferred.promise.then(unregister);
654654
};
655655

656-
return queue
657-
.awaitIdleTimeout(() => {
658-
return awaitOnlineSnapshot();
659-
})
660-
.then(() => awaitOnlineSnapshot());
656+
return awaitOnlineSnapshot().then(() => {
657+
expect(queue.delayedOperationsCount).to.be.equal(1);
658+
return queue.drain(/* executeDelayedTasks= */ true);
659+
}).then(() => awaitOnlineSnapshot());
661660
});
662661
});
663662
});

packages/firestore/test/integration/remote/stream.test.ts

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import {
3030
} from '../../../src/remote/watch_change';
3131
import { AsyncQueue } from '../../../src/util/async_queue';
3232
import { Deferred } from '../../../src/util/promise';
33-
import { asyncIt, setMutation } from '../../util/helpers';
33+
import { asyncIt, fasyncIt, setMutation } from '../../util/helpers';
3434
import { withTestDatastore } from '../util/helpers';
3535
import { TestQueue } from '../util/test_queue';
3636

@@ -214,10 +214,10 @@ describe('Write Stream', () => {
214214

215215
asyncIt('closes when idle', () => {
216216
let writeStream: PersistentWriteStream;
217-
let queue: TestQueue;
217+
let queue: AsyncQueue;
218218

219-
return withTestDatastore((ds, testQueue) => {
220-
queue = testQueue;
219+
return withTestDatastore((ds, q) => {
220+
queue = q;
221221
writeStream = ds.newPersistentWriteStream();
222222
writeStream.start(streamListener);
223223
return streamListener.awaitCallback('open');
@@ -226,24 +226,25 @@ describe('Write Stream', () => {
226226
writeStream.writeHandshake();
227227
return streamListener.awaitCallback('handshakeComplete');
228228
})
229-
.then(() =>
230-
queue
231-
.awaitIdleTimeout(() => {
232-
writeStream.markIdle();
233-
return streamListener.awaitCallback('close');
234-
})
235-
.then(() => {
236-
expect(writeStream.isOpen()).to.be.false;
237-
})
238-
);
229+
.then(() => {
230+
writeStream.markIdle();
231+
expect(queue.delayedOperationsCount).to.be.equal(1);
232+
return Promise.all([
233+
streamListener.awaitCallback('close'),
234+
queue.drain(/*executeDelayedTasks=*/ true)
235+
]);
236+
})
237+
.then(() => {
238+
expect(writeStream.isOpen()).to.be.false;
239+
});
239240
});
240241

241242
asyncIt('cancels idle on write', () => {
242243
let writeStream: PersistentWriteStream;
243-
let queue: TestQueue;
244+
let queue: AsyncQueue;
244245

245-
return withTestDatastore((ds, testQueue) => {
246-
queue = testQueue;
246+
return withTestDatastore((ds, q) => {
247+
queue = q;
247248
writeStream = ds.newPersistentWriteStream();
248249
writeStream.start(streamListener);
249250
return streamListener.awaitCallback('open');
@@ -252,17 +253,15 @@ describe('Write Stream', () => {
252253
writeStream.writeHandshake();
253254
return streamListener.awaitCallback('handshakeComplete');
254255
})
255-
.then(() =>
256-
queue
257-
.awaitIdleTimeout(() => {
258-
// Mark the stream idle, but immediately cancel the idle timer by issuing another write.
259-
writeStream.markIdle();
260-
writeStream.writeMutations(SINGLE_MUTATION);
261-
return streamListener.awaitCallback('mutationResult');
262-
})
263-
.then(() => {
264-
expect(writeStream.isOpen()).to.be.true;
265-
})
266-
);
256+
.then(() => {
257+
// Mark the stream idle, but immediately cancel the idle timer by issuing another write.
258+
writeStream.markIdle();
259+
expect(queue.delayedOperationsCount).to.be.equal(1);
260+
writeStream.writeMutations(SINGLE_MUTATION);
261+
return streamListener.awaitCallback('mutationResult');
262+
}).then(() => queue.drain(/*executeDelayedTasks=*/ true))
263+
.then(() => {
264+
expect(writeStream.isOpen()).to.be.true;
265+
});
267266
});
268267
});

packages/firestore/test/integration/util/helpers.ts

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,13 @@
1616

1717
import * as firestore from 'firestore';
1818

19-
import { FirebaseApp } from '@firebase/app';
20-
2119
import { DatabaseId, DatabaseInfo } from '../../../src/core/database_info';
2220
import { Datastore } from '../../../src/remote/datastore';
2321

2422
import firebase from './firebase_export';
2523
import { EmptyCredentialsProvider } from '../../../src/api/credentials';
2624
import { PlatformSupport } from '../../../src/platform/platform';
2725
import { AsyncQueue } from '../../../src/util/async_queue';
28-
import { Firestore, FirestoreDatabase } from '../../../src/api/database';
29-
import { TestQueue } from './test_queue';
3026

3127
// tslint:disable-next-line:no-any __karma__ is an untyped global
3228
declare const __karma__: any;
@@ -59,23 +55,6 @@ function isIeOrEdge(): boolean {
5955
);
6056
}
6157

62-
/** Implementation of Firestore that offers access to the underlying queue. */
63-
class TestFirestore extends Firestore {
64-
_queue = new TestQueue();
65-
66-
constructor(databaseIdOrApp: FirestoreDatabase | FirebaseApp) {
67-
super(databaseIdOrApp);
68-
}
69-
70-
get queue() {
71-
return this._queue;
72-
}
73-
74-
protected initializeAsyncQueue(): AsyncQueue {
75-
return this._queue;
76-
}
77-
}
78-
7958
export function isPersistenceAvailable(): boolean {
8059
return !isIeOrEdge();
8160
}
@@ -108,10 +87,10 @@ export function getDefaultDatabaseInfo(): DatabaseInfo {
10887
}
10988

11089
export function withTestDatastore(
111-
fn: (datastore: Datastore, queue: TestQueue) => Promise<void>
90+
fn: (datastore: Datastore, queue: AsyncQueue) => Promise<void>
11291
): Promise<void> {
11392
const databaseInfo = getDefaultDatabaseInfo();
114-
const queue = new TestQueue();
93+
const queue = new AsyncQueue();
11594
return PlatformSupport.getPlatform()
11695
.loadConnection(databaseInfo)
11796
.then(conn => {
@@ -132,25 +111,29 @@ export function withTestDatastore(
132111

133112
export function withTestDb(
134113
persistence: boolean,
135-
fn: (db: firestore.Firestore, queue: TestQueue) => Promise<void>
114+
fn: (db: firestore.Firestore, queue: AsyncQueue) => Promise<void>
136115
): Promise<void> {
137116
return withTestDbs(persistence, 1, ([db]) => {
138-
return fn(db, (db as TestFirestore).queue);
117+
// tslint:disable-next-line:no-any queue isn't exposed via d.ts
118+
const firestoreInternal = db.INTERNAL as any;
119+
return fn(db, firestoreInternal.queue);
139120
});
140121
}
141122

142123
/** Runs provided fn with a db for an alternate project id. */
143124
export function withAlternateTestDb(
144125
persistence: boolean,
145-
fn: (db: firestore.Firestore, queue: TestQueue) => Promise<void>
126+
fn: (db: firestore.Firestore, queue: AsyncQueue) => Promise<void>
146127
): Promise<void> {
147128
return withTestDbsSettings(
148129
persistence,
149130
ALT_PROJECT_ID,
150131
DEFAULT_SETTINGS,
151132
1,
152133
([db]) => {
153-
return fn(db, (db as TestFirestore).queue);
134+
// tslint:disable-next-line:no-any queue isn't exposed via d.ts
135+
const firestoreInternal = db.INTERNAL as any;
136+
return fn(db, firestoreInternal.queue);
154137
}
155138
);
156139
}
@@ -189,10 +172,13 @@ export function withTestDbsSettings(
189172
{ apiKey: 'fake-api-key', projectId },
190173
'test-app-' + appCount++
191174
);
192-
const firestore = new TestFirestore(app);
175+
176+
// tslint:disable-next-line:no-any Firestore is not exposed in firebase.d.ts
177+
const firebaseAny = firebase as any;
178+
const firestore = firebaseAny.firestore(app);
193179
firestore.settings(settings);
194180

195-
let ready: Promise<TestFirestore>;
181+
let ready: Promise<firestore.Firestore>;
196182
if (persistence) {
197183
ready = firestore.enablePersistence().then(() => firestore);
198184
} else {
@@ -202,7 +188,7 @@ export function withTestDbsSettings(
202188
promises.push(ready);
203189
}
204190

205-
return Promise.all(promises).then((dbs: TestFirestore[]) => {
191+
return Promise.all(promises).then((dbs: firestore.Firestore[]) => {
206192
return fn(dbs)
207193
.then(wipeDb.bind(null, dbs[0]), error => {
208194
return wipeDb(dbs[0]).then(() => {

packages/firestore/test/unit/specs/spec_test_runner.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ abstract class TestRunner {
422422
console.log('Running spec: ' + this.name);
423423
return sequence(steps, async step => {
424424
await this.doStep(step);
425-
await this.queue.drain();
425+
await this.queue.drain(/* executeDelayedTasks */ false);
426426
this.validateStepExpectations(step.expect!);
427427
this.validateStateExpectations(step.stateExpect!);
428428
this.eventList = [];

0 commit comments

Comments
 (0)