Skip to content

Untangle Datastore #2971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 24, 2020
9 changes: 2 additions & 7 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { Document, MaybeDocument, NoDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { Platform } from '../platform/platform';
import { Datastore } from '../remote/datastore';
import { newDatastore } from '../remote/datastore';
import { RemoteStore } from '../remote/remote_store';
import { AsyncQueue } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
Expand Down Expand Up @@ -238,12 +238,7 @@ export class FirestoreClient {
const serializer = this.platform.newSerializer(
this.databaseInfo.databaseId
);
const datastore = new Datastore(
this.asyncQueue,
connection,
this.credentials,
serializer
);
const datastore = newDatastore(connection, this.credentials, serializer);

await componentProvider.initialize({
asyncQueue: this.asyncQueue,
Expand Down
12 changes: 8 additions & 4 deletions packages/firestore/src/core/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ import {
Precondition,
VerifyMutation
} from '../model/mutation';
import { Datastore } from '../remote/datastore';
import { debugAssert, fail } from '../util/assert';
import {
Datastore,
invokeBatchGetDocumentsRpc,
invokeCommitRpc
} from '../remote/datastore';
import { fail, debugAssert } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import { SnapshotVersion } from './snapshot_version';

Expand Down Expand Up @@ -66,7 +70,7 @@ export class Transaction {
'Firestore transactions require all reads to be executed before all writes.'
);
}
const docs = await this.datastore.lookup(keys);
const docs = await invokeBatchGetDocumentsRpc(this.datastore, keys);
docs.forEach(doc => {
if (doc instanceof NoDocument || doc instanceof Document) {
this.recordVersion(doc);
Expand Down Expand Up @@ -112,7 +116,7 @@ export class Transaction {
unwritten.forEach((key, _version) => {
this.mutations.push(new VerifyMutation(key, this.precondition(key)));
});
await this.datastore.commit(this.mutations);
await invokeCommitRpc(this.datastore, this.mutations);
this.committed = true;
}

Expand Down
2 changes: 2 additions & 0 deletions packages/firestore/src/protos/firestore_proto_api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export declare namespace firestoreV1ApiClientInterfaces {
values?: Value[];
}
interface BatchGetDocumentsRequest {
database?: string;
documents?: string[];
mask?: DocumentMask;
transaction?: string;
Expand All @@ -164,6 +165,7 @@ export declare namespace firestoreV1ApiClientInterfaces {
allDescendants?: boolean;
}
interface CommitRequest {
database?: string;
writes?: Write[];
transaction?: string;
}
Expand Down
198 changes: 107 additions & 91 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,113 +16,44 @@
*/

import { CredentialsProvider } from '../api/credentials';
import { maybeDocumentMap } from '../model/collections';
import { MaybeDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation, MutationResult } from '../model/mutation';
import * as api from '../protos/firestore_proto_api';
import { hardAssert } from '../util/assert';
import { AsyncQueue } from '../util/async_queue';
import { debugCast, hardAssert } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import { Connection } from './connection';
import { JsonProtoSerializer } from './serializer';
import {
WatchStreamListener,
WriteStreamListener,
PersistentListenStream,
PersistentWriteStream
PersistentWriteStream,
WatchStreamListener,
WriteStreamListener
} from './persistent_stream';
import { AsyncQueue } from '../util/async_queue';

import { JsonProtoSerializer } from './serializer';

// The generated proto interfaces for these class are missing the database
// field. So we add it here.
// TODO(b/36015800): Remove this once the api generator is fixed.
interface BatchGetDocumentsRequest extends api.BatchGetDocumentsRequest {
database?: string;
}
interface CommitRequest extends api.CommitRequest {
database?: string;
}
/**
* Datastore and its related methods are a wrapper around the external Google
* Cloud Datastore grpc API, which provides an interface that is more convenient
* for the rest of the client SDK architecture to consume.
*/
export class Datastore {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you made this an interface this wouldn't even materialize in the generated code.

The downside is that empty interfaces match any object so we'd effectively lose type safety. Maybe that's why you chose class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, an empty interface matches all objects in TypeScript. It is even commonly used to do exactly that.


/**
* Datastore is a wrapper around the external Google Cloud Datastore grpc API,
* which provides an interface that is more convenient for the rest of the
* client SDK architecture to consume.
* An implementation of Datastore that exposes additional state for internal
* consumption.
*/
export class Datastore {
class DatastoreImpl extends Datastore {
constructor(
private queue: AsyncQueue,
private connection: Connection,
private credentials: CredentialsProvider,
private serializer: JsonProtoSerializer
) {}

newPersistentWriteStream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than changing the object models and completely rearranging the client (so that it doesn't match the other ports), a better strategy would be to adopt tree-shakeable APIs internally.

That is, make classes like Datastore more like just dumb structs, and make the methods that used to be in the class plain functions that take Datastore as an argument.

This will greatly reduce the amount of churn in the relative arrangement of components. Indeed, you could apply this to commit and other methods on datastore to ensure that all of it tree-shakes out if unused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the downsides is that it makes every member public, and I kind of wanted to avoid this. Your argument that we should pull out all methods does win here though. I updated the PR, which also greatly reduces the diff (a large part are now just test cleanup).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I meant by dumb structs. Tree-shakeable APIs make this code look like old-school C.

Is it possible to adopt a convention of which members are actually public vs those which would be private, but are public by virtue of this kind of code structure?

In C programs a way to handle this was to forward declare a private member of your struct to hide details:

typedef struct DatastoreDetails DatastoreDetails;

typedef struct Datastore {
  DatastoreDetails* details;
} Datastore;

Then you only define struct DatastoreDetails in your .c file.

A similar technique could be translated into typescript:

  • The exported type contains a private details: unknown
  • DatastoreDetails is not exported
  • The constructor assigns an instance of DatastoreDetails to the details field.
  • There's a non-exported details(datastore: Datastore) => DatastoreDetails that casts

I'm fairly certain this isn't actually worth it, but this is how we implemented access control before it existed in the language.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a commit with a TypeScript version of this hack: 0295cd5

It should be pretty easy to understand, but I wanted to point out a downside: With this change, at least IntelliJ now consider all members of DatastoreImpl unused, which would breaks its automated refactor should we use it on classes like this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a lot better than my suggestion. If we can get IntelliJ happy, I think this is the way to go.

listener: WriteStreamListener
): PersistentWriteStream {
return new PersistentWriteStream(
this.queue,
this.connection,
this.credentials,
this.serializer,
listener
);
}

newPersistentWatchStream(
listener: WatchStreamListener
): PersistentListenStream {
return new PersistentListenStream(
this.queue,
this.connection,
this.credentials,
this.serializer,
listener
);
}

commit(mutations: Mutation[]): Promise<MutationResult[]> {
const params: CommitRequest = {
database: this.serializer.encodedDatabaseId,
writes: mutations.map(m => this.serializer.toMutation(m))
};
return this.invokeRPC<CommitRequest, api.CommitResponse>(
'Commit',
params
).then(response => {
return this.serializer.fromWriteResults(
response.writeResults,
response.commitTime
);
});
}

lookup(keys: DocumentKey[]): Promise<MaybeDocument[]> {
const params: BatchGetDocumentsRequest = {
database: this.serializer.encodedDatabaseId,
documents: keys.map(k => this.serializer.toName(k))
};
return this.invokeStreamingRPC<
BatchGetDocumentsRequest,
api.BatchGetDocumentsResponse
>('BatchGetDocuments', params).then(response => {
let docs = maybeDocumentMap();
response.forEach(proto => {
const doc = this.serializer.fromMaybeDocument(proto);
docs = docs.insert(doc.key, doc);
});
const result: MaybeDocument[] = [];
keys.forEach(key => {
const doc = docs.get(key);
hardAssert(!!doc, 'Missing entity in write response for ' + key);
result.push(doc);
});
return result;
});
public readonly connection: Connection,
public readonly credentials: CredentialsProvider,
public readonly serializer: JsonProtoSerializer
) {
super();
}

/** Gets an auth token and invokes the provided RPC. */
private invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
return this.credentials
.getToken()
.then(token => {
Expand All @@ -137,7 +68,7 @@ export class Datastore {
}

/** Gets an auth token and invokes the provided RPC with streamed results. */
private invokeStreamingRPC<Req, Resp>(
invokeStreamingRPC<Req, Resp>(
rpcName: string,
request: Req
): Promise<Resp[]> {
Expand All @@ -158,3 +89,88 @@ export class Datastore {
});
}
}

export function newDatastore(
connection: Connection,
credentials: CredentialsProvider,
serializer: JsonProtoSerializer
): Datastore {
return new DatastoreImpl(connection, credentials, serializer);
}

export async function invokeCommitRpc(
datastore: Datastore,
mutations: Mutation[]
): Promise<MutationResult[]> {
const datastoreImpl = debugCast(datastore, DatastoreImpl);
const params = {
database: datastoreImpl.serializer.encodedDatabaseId,
writes: mutations.map(m => datastoreImpl.serializer.toMutation(m))
};
const response = await datastoreImpl.invokeRPC<
api.CommitRequest,
api.CommitResponse
>('Commit', params);
return datastoreImpl.serializer.fromWriteResults(
response.writeResults,
response.commitTime
);
}

export async function invokeBatchGetDocumentsRpc(
datastore: Datastore,
keys: DocumentKey[]
): Promise<MaybeDocument[]> {
const datastoreImpl = debugCast(datastore, DatastoreImpl);
const params = {
database: datastoreImpl.serializer.encodedDatabaseId,
documents: keys.map(k => datastoreImpl.serializer.toName(k))
};
const response = await datastoreImpl.invokeStreamingRPC<
api.BatchGetDocumentsRequest,
api.BatchGetDocumentsResponse
>('BatchGetDocuments', params);

const docs = new Map<string, MaybeDocument>();
response.forEach(proto => {
const doc = datastoreImpl.serializer.fromMaybeDocument(proto);
docs.set(doc.key.toString(), doc);
});
const result: MaybeDocument[] = [];
keys.forEach(key => {
const doc = docs.get(key.toString());
hardAssert(!!doc, 'Missing entity in write response for ' + key);
result.push(doc);
});
return result;
}

export function newPersistentWriteStream(
datastore: Datastore,
queue: AsyncQueue,
listener: WriteStreamListener
): PersistentWriteStream {
const datastoreImpl = debugCast(datastore, DatastoreImpl);
return new PersistentWriteStream(
queue,
datastoreImpl.connection,
datastoreImpl.credentials,
datastoreImpl.serializer,
listener
);
}

export function newPersistentWatchStream(
datastore: Datastore,
queue: AsyncQueue,
listener: WatchStreamListener
): PersistentListenStream {
const datastoreImpl = debugCast(datastore, DatastoreImpl);
return new PersistentListenStream(
queue,
datastoreImpl.connection,
datastoreImpl.credentials,
datastoreImpl.serializer,
listener
);
}
10 changes: 7 additions & 3 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ import { logDebug } from '../util/log';
import { DocumentKeySet } from '../model/collections';
import { AsyncQueue } from '../util/async_queue';
import { ConnectivityMonitor, NetworkStatus } from './connectivity_monitor';
import { Datastore } from './datastore';
import {
Datastore,
newPersistentWatchStream,
newPersistentWriteStream
} from './datastore';
import { OnlineStateTracker } from './online_state_tracker';
import {
PersistentListenStream,
Expand Down Expand Up @@ -151,13 +155,13 @@ export class RemoteStore implements TargetMetadataProvider {
);

// Create streams (but note they're not started yet).
this.watchStream = this.datastore.newPersistentWatchStream({
this.watchStream = newPersistentWatchStream(this.datastore, asyncQueue, {
onOpen: this.onWatchStreamOpen.bind(this),
onClose: this.onWatchStreamClose.bind(this),
onWatchChange: this.onWatchStreamChange.bind(this)
});

this.writeStream = this.datastore.newPersistentWriteStream({
this.writeStream = newPersistentWriteStream(this.datastore, asyncQueue, {
onOpen: this.onWriteStreamOpen.bind(this),
onClose: this.onWriteStreamClose.bind(this),
onHandshakeComplete: this.onWriteHandshakeComplete.bind(this),
Expand Down
16 changes: 16 additions & 0 deletions packages/firestore/src/util/assert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,19 @@ export function debugAssert(
fail(message);
}
}

/**
* Casts `obj` to `T`. In non-production builds, verifies that `obj` is an
* instance of `T` before casting.
*/
export function debugCast<T>(
obj: object,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
constructor: { new (...args: any[]): T }
): T {
debugAssert(
obj instanceof constructor,
`Expected type '${constructor.name}', but was '${obj.constructor.name}'`
);
return obj as T;
}
Loading