Skip to content

Creating Stream Tests for the Web Client #260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 25, 2017
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions packages/firestore/test/integration/remote/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/**
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { expect } from 'chai';
import { EmptyCredentialsProvider } from '../../../src/api/credentials';
import { SnapshotVersion } from '../../../src/core/snapshot_version';
import { MutationResult } from '../../../src/model/mutation';
import { PlatformSupport } from '../../../src/platform/platform';
import { Datastore } from '../../../src/remote/datastore';
import {
PersistentListenStream,
PersistentWriteStream,
WatchStreamListener,
WriteStreamListener
} from '../../../src/remote/persistent_stream';
import {
DocumentWatchChange,
ExistenceFilterChange,
WatchTargetChange
} from '../../../src/remote/watch_change';
import { AsyncQueue } from '../../../src/util/async_queue';
import { Deferred } from '../../../src/util/promise';
import { asyncIt } from '../../util/helpers';
import { getDefaultDatabaseInfo } from '../util/helpers';
import FirestoreError = firestore.FirestoreError;

type StreamStatusCallback =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps StreamCallbackType or StreameEvent? (not all of these are "status" events, and I'd expect an actual "callback" type to be a function, not a string)

I think the slightly strange thing here is that we're including 'on' in the names, which makes them refer to the method you call to register an event callback rather than the event itself. So it's also tempting to drop the 'on's ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, is it worth adding a comment calling out that this is a merging of the watch stream and write stream callback types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it to StreameEventType, which indeed suggests to remove the 'on' from the type definition.

| 'onHandshakeComplete'
| 'onMutationResult'
| 'onWatchChange'
| 'onOpen'
| 'onClose';

class StreamStatusListener implements WatchStreamListener, WriteStreamListener {
private pendingStates: StreamStatusCallback[] = [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'state' seems out-of-place. pendingCallbacks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private pendingPromises: Deferred<StreamStatusCallback>[] = [];

private resolvePending(actualCallback: StreamStatusCallback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you declare the return type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

let pendingPromise = this.pendingPromises.shift();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Here you're calling shift() and checking for undefined. Below (for pendingStates) you check the length > 0 before calling shift. I prefer the latter, but regardless I'd try to be consistent.

Copy link
Contributor Author

@schmidt-sebastian schmidt-sebastian Oct 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with the length check, but left the intermediate variable since it makes the control flow easier to follow.

if (pendingPromise) {
pendingPromise.resolve(actualCallback);
} else {
this.pendingStates.push(actualCallback);
}
return Promise.resolve();
}

onHandshakeComplete(): Promise<void> {
return this.resolvePending('onHandshakeComplete');
}

onMutationResult(
commitVersion: SnapshotVersion,
results: MutationResult[]
): Promise<void> {
return this.resolvePending('onMutationResult');
}

onWatchChange(
watchChange:
| DocumentWatchChange
| WatchTargetChange
| ExistenceFilterChange,
snapshot: SnapshotVersion
): Promise<void> {
return this.resolvePending('onWatchChange');
}

onOpen(): Promise<void> {
return this.resolvePending('onOpen');
}

onClose(err?: FirestoreError): Promise<void> {
return this.resolvePending('onClose');
}

/**
* Returns a Promise that resolves when 'expectedCallback' fires.
* Resolves the returned Promise immediately if there is already an
* unprocessed callback.
* Fails the test if the expected callback does not match the name of the
* actual callback function.
*/
awaitCallback(expectedCallback: StreamStatusCallback): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd normally put the public surface area of the class (awaitCallback / verifyNoPendingCallbacks) at the top, but feel free to leave as-is for symmetry with iOS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to top.

if (this.pendingStates.length > 0) {
expect(this.pendingStates.shift()).to.equal(expectedCallback);
return Promise.resolve();
} else {
const deferred = new Deferred<StreamStatusCallback>();
this.pendingPromises.push(deferred);
return deferred.promise.then(actualCallback => {
expect(actualCallback).to.equal(expectedCallback);
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is a bit wonky in that on failure it might throw an exception or might return a failed promise, depending on if we have pendingStates (and that the expectedCallback expectation is repeated). Perhaps do something like:

let promise: Promise<StreamStatusCallback>;
if (this.pendingStates.length > 0) {
  promise = Promise.resolve(this.pendingStates.shift());
} else {
  const deferred = new Deferred<StreamStatusCallback>();
  this.pendingPromises.push(deferred);
  promise = deferred.promise;
}
return promise.then(actualCallback => {
  expect(actualCallback).to.equal(expectedCallback);
});

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code to always return a Promise, and to reject it if the callback type doesn't match.

}

verifyNoPendingCallbacks(): void {
expect(this.pendingStates).to.be.empty;
}
}

describe('Watch Stream', () => {
let queue: AsyncQueue;
let streamListener: StreamStatusListener;

beforeEach(() => {
queue = new AsyncQueue();
streamListener = new StreamStatusListener();
});

afterEach(() => {
streamListener.verifyNoPendingCallbacks();
});

function initializeWatchStream(): Promise<PersistentListenStream> {
const databaseInfo = getDefaultDatabaseInfo();

return PlatformSupport.getPlatform()
.loadConnection(databaseInfo)
.then(conn => {
const serializer = PlatformSupport.getPlatform().newSerializer(
databaseInfo.databaseId
);
return new Datastore(
databaseInfo,
queue,
conn,
new EmptyCredentialsProvider(),
serializer
);
})
.then(ds => {
return ds.newPersistentWatchStream(streamListener);
});
}

asyncIt('can be stopped before handshake', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iOS test has a comment:

/** Verifies that the watch stream does not issue an onClose callback after a call to stop(). */

and does indeed trigger a GRPC close event (to make sure that our stream doesn't trigger a close event in response). Why aren't we testing that on web?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went back and forth on this just now, but decided to ultimately leave it as is. The handleStreamClose callback on the Web only ever gets called when the stream is still open and asserts that this is the case. To bring it closer to the other platforms, I would have to break this invariant.

If you prefer, I can also assert here that 'handleStreamClose' throws an error at this point. Just making it public for that doesn't seem worth it - especially since it invites other consumers to trip over this assertion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. To make web a more direct port, I think we'd refactor

this.stream.onClose((error: FirestoreError) => {
so that it delegates to a function that could then be called from this test. But going to that much trouble probably isn't warranted.

let watchStream: PersistentListenStream;

return initializeWatchStream()
.then(ws => {
watchStream = ws;
watchStream.start();
return streamListener.awaitCallback('onOpen');
})
.then(() => {
watchStream.stop();
});
});
});

describe('Write Stream', () => {
let queue: AsyncQueue;
let streamListener: StreamStatusListener;

beforeEach(() => {
queue = new AsyncQueue();
streamListener = new StreamStatusListener();
});

afterEach(() => {
streamListener.verifyNoPendingCallbacks();
});

function initializeWriteStream(): Promise<PersistentWriteStream> {
const databaseInfo = getDefaultDatabaseInfo();

return PlatformSupport.getPlatform()
.loadConnection(databaseInfo)
.then(conn => {
const serializer = PlatformSupport.getPlatform().newSerializer(
databaseInfo.databaseId
);
return new Datastore(
databaseInfo,
queue,
conn,
new EmptyCredentialsProvider(),
serializer
);
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to factor out an initializeDatastore() method to avoid 90% of this repeated code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

.then(ds => {
return ds.newPersistentWriteStream(streamListener);
});
}

asyncIt('can be stopped before handshake', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above about not testing we don't get extra onClose event after stop().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

let writeStream: PersistentWriteStream;

return initializeWriteStream()
.then(ws => {
writeStream = ws;
writeStream.start();
return streamListener.awaitCallback('onOpen');
})
.then(() => {
writeStream.stop();
});
});

asyncIt('can be stopped after handshake', () => {
let writeStream: PersistentWriteStream;

return initializeWriteStream()
.then(ws => {
writeStream = ws;
writeStream.start();
return streamListener.awaitCallback('onOpen');
})
.then(() => {
writeStream.writeHandshake();
return streamListener.awaitCallback('onHandshakeComplete');
})
.then(() => {
writeStream.stop();
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the web test not verify that attempting to write before handshake throws but works after? If we're going to the effort of porting the tests, we should make sure they're at parity (or at least add a comment for why they're not). :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally forgot to port this part :(

});
});