Skip to content

Commit 7b6fa7a

Browse files
authored
Merge a869466 into f347dea
2 parents f347dea + a869466 commit 7b6fa7a

File tree

3 files changed

+586
-1
lines changed

3 files changed

+586
-1
lines changed

packages/firestore/src/protos/firestore_proto_api.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ export declare namespace firestoreV1ApiClientInterfaces {
184184
interface Document {
185185
name?: string;
186186
fields?: ApiClientObjectMap<Value>;
187-
createTime?: string;
187+
createTime?: Timestamp;
188188
updateTime?: Timestamp;
189189
}
190190
interface DocumentChange {
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/**
2+
* @license
3+
* Copyright 2020 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import {
19+
BundleElement,
20+
BundleMetadata
21+
} from '../protos/firestore_bundle_proto';
22+
import { Deferred } from './promise';
23+
24+
/**
25+
* A complete element in the bundle stream, together with the byte length it
26+
* occupies in the stream.
27+
*/
28+
export class SizedBundleElement {
29+
constructor(
30+
public readonly payload: BundleElement,
31+
// How many bytes this element takes to store in the bundle.
32+
public readonly byteLength: number
33+
) {}
34+
35+
isBundleMetadata(): boolean {
36+
return 'metadata' in this.payload;
37+
}
38+
}
39+
40+
/**
41+
* Create a `ReadableStream` from a underlying buffer.
42+
*
43+
* @param data: Underlying buffer.
44+
* @param bytesPerRead: How many bytes to read from the underlying buffer from
45+
* each read through the stream.
46+
*/
47+
export function toReadableStream(
48+
data: Uint8Array | ArrayBuffer,
49+
bytesPerRead = 10240
50+
): ReadableStream<Uint8Array | ArrayBuffer> {
51+
let readFrom = 0;
52+
return new ReadableStream({
53+
start(controller) {},
54+
async pull(controller): Promise<void> {
55+
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
56+
readFrom += bytesPerRead;
57+
if (readFrom >= data.byteLength) {
58+
controller.close();
59+
}
60+
}
61+
});
62+
}
63+
64+
/**
65+
* A class representing a bundle.
66+
*
67+
* Takes a bundle stream or buffer, and presents abstractions to read bundled
68+
* elements out of the underlying content.
69+
*/
70+
export class BundleReader {
71+
/** Cached bundle metadata. */
72+
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>();
73+
/** The reader instance of the given ReadableStream. */
74+
private reader: ReadableStreamDefaultReader;
75+
/**
76+
* Internal buffer to hold bundle content, accumulating incomplete element
77+
* content.
78+
*/
79+
private buffer: Uint8Array = new Uint8Array();
80+
/** The decoder used to parse binary data into strings. */
81+
private textDecoder = new TextDecoder('utf-8');
82+
83+
constructor(
84+
private bundleStream:
85+
| ReadableStream<Uint8Array | ArrayBuffer>
86+
| Uint8Array
87+
| ArrayBuffer
88+
) {
89+
if (
90+
bundleStream instanceof Uint8Array ||
91+
bundleStream instanceof ArrayBuffer
92+
) {
93+
this.bundleStream = toReadableStream(bundleStream);
94+
}
95+
this.reader = (this.bundleStream as ReadableStream).getReader();
96+
97+
// Read the metadata (which is the first element).
98+
this.nextElementImpl().then(
99+
element => {
100+
if (element && element.isBundleMetadata()) {
101+
this.metadata.resolve(element.payload.metadata!);
102+
} else {
103+
this.metadata.reject(
104+
new Error(`The first element of the bundle is not a metadata, it is
105+
${JSON.stringify(element?.payload)}`)
106+
);
107+
}
108+
},
109+
error => this.metadata.reject(error)
110+
);
111+
}
112+
113+
/**
114+
* Returns the metadata of the bundle.
115+
*/
116+
async getMetadata(): Promise<BundleMetadata> {
117+
return this.metadata.promise;
118+
}
119+
120+
/**
121+
* Returns the next BundleElement (together with its byte size in the bundle)
122+
* that has not been read from underlying ReadableStream. Returns null if we
123+
* have reached the end of the stream.
124+
*/
125+
async nextElement(): Promise<SizedBundleElement | null> {
126+
// Makes sure metadata is read before proceeding.
127+
await this.getMetadata();
128+
return this.nextElementImpl();
129+
}
130+
131+
/**
132+
* Reads from the head of internal buffer, and pulling more data from
133+
* underlying stream if a complete element cannot be found, until an
134+
* element(including the prefixed length and the JSON string) is found.
135+
*
136+
* Once a complete element is read, it is dropped from internal buffer.
137+
*
138+
* Returns either the bundled element, or null if we have reached the end of
139+
* the stream.
140+
*/
141+
private async nextElementImpl(): Promise<SizedBundleElement | null> {
142+
const lengthBuffer = await this.readLength();
143+
if (lengthBuffer === null) {
144+
return null;
145+
}
146+
147+
const lengthString = this.textDecoder.decode(lengthBuffer);
148+
const length = Number(lengthString);
149+
if (isNaN(length)) {
150+
this.raiseError(`length string (${lengthString}) is not valid number`);
151+
}
152+
153+
const jsonString = await this.readJsonString(length);
154+
155+
return new SizedBundleElement(
156+
JSON.parse(jsonString),
157+
lengthBuffer.length + length
158+
);
159+
}
160+
161+
/** First index of '{' from the underlying buffer. */
162+
private indexOfOpenBracket(): number {
163+
return this.buffer.findIndex(v => v === '{'.charCodeAt(0));
164+
}
165+
166+
/**
167+
* Reads from the beginning of the internal buffer, until the first '{', and
168+
* return the content.
169+
*
170+
* If reached end of the stream, returns a null.
171+
*/
172+
private async readLength(): Promise<Uint8Array | null> {
173+
while (this.indexOfOpenBracket() < 0) {
174+
const done = await this.pullMoreDataToBuffer();
175+
if (done) {
176+
break;
177+
}
178+
}
179+
180+
// Broke out of the loop because underlying stream is closed, and there
181+
// happens to be no more data to process.
182+
if (this.buffer.length === 0) {
183+
return null;
184+
}
185+
186+
const position = this.indexOfOpenBracket();
187+
// Broke out of the loop because underlying stream is closed, but still
188+
// cannot find an open bracket.
189+
if (position < 0) {
190+
this.raiseError(
191+
'Reached the end of bundle when a length string is expected.'
192+
);
193+
}
194+
195+
const result = this.buffer.slice(0, position);
196+
// Update the internal buffer to drop the read length.
197+
this.buffer = this.buffer.slice(position);
198+
return result;
199+
}
200+
201+
/**
202+
* Reads from a specified position from the internal buffer, for a specified
203+
* number of bytes, pulling more data from the underlying stream if needed.
204+
*
205+
* Returns a string decoded from the read bytes.
206+
*/
207+
private async readJsonString(length: number): Promise<string> {
208+
while (this.buffer.length < length) {
209+
const done = await this.pullMoreDataToBuffer();
210+
if (done) {
211+
this.raiseError('Reached the end of bundle when more is expected.');
212+
}
213+
}
214+
215+
const result = this.textDecoder.decode(this.buffer.slice(0, length));
216+
// Update the internal buffer to drop the read json string.
217+
this.buffer = this.buffer.slice(length);
218+
return result;
219+
}
220+
221+
private raiseError(message: string): void {
222+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
223+
this.reader.cancel('Invalid bundle format.');
224+
throw new Error(message);
225+
}
226+
227+
/**
228+
* Pulls more data from underlying stream to internal buffer.
229+
* Returns a boolean indicating whether the stream is finished.
230+
*/
231+
private async pullMoreDataToBuffer(): Promise<boolean> {
232+
const result = await this.reader.read();
233+
if (!result.done) {
234+
const newBuffer = new Uint8Array(
235+
this.buffer.length + result.value.length
236+
);
237+
newBuffer.set(this.buffer);
238+
newBuffer.set(result.value, this.buffer.length);
239+
this.buffer = newBuffer;
240+
}
241+
return result.done;
242+
}
243+
}

0 commit comments

Comments
 (0)