Skip to content

Commit e1dceac

Browse files
Creating Stream Tests for the Web Client (#260)
1 parent 0f888d1 commit e1dceac

File tree

3 files changed

+244
-28
lines changed

3 files changed

+244
-28
lines changed

packages/firestore/test/integration/remote/remote.test.ts

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,46 +15,22 @@
1515
*/
1616

1717
import { expect } from 'chai';
18-
import { EmptyCredentialsProvider } from '../../../src/api/credentials';
19-
import { DatabaseId, DatabaseInfo } from '../../../src/core/database_info';
2018
import { SnapshotVersion } from '../../../src/core/snapshot_version';
2119
import {
2220
Document,
2321
MaybeDocument,
2422
NoDocument
2523
} from '../../../src/model/document';
2624
import { MutationResult } from '../../../src/model/mutation';
27-
import { PlatformSupport } from '../../../src/platform/platform';
28-
import { Datastore } from '../../../src/remote/datastore';
29-
import { AsyncQueue } from '../../../src/util/async_queue';
3025
import { addEqualityMatcher } from '../../util/equality_matcher';
3126
import { asyncIt, key, setMutation } from '../../util/helpers';
32-
import { DEFAULT_PROJECT_ID, getDefaultDatabaseInfo } from '../util/helpers';
27+
import { withTestDatastore } from '../util/helpers';
3328

3429
describe('Remote Storage', () => {
3530
addEqualityMatcher();
3631

37-
function initializeDatastore(): Promise<Datastore> {
38-
const databaseInfo = getDefaultDatabaseInfo();
39-
const queue = new AsyncQueue();
40-
return PlatformSupport.getPlatform()
41-
.loadConnection(databaseInfo)
42-
.then(conn => {
43-
const serializer = PlatformSupport.getPlatform().newSerializer(
44-
databaseInfo.databaseId
45-
);
46-
return new Datastore(
47-
databaseInfo,
48-
queue,
49-
conn,
50-
new EmptyCredentialsProvider(),
51-
serializer
52-
);
53-
});
54-
}
55-
5632
asyncIt('can write', () => {
57-
return initializeDatastore().then(ds => {
33+
return withTestDatastore(ds => {
5834
const mutation = setMutation('docs/1', { sort: 1 });
5935

6036
return ds.commit([mutation]).then((result: MutationResult[]) => {
@@ -67,7 +43,7 @@ describe('Remote Storage', () => {
6743
});
6844

6945
asyncIt('can read', () => {
70-
return initializeDatastore().then(ds => {
46+
return withTestDatastore(ds => {
7147
const k = key('docs/1');
7248
const mutation = setMutation('docs/1', { sort: 10 });
7349

@@ -93,7 +69,7 @@ describe('Remote Storage', () => {
9369
});
9470

9571
asyncIt('can read deleted documents', () => {
96-
return initializeDatastore().then(ds => {
72+
return withTestDatastore(ds => {
9773
const k = key('docs/2');
9874
return ds.lookup([k]).then((docs: MaybeDocument[]) => {
9975
expect(docs.length).to.equal(1);
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/**
2+
* Copyright 2017 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { expect } from 'chai';
18+
import { SnapshotVersion } from '../../../src/core/snapshot_version';
19+
import { MutationResult } from '../../../src/model/mutation';
20+
import {
21+
PersistentListenStream,
22+
PersistentWriteStream,
23+
WatchStreamListener,
24+
WriteStreamListener
25+
} from '../../../src/remote/persistent_stream';
26+
import {
27+
DocumentWatchChange,
28+
ExistenceFilterChange,
29+
WatchTargetChange
30+
} from '../../../src/remote/watch_change';
31+
import { AsyncQueue } from '../../../src/util/async_queue';
32+
import { Deferred } from '../../../src/util/promise';
33+
import { asyncIt, setMutation } from '../../util/helpers';
34+
import { withTestDatastore } from '../util/helpers';
35+
36+
/**
37+
* StreamEventType combines the events that can be observed by the
38+
* WatchStreamListener and WriteStreamListener.
39+
*/
40+
type StreamEventType =
41+
| 'handshakeComplete'
42+
| 'mutationResult'
43+
| 'watchChange'
44+
| 'open'
45+
| 'close';
46+
47+
class StreamStatusListener implements WatchStreamListener, WriteStreamListener {
48+
private pendingCallbacks: StreamEventType[] = [];
49+
private pendingPromises: Deferred<StreamEventType>[] = [];
50+
51+
/**
52+
* Returns a Promise that resolves when the next callback fires. Resolves the
53+
* returned Promise immediately if there is already an unprocessed callback.
54+
*
55+
* This method asserts that the observed callback type matches
56+
* `expectedCallback`.
57+
*/
58+
awaitCallback(expectedCallback: StreamEventType): Promise<void> {
59+
let promise: Promise<StreamEventType>;
60+
61+
if (this.pendingCallbacks.length > 0) {
62+
let pendingCallback = this.pendingCallbacks.shift();
63+
promise = Promise.resolve(pendingCallback);
64+
} else {
65+
const deferred = new Deferred<StreamEventType>();
66+
this.pendingPromises.push(deferred);
67+
promise = deferred.promise;
68+
}
69+
70+
return promise.then(actualCallback => {
71+
expect(actualCallback).to.equal(expectedCallback);
72+
});
73+
}
74+
75+
/**
76+
* Verifies that we did not encounter any unexpected callbacks.
77+
*/
78+
verifyNoPendingCallbacks(): void {
79+
expect(this.pendingCallbacks).to.be.empty;
80+
}
81+
82+
onHandshakeComplete(): Promise<void> {
83+
return this.resolvePending('handshakeComplete');
84+
}
85+
86+
onMutationResult(
87+
commitVersion: SnapshotVersion,
88+
results: MutationResult[]
89+
): Promise<void> {
90+
return this.resolvePending('mutationResult');
91+
}
92+
93+
onWatchChange(
94+
watchChange:
95+
| DocumentWatchChange
96+
| WatchTargetChange
97+
| ExistenceFilterChange,
98+
snapshot: SnapshotVersion
99+
): Promise<void> {
100+
return this.resolvePending('watchChange');
101+
}
102+
103+
onOpen(): Promise<void> {
104+
return this.resolvePending('open');
105+
}
106+
107+
onClose(err?: firestore.FirestoreError): Promise<void> {
108+
return this.resolvePending('close');
109+
}
110+
111+
private resolvePending(actualCallback: StreamEventType): Promise<void> {
112+
if (this.pendingPromises.length > 0) {
113+
let pendingPromise = this.pendingPromises.shift();
114+
pendingPromise.resolve(actualCallback);
115+
} else {
116+
this.pendingCallbacks.push(actualCallback);
117+
}
118+
return Promise.resolve();
119+
}
120+
}
121+
122+
describe('Watch Stream', () => {
123+
let streamListener: StreamStatusListener;
124+
125+
beforeEach(() => {
126+
streamListener = new StreamStatusListener();
127+
});
128+
129+
afterEach(() => {
130+
streamListener.verifyNoPendingCallbacks();
131+
});
132+
133+
/**
134+
* Verifies that the watch stream does not issue an onClose callback after a
135+
* call to stop().
136+
*/
137+
asyncIt('can be stopped before handshake', () => {
138+
let watchStream: PersistentListenStream;
139+
140+
return withTestDatastore(ds => {
141+
watchStream = ds.newPersistentWatchStream(streamListener);
142+
watchStream.start();
143+
144+
return streamListener.awaitCallback('open').then(() => {
145+
// Stop must not call onClose because the full implementation of the callback could
146+
// attempt to restart the stream in the event it had pending watches.
147+
watchStream.stop();
148+
});
149+
});
150+
});
151+
});
152+
153+
describe('Write Stream', () => {
154+
let queue: AsyncQueue;
155+
let streamListener: StreamStatusListener;
156+
157+
beforeEach(() => {
158+
queue = new AsyncQueue();
159+
streamListener = new StreamStatusListener();
160+
});
161+
162+
afterEach(() => {
163+
streamListener.verifyNoPendingCallbacks();
164+
});
165+
166+
/**
167+
* Verifies that the write stream does not issue an onClose callback after a
168+
* call to stop().
169+
*/
170+
asyncIt('can be stopped before handshake', () => {
171+
let writeStream: PersistentWriteStream;
172+
173+
return withTestDatastore(ds => {
174+
writeStream = ds.newPersistentWriteStream(streamListener);
175+
writeStream.start();
176+
return streamListener.awaitCallback('open');
177+
}).then(() => {
178+
// Don't start the handshake.
179+
180+
// Stop must not call onClose because the full implementation of the callback could
181+
// attempt to restart the stream in the event it had pending writes.
182+
writeStream.stop();
183+
});
184+
});
185+
186+
asyncIt('can be stopped after handshake', () => {
187+
const mutations = [setMutation('docs/1', { foo: 'bar' })];
188+
189+
let writeStream: PersistentWriteStream;
190+
191+
return withTestDatastore(ds => {
192+
writeStream = ds.newPersistentWriteStream(streamListener);
193+
writeStream.start();
194+
return streamListener.awaitCallback('open');
195+
})
196+
.then(() => {
197+
// Writing before the handshake should throw
198+
expect(() => writeStream.writeMutations(mutations)).to.throw(
199+
'Handshake must be complete before writing mutations'
200+
);
201+
writeStream.writeHandshake();
202+
return streamListener.awaitCallback('handshakeComplete');
203+
})
204+
.then(() => {
205+
// Now writes should succeed
206+
writeStream.writeMutations(mutations);
207+
return streamListener.awaitCallback('mutationResult');
208+
})
209+
.then(() => {
210+
writeStream.stop();
211+
});
212+
});
213+
});

packages/firestore/test/integration/util/helpers.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717
import * as firestore from 'firestore';
1818

1919
import { DatabaseId, DatabaseInfo } from '../../../src/core/database_info';
20+
import { Datastore } from '../../../src/remote/datastore';
2021

2122
import firebase from './firebase_export';
23+
import { EmptyCredentialsProvider } from '../../../src/api/credentials';
24+
import { PlatformSupport } from '../../../src/platform/platform';
25+
import { AsyncQueue } from '../../../src/util/async_queue';
2226

2327
// tslint:disable-next-line:no-any __karma__ is an untyped global
2428
declare const __karma__: any;
@@ -82,6 +86,29 @@ export function getDefaultDatabaseInfo(): DatabaseInfo {
8286
);
8387
}
8488

89+
export function withTestDatastore(
90+
fn: (datastore: Datastore, queue: AsyncQueue) => Promise<void>
91+
): Promise<void> {
92+
const databaseInfo = getDefaultDatabaseInfo();
93+
const queue = new AsyncQueue();
94+
return PlatformSupport.getPlatform()
95+
.loadConnection(databaseInfo)
96+
.then(conn => {
97+
const serializer = PlatformSupport.getPlatform().newSerializer(
98+
databaseInfo.databaseId
99+
);
100+
const datastore = new Datastore(
101+
databaseInfo,
102+
queue,
103+
conn,
104+
new EmptyCredentialsProvider(),
105+
serializer
106+
);
107+
108+
return fn(datastore, queue);
109+
});
110+
}
111+
85112
export function withTestDb(
86113
persistence: boolean,
87114
fn: (db: firestore.Firestore) => Promise<void>

0 commit comments

Comments
 (0)