Skip to content

Commit 9d6edc5

Browse files
committed
Merge branch 'wuandy/Bundles' into wuandy/BundleReaderNode
# Conflicts: # packages/firestore/src/util/bundle_reader.ts # packages/firestore/test/unit/util/bundle.test.ts
2 parents 24e10cb + b0c8299 commit 9d6edc5

File tree

3 files changed

+287
-242
lines changed

3 files changed

+287
-242
lines changed

packages/firestore/src/protos/firestore_proto_api.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ export declare namespace firestoreV1ApiClientInterfaces {
184184
interface Document {
185185
name?: string;
186186
fields?: ApiClientObjectMap<Value>;
187-
createTime?: string;
187+
createTime?: Timestamp;
188188
updateTime?: Timestamp;
189189
}
190190
interface DocumentChange {

packages/firestore/src/util/bundle_reader.ts

Lines changed: 116 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import {
1919
BundleElement,
2020
BundleMetadata
2121
} from '../protos/firestore_bundle_proto';
22-
import { ByteStreamReader, PlatformSupport } from '../platform/platform';
22+
import { Deferred } from './promise';
2323

2424
/**
2525
* A complete element in the bundle stream, together with the byte length it
@@ -37,19 +37,47 @@ export class SizedBundleElement {
3737
}
3838
}
3939

40+
/**
41+
* Create a `ReadableStream` from a underlying buffer.
42+
*
43+
* @param data: Underlying buffer.
44+
* @param bytesPerRead: How many bytes to read from the underlying buffer from
45+
* each read through the stream.
46+
*/
47+
export function toReadableStream(
48+
data: Uint8Array | ArrayBuffer,
49+
bytesPerRead = 10240
50+
): ReadableStream<Uint8Array | ArrayBuffer> {
51+
let readFrom = 0;
52+
return new ReadableStream({
53+
start(controller) {},
54+
async pull(controller): Promise<void> {
55+
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
56+
readFrom += bytesPerRead;
57+
if (readFrom >= data.byteLength) {
58+
controller.close();
59+
}
60+
}
61+
});
62+
}
63+
4064
/**
4165
* A class representing a bundle.
4266
*
4367
* Takes a bundle stream or buffer, and presents abstractions to read bundled
4468
* elements out of the underlying content.
4569
*/
4670
export class BundleReader {
47-
// Cached bundle metadata.
48-
private metadata?: BundleMetadata | null;
49-
// The reader to read from underlying binary bundle data source.
50-
private reader: ByteStreamReader;
51-
// Internal buffer to hold bundle content, accumulating incomplete element content.
71+
/** Cached bundle metadata. */
72+
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>();
73+
/** The reader instance of the given ReadableStream. */
74+
private reader: ReadableStreamDefaultReader;
75+
/**
76+
* Internal buffer to hold bundle content, accumulating incomplete element
77+
* content.
78+
*/
5279
private buffer: Uint8Array = new Uint8Array();
80+
/** The decoder used to parse binary data into strings. */
5381
private textDecoder = new TextDecoder('utf-8');
5482

5583
constructor(
@@ -58,59 +86,59 @@ export class BundleReader {
5886
| Uint8Array
5987
| ArrayBuffer
6088
) {
61-
this.reader = PlatformSupport.getPlatform().toByteStreamReader(
62-
bundleStream
89+
if (
90+
bundleStream instanceof Uint8Array ||
91+
bundleStream instanceof ArrayBuffer
92+
) {
93+
this.bundleStream = toReadableStream(bundleStream);
94+
}
95+
this.reader = (this.bundleStream as ReadableStream).getReader();
96+
97+
// Read the metadata (which is the first element).
98+
this.nextElementImpl().then(
99+
element => {
100+
if (element && element.isBundleMetadata()) {
101+
this.metadata.resolve(element.payload.metadata!);
102+
} else {
103+
this.metadata.reject(
104+
new Error(`The first element of the bundle is not a metadata, it is
105+
${JSON.stringify(element?.payload)}`)
106+
);
107+
}
108+
},
109+
error => this.metadata.reject(error)
63110
);
64111
}
65112

66113
/**
67114
* Returns the metadata of the bundle.
68115
*/
69116
async getMetadata(): Promise<BundleMetadata> {
70-
if (!this.metadata) {
71-
await this.nextElement();
72-
}
73-
74-
return this.metadata!;
117+
return this.metadata.promise;
75118
}
76119

77120
/**
78121
* Returns the next BundleElement (together with its byte size in the bundle)
79122
* that has not been read from underlying ReadableStream. Returns null if we
80123
* have reached the end of the stream.
81-
*
82-
* Throws an error if the first element is not a BundleMetadata.
83124
*/
84125
async nextElement(): Promise<SizedBundleElement | null> {
85-
const element = await this.readNextElement();
86-
if (!element) {
87-
return element;
88-
}
89-
90-
if (!this.metadata) {
91-
if (element.isBundleMetadata()) {
92-
this.metadata = element.payload.metadata;
93-
} else {
94-
this.raiseError(
95-
`The first element of the bundle is not a metadata, it is ${JSON.stringify(
96-
element.payload
97-
)}`
98-
);
99-
}
100-
}
101-
102-
return element;
126+
// Makes sure metadata is read before proceeding.
127+
await this.getMetadata();
128+
return this.nextElementImpl();
103129
}
104130

105131
/**
106-
* Reads from the head of internal buffer, and pulling more data from underlying stream if a complete element
107-
* cannot be found, until an element(including the prefixed length and the JSON string) is found.
132+
* Reads from the head of internal buffer, and pulling more data from
133+
* underlying stream if a complete element cannot be found, until an
134+
* element(including the prefixed length and the JSON string) is found.
108135
*
109136
* Once a complete element is read, it is dropped from internal buffer.
110137
*
111-
* Returns either the bundled element, or null if we have reached the end of the stream.
138+
* Returns either the bundled element, or null if we have reached the end of
139+
* the stream.
112140
*/
113-
private async readNextElement(): Promise<SizedBundleElement | null> {
141+
private async nextElementImpl(): Promise<SizedBundleElement | null> {
114142
const lengthBuffer = await this.readLength();
115143
if (lengthBuffer === null) {
116144
return null;
@@ -122,60 +150,72 @@ export class BundleReader {
122150
this.raiseError(`length string (${lengthString}) is not valid number`);
123151
}
124152

125-
const jsonString = await this.readJsonString(lengthBuffer.length, length);
126-
// Update the internal buffer to drop the read length and json string.
127-
this.buffer = this.buffer.slice(lengthBuffer.length + length);
153+
const jsonString = await this.readJsonString(length);
128154

129155
return new SizedBundleElement(
130156
JSON.parse(jsonString),
131157
lengthBuffer.length + length
132158
);
133159
}
134160

135-
// First index of '{' from the underlying buffer.
161+
/** First index of '{' from the underlying buffer. */
136162
private indexOfOpenBracket(): number {
137163
return this.buffer.findIndex(v => v === '{'.charCodeAt(0));
138164
}
139165

140-
// Reads from the beginning of the internal buffer, until the first '{', and return
141-
// the content.
142-
// If reached end of the stream, returns a null.
166+
/**
167+
* Reads from the beginning of the internal buffer, until the first '{', and
168+
* return the content.
169+
*
170+
* If reached end of the stream, returns a null.
171+
*/
143172
private async readLength(): Promise<Uint8Array | null> {
144-
let position = this.indexOfOpenBracket();
145-
while (position < 0) {
146-
const bytesRead = await this.pullMoreDataToBuffer();
147-
if (bytesRead < 0) {
148-
if (this.buffer.length === 0) {
149-
return null;
150-
}
151-
position = this.indexOfOpenBracket();
152-
// Underlying stream is closed, and we still cannot find a '{'.
153-
if (position < 0) {
154-
this.raiseError(
155-
'Reached the end of bundle when a length string is expected.'
156-
);
157-
}
158-
} else {
159-
position = this.indexOfOpenBracket();
173+
while (this.indexOfOpenBracket() < 0) {
174+
const done = await this.pullMoreDataToBuffer();
175+
if (done) {
176+
break;
160177
}
161178
}
162179

163-
return this.buffer.slice(0, position);
180+
// Broke out of the loop because underlying stream is closed, and there
181+
// happens to be no more data to process.
182+
if (this.buffer.length === 0) {
183+
return null;
184+
}
185+
186+
const position = this.indexOfOpenBracket();
187+
// Broke out of the loop because underlying stream is closed, but still
188+
// cannot find an open bracket.
189+
if (position < 0) {
190+
this.raiseError(
191+
'Reached the end of bundle when a length string is expected.'
192+
);
193+
}
194+
195+
const result = this.buffer.slice(0, position);
196+
// Update the internal buffer to drop the read length.
197+
this.buffer = this.buffer.slice(position);
198+
return result;
164199
}
165200

166-
// Reads from a specified position from the internal buffer, for a specified
167-
// number of bytes, pulling more data from the underlying stream if needed.
168-
//
169-
// Returns a string decoded from the read bytes.
170-
private async readJsonString(start: number, length: number): Promise<string> {
171-
while (this.buffer.length < start + length) {
172-
const bytesRead = await this.pullMoreDataToBuffer();
173-
if (bytesRead < 0) {
201+
/**
202+
* Reads from a specified position from the internal buffer, for a specified
203+
* number of bytes, pulling more data from the underlying stream if needed.
204+
*
205+
* Returns a string decoded from the read bytes.
206+
*/
207+
private async readJsonString(length: number): Promise<string> {
208+
while (this.buffer.length < length) {
209+
const done = await this.pullMoreDataToBuffer();
210+
if (done) {
174211
this.raiseError('Reached the end of bundle when more is expected.');
175212
}
176213
}
177214

178-
return this.textDecoder.decode(this.buffer.slice(start, start + length));
215+
const result = this.textDecoder.decode(this.buffer.slice(0, length));
216+
// Update the internal buffer to drop the read json string.
217+
this.buffer = this.buffer.slice(length);
218+
return result;
179219
}
180220

181221
private raiseError(message: string): void {
@@ -184,20 +224,20 @@ export class BundleReader {
184224
throw new Error(message);
185225
}
186226

187-
// Pulls more data from underlying stream to internal buffer.
188-
// Returns a boolean indicating whether the stream is finished.
189-
private async pullMoreDataToBuffer(): Promise<number> {
227+
/**
228+
* Pulls more data from underlying stream to internal buffer.
229+
* Returns a boolean indicating whether the stream is finished.
230+
*/
231+
private async pullMoreDataToBuffer(): Promise<boolean> {
190232
const result = await this.reader.read();
191-
let bytesRead = -1;
192233
if (!result.done) {
193-
bytesRead = result.value.length;
194234
const newBuffer = new Uint8Array(
195235
this.buffer.length + result.value.length
196236
);
197237
newBuffer.set(this.buffer);
198238
newBuffer.set(result.value, this.buffer.length);
199239
this.buffer = newBuffer;
200240
}
201-
return bytesRead;
241+
return result.done;
202242
}
203243
}

0 commit comments

Comments
 (0)