Skip to content

Commit a30f63b

Browse files
Creating Stream Tests for the Web Client
1 parent fba3c06 commit a30f63b

File tree

1 file changed

+231
-0
lines changed

1 file changed

+231
-0
lines changed
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/**
2+
* Copyright 2017 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { expect } from 'chai';
18+
import { EmptyCredentialsProvider } from '../../../src/api/credentials';
19+
import { SnapshotVersion } from '../../../src/core/snapshot_version';
20+
import { MutationResult } from '../../../src/model/mutation';
21+
import { PlatformSupport } from '../../../src/platform/platform';
22+
import { Datastore } from '../../../src/remote/datastore';
23+
import {
24+
PersistentListenStream,
25+
PersistentWriteStream,
26+
WatchStreamListener,
27+
WriteStreamListener
28+
} from '../../../src/remote/persistent_stream';
29+
import {
30+
DocumentWatchChange,
31+
ExistenceFilterChange,
32+
WatchTargetChange
33+
} from '../../../src/remote/watch_change';
34+
import { AsyncQueue } from '../../../src/util/async_queue';
35+
import { Deferred } from '../../../src/util/promise';
36+
import { asyncIt } from '../../util/helpers';
37+
import { getDefaultDatabaseInfo } from '../util/helpers';
38+
import FirestoreError = firestore.FirestoreError;
39+
40+
type StreamStatusCallback =
41+
| 'onHandshakeComplete'
42+
| 'onMutationResult'
43+
| 'onWatchChange'
44+
| 'onOpen'
45+
| 'onClose';
46+
47+
class StreamStatusListener implements WatchStreamListener, WriteStreamListener {
48+
private pendingStates: StreamStatusCallback[] = [];
49+
private pendingPromises: Deferred<StreamStatusCallback>[] = [];
50+
51+
private resolvePending(actualCallback: StreamStatusCallback) {
52+
let pendingPromise = this.pendingPromises.shift();
53+
if (pendingPromise) {
54+
pendingPromise.resolve(actualCallback);
55+
} else {
56+
this.pendingStates.push(actualCallback);
57+
}
58+
return Promise.resolve();
59+
}
60+
61+
onHandshakeComplete(): Promise<void> {
62+
return this.resolvePending('onHandshakeComplete');
63+
}
64+
65+
onMutationResult(
66+
commitVersion: SnapshotVersion,
67+
results: MutationResult[]
68+
): Promise<void> {
69+
return this.resolvePending('onMutationResult');
70+
}
71+
72+
onWatchChange(
73+
watchChange:
74+
| DocumentWatchChange
75+
| WatchTargetChange
76+
| ExistenceFilterChange,
77+
snapshot: SnapshotVersion
78+
): Promise<void> {
79+
return this.resolvePending('onWatchChange');
80+
}
81+
82+
onOpen(): Promise<void> {
83+
return this.resolvePending('onOpen');
84+
}
85+
86+
onClose(err?: FirestoreError): Promise<void> {
87+
return this.resolvePending('onClose');
88+
}
89+
90+
/**
91+
* Returns a Promise that resolves when the 'expectedCallback' fires.
92+
* Resolves the returned Promise immediately if there is already an
93+
* unprocessed callback.
94+
* Fails the test if the expected callback does not match the name of the
95+
* actual callback function.
96+
*/
97+
awaitCallback(expectedCallback: StreamStatusCallback): Promise<void> {
98+
if (this.pendingStates.length > 0) {
99+
expect(this.pendingStates.shift()).to.eq(expectedCallback);
100+
return Promise.resolve();
101+
} else {
102+
const deferred = new Deferred<StreamStatusCallback>();
103+
this.pendingPromises.push(deferred);
104+
return deferred.promise.then(actualCallback => {
105+
expect(actualCallback).to.equal(expectedCallback);
106+
});
107+
}
108+
}
109+
110+
verifyNoPendingCallbacks(): void {
111+
expect(this.pendingStates).to.be.empty;
112+
}
113+
}
114+
115+
describe('Watch Stream', () => {
116+
let queue: AsyncQueue;
117+
let streamListener: StreamStatusListener;
118+
119+
beforeEach(() => {
120+
queue = new AsyncQueue();
121+
streamListener = new StreamStatusListener();
122+
});
123+
124+
afterEach(() => {
125+
streamListener.verifyNoPendingCallbacks();
126+
});
127+
128+
function initializeWatchStream(): Promise<PersistentListenStream> {
129+
const databaseInfo = getDefaultDatabaseInfo();
130+
131+
return PlatformSupport.getPlatform()
132+
.loadConnection(databaseInfo)
133+
.then(conn => {
134+
const serializer = PlatformSupport.getPlatform().newSerializer(
135+
databaseInfo.databaseId
136+
);
137+
return new Datastore(
138+
databaseInfo,
139+
queue,
140+
conn,
141+
new EmptyCredentialsProvider(),
142+
serializer
143+
);
144+
})
145+
.then(ds => {
146+
return ds.newPersistentWatchStream(streamListener);
147+
});
148+
}
149+
150+
asyncIt('can be stopped before handshake', () => {
151+
let watchStream: PersistentListenStream;
152+
153+
return initializeWatchStream()
154+
.then(ws => {
155+
watchStream = ws;
156+
watchStream.start();
157+
return streamListener.awaitCallback('onOpen');
158+
})
159+
.then(() => {
160+
watchStream.stop();
161+
});
162+
});
163+
});
164+
165+
describe('Write Stream', () => {
166+
let queue: AsyncQueue;
167+
let streamListener: StreamStatusListener;
168+
169+
beforeEach(() => {
170+
queue = new AsyncQueue();
171+
streamListener = new StreamStatusListener();
172+
});
173+
174+
afterEach(() => {
175+
streamListener.verifyNoPendingCallbacks();
176+
});
177+
178+
function initializeWriteStream(): Promise<PersistentWriteStream> {
179+
const databaseInfo = getDefaultDatabaseInfo();
180+
181+
return PlatformSupport.getPlatform()
182+
.loadConnection(databaseInfo)
183+
.then(conn => {
184+
const serializer = PlatformSupport.getPlatform().newSerializer(
185+
databaseInfo.databaseId
186+
);
187+
return new Datastore(
188+
databaseInfo,
189+
queue,
190+
conn,
191+
new EmptyCredentialsProvider(),
192+
serializer
193+
);
194+
})
195+
.then(ds => {
196+
return ds.newPersistentWriteStream(streamListener);
197+
});
198+
}
199+
200+
asyncIt('can be stopped before handshake', () => {
201+
let writeStream: PersistentWriteStream;
202+
203+
return initializeWriteStream()
204+
.then(ws => {
205+
writeStream = ws;
206+
writeStream.start();
207+
return streamListener.awaitCallback('onOpen');
208+
})
209+
.then(() => {
210+
writeStream.stop();
211+
});
212+
});
213+
214+
asyncIt('can be stopped after handshake', () => {
215+
let writeStream: PersistentWriteStream;
216+
217+
return initializeWriteStream()
218+
.then(ws => {
219+
writeStream = ws;
220+
writeStream.start();
221+
return streamListener.awaitCallback('onOpen');
222+
})
223+
.then(() => {
224+
writeStream.writeHandshake();
225+
return streamListener.awaitCallback('onHandshakeComplete');
226+
})
227+
.then(() => {
228+
writeStream.stop();
229+
});
230+
});
231+
});

0 commit comments

Comments
 (0)