Skip to content

Commit b649483

Browse files
committed
stream.test.ts: fix tests to accommodate the new "onConnected" event.
1 parent 0edcf63 commit b649483

File tree

1 file changed

+35
-3
lines changed

1 file changed

+35
-3
lines changed

packages/firestore/test/integration/remote/stream.test.ts

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ import {
2222
Token
2323
} from '../../../src/api/credentials';
2424
import { SnapshotVersion } from '../../../src/core/snapshot_version';
25+
import { Target } from '../../../src/core/target';
26+
import { TargetData, TargetPurpose } from '../../../src/local/target_data';
2527
import { MutationResult } from '../../../src/model/mutation';
28+
import { ResourcePath } from '../../../src/model/path';
2629
import {
2730
newPersistentWatchStream,
2831
newPersistentWriteStream
@@ -57,7 +60,8 @@ type StreamEventType =
5760
| 'mutationResult'
5861
| 'watchChange'
5962
| 'open'
60-
| 'close';
63+
| 'close'
64+
| 'connected';
6165

6266
const SINGLE_MUTATION = [setMutation('docs/1', { foo: 'bar' })];
6367

@@ -117,6 +121,10 @@ class StreamStatusListener implements WatchStreamListener, WriteStreamListener {
117121
return this.resolvePending('watchChange');
118122
}
119123

124+
onConnected(): Promise<void> {
125+
return this.resolvePending('connected');
126+
}
127+
120128
onOpen(): Promise<void> {
121129
return this.resolvePending('open');
122130
}
@@ -148,6 +156,14 @@ describe('Watch Stream', () => {
148156
});
149157
});
150158
});
159+
160+
it('gets connected event before first message', () => {
161+
return withTestWatchStream(async (watchStream, streamListener) => {
162+
await streamListener.awaitCallback('open');
163+
watchStream.watch(sampleTargetData());
164+
await streamListener.awaitCallback('connected');
165+
});
166+
});
151167
});
152168

153169
class MockAuthCredentialsProvider extends EmptyAuthCredentialsProvider {
@@ -190,6 +206,7 @@ describe('Write Stream', () => {
190206
'Handshake must be complete before writing mutations'
191207
);
192208
writeStream.writeHandshake();
209+
await streamListener.awaitCallback('connected');
193210
await streamListener.awaitCallback('handshakeComplete');
194211

195212
// Now writes should succeed
@@ -205,9 +222,10 @@ describe('Write Stream', () => {
205222
return withTestWriteStream((writeStream, streamListener, queue) => {
206223
return streamListener
207224
.awaitCallback('open')
208-
.then(() => {
225+
.then(async () => {
209226
writeStream.writeHandshake();
210-
return streamListener.awaitCallback('handshakeComplete');
227+
await streamListener.awaitCallback('connected');
228+
await streamListener.awaitCallback('handshakeComplete');
211229
})
212230
.then(() => {
213231
writeStream.markIdle();
@@ -228,6 +246,7 @@ describe('Write Stream', () => {
228246
return withTestWriteStream(async (writeStream, streamListener, queue) => {
229247
await streamListener.awaitCallback('open');
230248
writeStream.writeHandshake();
249+
await streamListener.awaitCallback('connected');
231250
await streamListener.awaitCallback('handshakeComplete');
232251

233252
// Mark the stream idle, but immediately cancel the idle timer by issuing another write.
@@ -336,3 +355,16 @@ export async function withTestWatchStream(
336355
streamListener.verifyNoPendingCallbacks();
337356
});
338357
}
358+
359+
function sampleTargetData(): TargetData {
360+
const target: Target = {
361+
path: ResourcePath.emptyPath(),
362+
collectionGroup: null,
363+
orderBy: [],
364+
filters: [],
365+
limit: null,
366+
startAt: null,
367+
endAt: null
368+
};
369+
return new TargetData(target, 1, TargetPurpose.Listen, 1);
370+
}

0 commit comments

Comments
 (0)