Skip to content

Commit 4cbe608

Browse files
committed
Merge branch 'wuandy/Bundles' into wuandy/BundleReaderNode
# Conflicts: # packages/firestore/src/util/bundle_reader.ts # packages/firestore/test/unit/util/bundle.test.ts
1 parent 9d6edc5 commit 4cbe608

File tree

3 files changed

+64
-62
lines changed

3 files changed

+64
-62
lines changed

packages/firestore/src/platform/platform.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ export class PlatformSupport {
109109
PlatformSupport.platform = platform;
110110
}
111111

112+
private static _forceSetPlatform(platform: Platform): void {
113+
PlatformSupport.platform = platform;
114+
}
115+
112116
static getPlatform(): Platform {
113117
if (!PlatformSupport.platform) {
114118
fail('Platform not set');

packages/firestore/src/util/bundle_reader.ts

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
BundleMetadata
2121
} from '../protos/firestore_bundle_proto';
2222
import { Deferred } from './promise';
23+
import { ByteStreamReader, PlatformSupport } from '../platform/platform';
2324

2425
/**
2526
* A complete element in the bundle stream, together with the byte length it
@@ -37,30 +38,6 @@ export class SizedBundleElement {
3738
}
3839
}
3940

40-
/**
41-
* Create a `ReadableStream` from a underlying buffer.
42-
*
43-
* @param data: Underlying buffer.
44-
* @param bytesPerRead: How many bytes to read from the underlying buffer from
45-
* each read through the stream.
46-
*/
47-
export function toReadableStream(
48-
data: Uint8Array | ArrayBuffer,
49-
bytesPerRead = 10240
50-
): ReadableStream<Uint8Array | ArrayBuffer> {
51-
let readFrom = 0;
52-
return new ReadableStream({
53-
start(controller) {},
54-
async pull(controller): Promise<void> {
55-
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
56-
readFrom += bytesPerRead;
57-
if (readFrom >= data.byteLength) {
58-
controller.close();
59-
}
60-
}
61-
});
62-
}
63-
6441
/**
6542
* A class representing a bundle.
6643
*
@@ -70,8 +47,8 @@ export function toReadableStream(
7047
export class BundleReader {
7148
/** Cached bundle metadata. */
7249
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>();
73-
/** The reader instance of the given ReadableStream. */
74-
private reader: ReadableStreamDefaultReader;
50+
/** The reader to read from underlying binary bundle data source. */
51+
private reader: ByteStreamReader;
7552
/**
7653
* Internal buffer to hold bundle content, accumulating incomplete element
7754
* content.
@@ -86,13 +63,9 @@ export class BundleReader {
8663
| Uint8Array
8764
| ArrayBuffer
8865
) {
89-
if (
90-
bundleStream instanceof Uint8Array ||
91-
bundleStream instanceof ArrayBuffer
92-
) {
93-
this.bundleStream = toReadableStream(bundleStream);
94-
}
95-
this.reader = (this.bundleStream as ReadableStream).getReader();
66+
this.reader = PlatformSupport.getPlatform().toByteStreamReader(
67+
bundleStream
68+
);
9669

9770
// Read the metadata (which is the first element).
9871
this.nextElementImpl().then(

packages/firestore/test/unit/util/bundle.test.ts

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,38 @@
1717
import { expect } from 'chai';
1818
import {
1919
BundleReader,
20-
SizedBundleElement,
21-
toReadableStream
20+
SizedBundleElement
2221
} from '../../../src/util/bundle_reader';
23-
import { isNode } from '../../util/test_platform';
2422
import { BundleElement } from '../../../src/protos/firestore_bundle_proto';
23+
import { isNode } from '../../util/test_platform';
24+
import {
25+
PlatformSupport,
26+
toByteStreamReader
27+
} from '../../../src/platform/platform';
2528

26-
function readableStreamFromString(
29+
/**
30+
* Create a `ReadableStream` from a string.
31+
*
32+
* @param content: Bundle in string.
33+
* @param bytesPerRead: How many bytes to read from the underlying buffer from
34+
* each read through the stream.
35+
*/
36+
export function readableStreamFromString(
2737
content: string,
28-
bytesPerRead: number
29-
): ReadableStream {
30-
return toReadableStream(new TextEncoder().encode(content), bytesPerRead);
38+
bytesPerRead = 10240
39+
): ReadableStream<Uint8Array | ArrayBuffer> {
40+
let readFrom = 0;
41+
const data = new TextEncoder().encode(content);
42+
return new ReadableStream({
43+
start(controller) {},
44+
async pull(controller): Promise<void> {
45+
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
46+
readFrom += bytesPerRead;
47+
if (readFrom >= data.byteLength) {
48+
controller.close();
49+
}
50+
}
51+
});
3152
}
3253

3354
function lengthPrefixedString(o: {}): string {
@@ -60,14 +81,30 @@ describe('readableStreamFromString()', () => {
6081
});
6182
});
6283

63-
describe('Bundle ', () => {
84+
describe.only('Bundle ', () => {
6485
genericBundleReadingTests(1);
6586
genericBundleReadingTests(4);
6687
genericBundleReadingTests(64);
6788
genericBundleReadingTests(1024);
6889
});
6990

7091
function genericBundleReadingTests(bytesPerRead: number): void {
92+
if (isNode()) {
93+
const platform = PlatformSupport.getPlatform();
94+
platform.toByteStreamReader = source =>
95+
toByteStreamReader(source as Uint8Array, bytesPerRead);
96+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
97+
(PlatformSupport as any)._forceSetPlatform(platform);
98+
}
99+
100+
function bundleFromString(s: string): BundleReader {
101+
if (isNode()) {
102+
return new BundleReader(encoder.encode(s));
103+
} else {
104+
return new BundleReader(readableStreamFromString(s, bytesPerRead));
105+
}
106+
}
107+
71108
const encoder = new TextEncoder();
72109
// Setting up test data.
73110
const meta: BundleElement = {
@@ -197,8 +234,7 @@ function genericBundleReadingTests(bytesPerRead: number): void {
197234
bytesPerRead: number,
198235
validMeta = false
199236
): Promise<void> {
200-
const bundleStream = readableStreamFromString(bundleString, bytesPerRead);
201-
const bundle = new BundleReader(bundleStream);
237+
const bundle = bundleFromString(bundleString);
202238

203239
if (!validMeta) {
204240
await expect(await bundle.getMetadata()).should.be.rejected;
@@ -210,15 +246,13 @@ function genericBundleReadingTests(bytesPerRead: number): void {
210246
}
211247

212248
it('reads with query and doc with bytesPerRead ' + bytesPerRead, async () => {
213-
const bundleStream = readableStreamFromString(
249+
const bundle = bundleFromString(
214250
metaString +
215251
limitQueryString +
216252
limitToLastQueryString +
217253
doc1MetaString +
218-
doc1String,
219-
bytesPerRead
254+
doc1String
220255
);
221-
const bundle = new BundleReader(bundleStream);
222256

223257
expect(await bundle.getMetadata()).to.deep.equal(meta.metadata);
224258

@@ -233,16 +267,14 @@ function genericBundleReadingTests(bytesPerRead: number): void {
233267
it(
234268
'reads with unexpected orders with bytesPerRead ' + bytesPerRead,
235269
async () => {
236-
const bundleStream = readableStreamFromString(
270+
const bundle = bundleFromString(
237271
metaString +
238272
doc1MetaString +
239273
doc1String +
240274
limitQueryString +
241275
doc2MetaString +
242-
doc2String,
243-
bytesPerRead
276+
doc2String
244277
);
245-
const bundle = new BundleReader(bundleStream);
246278

247279
const actual = await getAllElements(bundle);
248280
expect(actual.length).to.equal(5);
@@ -260,11 +292,7 @@ function genericBundleReadingTests(bytesPerRead: number): void {
260292
it(
261293
'reads without named query with bytesPerRead ' + bytesPerRead,
262294
async () => {
263-
const bundleStream = readableStreamFromString(
264-
metaString + doc1MetaString + doc1String,
265-
bytesPerRead
266-
);
267-
const bundle = new BundleReader(bundleStream);
295+
const bundle = bundleFromString(metaString + doc1MetaString + doc1String);
268296

269297
expect(await bundle.getMetadata()).to.deep.equal(meta.metadata);
270298

@@ -276,11 +304,9 @@ function genericBundleReadingTests(bytesPerRead: number): void {
276304
);
277305

278306
it('reads with deleted doc with bytesPerRead ' + bytesPerRead, async () => {
279-
const bundleStream = readableStreamFromString(
280-
metaString + noDocMetaString + doc1MetaString + doc1String,
281-
bytesPerRead
307+
const bundle = bundleFromString(
308+
metaString + noDocMetaString + doc1MetaString + doc1String
282309
);
283-
const bundle = new BundleReader(bundleStream);
284310

285311
expect(await bundle.getMetadata()).to.deep.equal(meta.metadata);
286312

@@ -294,8 +320,7 @@ function genericBundleReadingTests(bytesPerRead: number): void {
294320
it(
295321
'reads without documents or query with bytesPerRead ' + bytesPerRead,
296322
async () => {
297-
const bundleStream = readableStreamFromString(metaString, bytesPerRead);
298-
const bundle = new BundleReader(bundleStream);
323+
const bundle = bundleFromString(metaString);
299324

300325
expect(await bundle.getMetadata()).to.deep.equal(meta.metadata);
301326

0 commit comments

Comments
 (0)