Skip to content

Creating Stream Tests for the Web Client #260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 4 additions & 28 deletions packages/firestore/test/integration/remote/remote.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,22 @@
*/

import { expect } from 'chai';
import { EmptyCredentialsProvider } from '../../../src/api/credentials';
import { DatabaseId, DatabaseInfo } from '../../../src/core/database_info';
import { SnapshotVersion } from '../../../src/core/snapshot_version';
import {
Document,
MaybeDocument,
NoDocument
} from '../../../src/model/document';
import { MutationResult } from '../../../src/model/mutation';
import { PlatformSupport } from '../../../src/platform/platform';
import { Datastore } from '../../../src/remote/datastore';
import { AsyncQueue } from '../../../src/util/async_queue';
import { addEqualityMatcher } from '../../util/equality_matcher';
import { asyncIt, key, setMutation } from '../../util/helpers';
import { DEFAULT_PROJECT_ID, getDefaultDatabaseInfo } from '../util/helpers';
import { withTestDatastore } from '../util/helpers';

describe('Remote Storage', () => {
addEqualityMatcher();

function initializeDatastore(): Promise<Datastore> {
const databaseInfo = getDefaultDatabaseInfo();
const queue = new AsyncQueue();
return PlatformSupport.getPlatform()
.loadConnection(databaseInfo)
.then(conn => {
const serializer = PlatformSupport.getPlatform().newSerializer(
databaseInfo.databaseId
);
return new Datastore(
databaseInfo,
queue,
conn,
new EmptyCredentialsProvider(),
serializer
);
});
}

asyncIt('can write', () => {
return initializeDatastore().then(ds => {
return withTestDatastore(ds => {
const mutation = setMutation('docs/1', { sort: 1 });

return ds.commit([mutation]).then((result: MutationResult[]) => {
Expand All @@ -67,7 +43,7 @@ describe('Remote Storage', () => {
});

asyncIt('can read', () => {
return initializeDatastore().then(ds => {
return withTestDatastore(ds => {
const k = key('docs/1');
const mutation = setMutation('docs/1', { sort: 10 });

Expand All @@ -93,7 +69,7 @@ describe('Remote Storage', () => {
});

asyncIt('can read deleted documents', () => {
return initializeDatastore().then(ds => {
return withTestDatastore(ds => {
const k = key('docs/2');
return ds.lookup([k]).then((docs: MaybeDocument[]) => {
expect(docs.length).to.equal(1);
Expand Down
213 changes: 213 additions & 0 deletions packages/firestore/test/integration/remote/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/**
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { expect } from 'chai';
import { SnapshotVersion } from '../../../src/core/snapshot_version';
import { MutationResult } from '../../../src/model/mutation';
import {
PersistentListenStream,
PersistentWriteStream,
WatchStreamListener,
WriteStreamListener
} from '../../../src/remote/persistent_stream';
import {
DocumentWatchChange,
ExistenceFilterChange,
WatchTargetChange
} from '../../../src/remote/watch_change';
import { AsyncQueue } from '../../../src/util/async_queue';
import { Deferred } from '../../../src/util/promise';
import { asyncIt, setMutation } from '../../util/helpers';
import { withTestDatastore } from '../util/helpers';

/**
* StreamEventType combines the events that can be observed by the
* WatchStreamListener and WriteStreamListener.
*/
type StreamEventType =
| 'handshakeComplete'
| 'mutationResult'
| 'watchChange'
| 'open'
| 'close';

class StreamStatusListener implements WatchStreamListener, WriteStreamListener {
private pendingCallbacks: StreamEventType[] = [];
private pendingPromises: Deferred<StreamEventType>[] = [];

/**
* Returns a Promise that resolves when the next callback fires. Resolves the
* returned Promise immediately if there is already an unprocessed callback.
*
* This method asserts that the observed callback type matches
* `expectedCallback`.
*/
awaitCallback(expectedCallback: StreamEventType): Promise<void> {
let promise: Promise<StreamEventType>;

if (this.pendingCallbacks.length > 0) {
let pendingCallback = this.pendingCallbacks.shift();
promise = Promise.resolve(pendingCallback);
} else {
const deferred = new Deferred<StreamEventType>();
this.pendingPromises.push(deferred);
promise = deferred.promise;
}

return promise.then(actualCallback => {
expect(actualCallback).to.equal(expectedCallback);
});
}

/**
* Verifies that we did not encounter any unexpected callbacks.
*/
verifyNoPendingCallbacks(): void {
expect(this.pendingCallbacks).to.be.empty;
}

onHandshakeComplete(): Promise<void> {
return this.resolvePending('handshakeComplete');
}

onMutationResult(
commitVersion: SnapshotVersion,
results: MutationResult[]
): Promise<void> {
return this.resolvePending('mutationResult');
}

onWatchChange(
watchChange:
| DocumentWatchChange
| WatchTargetChange
| ExistenceFilterChange,
snapshot: SnapshotVersion
): Promise<void> {
return this.resolvePending('watchChange');
}

onOpen(): Promise<void> {
return this.resolvePending('open');
}

onClose(err?: firestore.FirestoreError): Promise<void> {
return this.resolvePending('close');
}

private resolvePending(actualCallback: StreamEventType): Promise<void> {
if (this.pendingPromises.length > 0) {
let pendingPromise = this.pendingPromises.shift();
pendingPromise.resolve(actualCallback);
} else {
this.pendingCallbacks.push(actualCallback);
}
return Promise.resolve();
}
}

describe('Watch Stream', () => {
let streamListener: StreamStatusListener;

beforeEach(() => {
streamListener = new StreamStatusListener();
});

afterEach(() => {
streamListener.verifyNoPendingCallbacks();
});

/**
* Verifies that the watch stream does not issue an onClose callback after a
* call to stop().
*/
asyncIt('can be stopped before handshake', () => {
let watchStream: PersistentListenStream;

return withTestDatastore(ds => {
watchStream = ds.newPersistentWatchStream(streamListener);
watchStream.start();

return streamListener.awaitCallback('open').then(() => {
// Stop must not call onClose because the full implementation of the callback could
// attempt to restart the stream in the event it had pending watches.
watchStream.stop();
});
});
});
});

describe('Write Stream', () => {
let queue: AsyncQueue;
let streamListener: StreamStatusListener;

beforeEach(() => {
queue = new AsyncQueue();
streamListener = new StreamStatusListener();
});

afterEach(() => {
streamListener.verifyNoPendingCallbacks();
});

/**
* Verifies that the write stream does not issue an onClose callback after a
* call to stop().
*/
asyncIt('can be stopped before handshake', () => {
let writeStream: PersistentWriteStream;

return withTestDatastore(ds => {
writeStream = ds.newPersistentWriteStream(streamListener);
writeStream.start();
return streamListener.awaitCallback('open');
}).then(() => {
// Don't start the handshake.

// Stop must not call onClose because the full implementation of the callback could
// attempt to restart the stream in the event it had pending writes.
writeStream.stop();
});
});

asyncIt('can be stopped after handshake', () => {
const mutations = [setMutation('docs/1', { foo: 'bar' })];

let writeStream: PersistentWriteStream;

return withTestDatastore(ds => {
writeStream = ds.newPersistentWriteStream(streamListener);
writeStream.start();
return streamListener.awaitCallback('open');
})
.then(() => {
// Writing before the handshake should throw
expect(() => writeStream.writeMutations(mutations)).to.throw(
'Handshake must be complete before writing mutations'
);
writeStream.writeHandshake();
return streamListener.awaitCallback('handshakeComplete');
})
.then(() => {
// Now writes should succeed
writeStream.writeMutations(mutations);
return streamListener.awaitCallback('mutationResult');
})
.then(() => {
writeStream.stop();
});
});
});
27 changes: 27 additions & 0 deletions packages/firestore/test/integration/util/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
import * as firestore from 'firestore';

import { DatabaseId, DatabaseInfo } from '../../../src/core/database_info';
import { Datastore } from '../../../src/remote/datastore';

import firebase from './firebase_export';
import { EmptyCredentialsProvider } from '../../../src/api/credentials';
import { PlatformSupport } from '../../../src/platform/platform';
import { AsyncQueue } from '../../../src/util/async_queue';

// tslint:disable-next-line:no-any __karma__ is an untyped global
declare const __karma__: any;
Expand Down Expand Up @@ -82,6 +86,29 @@ export function getDefaultDatabaseInfo(): DatabaseInfo {
);
}

export function withTestDatastore(
fn: (datastore: Datastore, queue: AsyncQueue) => Promise<void>
): Promise<void> {
const databaseInfo = getDefaultDatabaseInfo();
const queue = new AsyncQueue();
return PlatformSupport.getPlatform()
.loadConnection(databaseInfo)
.then(conn => {
const serializer = PlatformSupport.getPlatform().newSerializer(
databaseInfo.databaseId
);
const datastore = new Datastore(
databaseInfo,
queue,
conn,
new EmptyCredentialsProvider(),
serializer
);

return fn(datastore, queue);
});
}

export function withTestDb(
persistence: boolean,
fn: (db: firestore.Firestore) => Promise<void>
Expand Down