-
Notifications
You must be signed in to change notification settings - Fork 946
Implement a bundle reader for Web #3097
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
9602712
Renaming interfaces without leading I
wu-hui c5e783e
Initial commit of bundle reading - for web only.
wu-hui 5e7fb89
Tests only run when it is not Node.
wu-hui 1ee1615
Fix redundant imports
wu-hui 18f0be1
Fix missing textencoder
wu-hui aa455bf
Remove generator.
wu-hui 782e76f
Address comments
wu-hui f5e4474
Address more feedback.
wu-hui c60f8dc
Race condition.
wu-hui e005485
Wait for metadata to be read before move on to rest elements.
wu-hui 6d9d5cd
Remove only.
wu-hui a869466
Fix payload printing.
wu-hui File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
/** | ||
* @license | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
import { | ||
BundleElement, | ||
BundleMetadata | ||
} from '../protos/firestore_bundle_proto'; | ||
import { Deferred } from './promise'; | ||
|
||
/** | ||
* A complete element in the bundle stream, together with the byte length it | ||
* occupies in the stream. | ||
*/ | ||
export class SizedBundleElement { | ||
constructor( | ||
public readonly payload: BundleElement, | ||
// How many bytes this element takes to store in the bundle. | ||
public readonly byteLength: number | ||
) {} | ||
|
||
isBundleMetadata(): boolean { | ||
return 'metadata' in this.payload; | ||
} | ||
} | ||
|
||
/** | ||
* Create a `ReadableStream` from a underlying buffer. | ||
* | ||
* @param data: Underlying buffer. | ||
* @param bytesPerRead: How many bytes to read from the underlying buffer from | ||
* each read through the stream. | ||
*/ | ||
export function toReadableStream( | ||
data: Uint8Array | ArrayBuffer, | ||
bytesPerRead = 10240 | ||
): ReadableStream<Uint8Array | ArrayBuffer> { | ||
let readFrom = 0; | ||
return new ReadableStream({ | ||
start(controller) {}, | ||
async pull(controller): Promise<void> { | ||
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead)); | ||
readFrom += bytesPerRead; | ||
if (readFrom >= data.byteLength) { | ||
controller.close(); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* A class representing a bundle. | ||
* | ||
* Takes a bundle stream or buffer, and presents abstractions to read bundled | ||
* elements out of the underlying content. | ||
*/ | ||
export class BundleReader { | ||
/** Cached bundle metadata. */ | ||
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>(); | ||
/** The reader instance of the given ReadableStream. */ | ||
private reader: ReadableStreamDefaultReader; | ||
/** | ||
* Internal buffer to hold bundle content, accumulating incomplete element | ||
* content. | ||
*/ | ||
private buffer: Uint8Array = new Uint8Array(); | ||
/** The decoder used to parse binary data into strings. */ | ||
private textDecoder = new TextDecoder('utf-8'); | ||
|
||
constructor( | ||
private bundleStream: | ||
| ReadableStream<Uint8Array | ArrayBuffer> | ||
| Uint8Array | ||
| ArrayBuffer | ||
) { | ||
if ( | ||
bundleStream instanceof Uint8Array || | ||
bundleStream instanceof ArrayBuffer | ||
) { | ||
this.bundleStream = toReadableStream(bundleStream); | ||
} | ||
this.reader = (this.bundleStream as ReadableStream).getReader(); | ||
|
||
// Read the metadata (which is the first element). | ||
this.nextElementImpl().then( | ||
element => { | ||
if (element && element.isBundleMetadata()) { | ||
this.metadata.resolve(element.payload.metadata!); | ||
} else { | ||
this.metadata.reject( | ||
new Error(`The first element of the bundle is not a metadata, it is | ||
${JSON.stringify(element?.payload)}`) | ||
); | ||
} | ||
}, | ||
error => this.metadata.reject(error) | ||
); | ||
} | ||
|
||
/** | ||
* Returns the metadata of the bundle. | ||
*/ | ||
async getMetadata(): Promise<BundleMetadata> { | ||
return this.metadata.promise; | ||
} | ||
|
||
/** | ||
* Returns the next BundleElement (together with its byte size in the bundle) | ||
* that has not been read from underlying ReadableStream. Returns null if we | ||
* have reached the end of the stream. | ||
*/ | ||
async nextElement(): Promise<SizedBundleElement | null> { | ||
// Makes sure metadata is read before proceeding. | ||
await this.getMetadata(); | ||
return this.nextElementImpl(); | ||
} | ||
|
||
/** | ||
* Reads from the head of internal buffer, and pulling more data from | ||
* underlying stream if a complete element cannot be found, until an | ||
* element(including the prefixed length and the JSON string) is found. | ||
* | ||
* Once a complete element is read, it is dropped from internal buffer. | ||
* | ||
* Returns either the bundled element, or null if we have reached the end of | ||
* the stream. | ||
*/ | ||
private async nextElementImpl(): Promise<SizedBundleElement | null> { | ||
const lengthBuffer = await this.readLength(); | ||
if (lengthBuffer === null) { | ||
return null; | ||
} | ||
|
||
const lengthString = this.textDecoder.decode(lengthBuffer); | ||
const length = Number(lengthString); | ||
if (isNaN(length)) { | ||
this.raiseError(`length string (${lengthString}) is not valid number`); | ||
} | ||
|
||
const jsonString = await this.readJsonString(length); | ||
|
||
return new SizedBundleElement( | ||
JSON.parse(jsonString), | ||
lengthBuffer.length + length | ||
); | ||
} | ||
|
||
/** First index of '{' from the underlying buffer. */ | ||
private indexOfOpenBracket(): number { | ||
return this.buffer.findIndex(v => v === '{'.charCodeAt(0)); | ||
} | ||
|
||
/** | ||
* Reads from the beginning of the internal buffer, until the first '{', and | ||
* return the content. | ||
* | ||
* If reached end of the stream, returns a null. | ||
*/ | ||
private async readLength(): Promise<Uint8Array | null> { | ||
while (this.indexOfOpenBracket() < 0) { | ||
const done = await this.pullMoreDataToBuffer(); | ||
if (done) { | ||
break; | ||
} | ||
} | ||
|
||
// Broke out of the loop because underlying stream is closed, and there | ||
// happens to be no more data to process. | ||
if (this.buffer.length === 0) { | ||
return null; | ||
} | ||
|
||
const position = this.indexOfOpenBracket(); | ||
// Broke out of the loop because underlying stream is closed, but still | ||
// cannot find an open bracket. | ||
if (position < 0) { | ||
this.raiseError( | ||
'Reached the end of bundle when a length string is expected.' | ||
); | ||
} | ||
|
||
const result = this.buffer.slice(0, position); | ||
// Update the internal buffer to drop the read length. | ||
this.buffer = this.buffer.slice(position); | ||
return result; | ||
} | ||
|
||
/** | ||
* Reads from a specified position from the internal buffer, for a specified | ||
* number of bytes, pulling more data from the underlying stream if needed. | ||
* | ||
* Returns a string decoded from the read bytes. | ||
*/ | ||
private async readJsonString(length: number): Promise<string> { | ||
while (this.buffer.length < length) { | ||
const done = await this.pullMoreDataToBuffer(); | ||
if (done) { | ||
this.raiseError('Reached the end of bundle when more is expected.'); | ||
} | ||
} | ||
|
||
const result = this.textDecoder.decode(this.buffer.slice(0, length)); | ||
// Update the internal buffer to drop the read json string. | ||
this.buffer = this.buffer.slice(length); | ||
return result; | ||
} | ||
|
||
private raiseError(message: string): void { | ||
// eslint-disable-next-line @typescript-eslint/no-floating-promises | ||
this.reader.cancel('Invalid bundle format.'); | ||
throw new Error(message); | ||
} | ||
|
||
/** | ||
* Pulls more data from underlying stream to internal buffer. | ||
* Returns a boolean indicating whether the stream is finished. | ||
*/ | ||
private async pullMoreDataToBuffer(): Promise<boolean> { | ||
const result = await this.reader.read(); | ||
if (!result.done) { | ||
const newBuffer = new Uint8Array( | ||
this.buffer.length + result.value.length | ||
); | ||
newBuffer.set(this.buffer); | ||
newBuffer.set(result.value, this.buffer.length); | ||
this.buffer = newBuffer; | ||
} | ||
return result.done; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JSDoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.