Commit 622763d

(wip) adapt download algorithm to use range list
1 parent b6572be commit 622763d

2 files changed: +127 -101 lines


packages/hub/src/utils/RangeList.ts

Lines changed: 1 addition & 8 deletions

@@ -160,7 +160,7 @@ export class RangeList<T> {
   }
 
   /**
-   * Get all ranges within the specified boundaries. The boundaries must match existing ones.
+   * Get all ranges within the specified boundaries.
    */
   getRanges(start: number, end: number): Range<T>[] {
     if (end <= start) {
@@ -175,13 +175,6 @@ export class RangeList<T> {
       }
     }
 
-    // Verify boundaries match
-    if (result.length === 0 || result[0].start !== start || result[result.length - 1].end !== end) {
-      throw new Error("Range boundaries must match existing boundaries");
-    }
-
-    // Todo: also check if there's a gap in the middle but it should not happen with our usage
-
     return result;
   }
 
packages/hub/src/utils/XetBlob.ts

Lines changed: 126 additions & 93 deletions
@@ -4,6 +4,7 @@ import type { CredentialsParams, RepoDesignation, RepoId } from "../types/public
 import { checkCredentials } from "./checkCredentials";
 import { toRepoId } from "./toRepoId";
 import { decompress as lz4_decompress } from "../vendor/lz4js";
+import { RangeList } from "./RangeList";
 
 const JWT_SAFETY_PERIOD = 60_000;
 const JWT_CACHE_SIZE = 1_000;
@@ -41,7 +42,11 @@ interface ReconstructionInfo {
       url: string;
       /** Chunk range */
       range: { start: number; end: number };
-      /** Byte range, when making the call to the URL */
+      /**
+       * Byte range, when making the call to the URL.
+       *
+       * We assume that we're given non-overlapping ranges for each hash
+       */
       url_range: { start: number; end: number };
     }>
   >;
@@ -173,6 +178,22 @@ export class XetBlob extends Blob {
       await this.#loadReconstructionInfo();
     }
 
+    const rangeLists = new Map<string, RangeList<Uint8Array[]>>();
+
+    if (!this.reconstructionInfo) {
+      throw new Error("Failed to load reconstruction info");
+    }
+
+    for (const term of this.reconstructionInfo.terms) {
+      let rangeList = rangeLists.get(term.hash);
+      if (!rangeList) {
+        rangeList = new RangeList<Uint8Array[]>();
+        rangeLists.set(term.hash, rangeList);
+      }
+
+      rangeList.add(term.range.start, term.range.end);
+    }
+
     async function* readData(
       reconstructionInfo: ReconstructionInfo,
       customFetch: typeof fetch,
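This pre-registration is what later tells the read loop whether a decompressed chunk is worth caching: when several terms of the reconstruction plan reference the same chunk range of the same xorb, that range ends up with a refCount above one. A rough illustration, with a made-up hash and ranges, and assuming add() accumulates reference counts:

const rangeLists = new Map<string, RangeList<Uint8Array[]>>();

// Hypothetical plan: chunks [0, 4) of xorb "abc" are needed by two terms,
// chunks [4, 8) by a single term.
const rangeList = new RangeList<Uint8Array[]>();
rangeLists.set("abc", rangeList);
rangeList.add(0, 4);
rangeList.add(4, 8);
rangeList.add(0, 4);

// Under the assumed semantics, [0, 4) now has refCount 2, so its decompressed
// chunks get kept in memory until the duplicate term consumes them, while
// [4, 8) has refCount 1 and can be streamed out and forgotten immediately.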
@@ -187,6 +208,31 @@
           break;
         }
 
+        const rangeList = rangeLists.get(term.hash);
+        if (!rangeList) {
+          throw new Error(`Failed to find range list for term ${term.hash}`);
+        }
+
+        {
+          const termRanges = rangeList.getRanges(term.range.start, term.range.end);
+
+          if (termRanges.every((range) => range.data)) {
+            for (const range of termRanges) {
+              for (let chunk of range.data!) {
+                if (chunk.length > maxBytes - totalBytesRead) {
+                  chunk = chunk.slice(0, maxBytes - totalBytesRead);
+                }
+                totalBytesRead += chunk.length;
+                // The stream consumer can decide to transfer ownership of the chunk, so we need to return a clone
+                // if there's more than one range for the same term
+                yield range.refCount > 1 ? chunk.slice() : chunk;
+              }
+            }
+            rangeList.remove(term.range.start, term.range.end);
+            continue;
+          }
+        }
+
         const fetchInfo = reconstructionInfo.fetch_info[term.hash].find(
           (info) => info.range.start <= term.range.start && info.range.end >= term.range.end
         );
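The block above is the cache fast path: a term is served straight from memory only when every tracked range covering it already has decompressed chunks attached; the chunks are cloned when other terms still reference the same range (the stream consumer may transfer ownership of the buffer), and the term's reference is released afterwards. A condensed sketch of the check, with isTermCached as a hypothetical helper name:

// Hypothetical helper: true when every range covering [start, end) already
// holds decompressed chunks, i.e. the term can be served without a request.
function isTermCached(rangeList: RangeList<Uint8Array[]>, start: number, end: number): boolean {
  return rangeList.getRanges(start, end).every((range) => range.data !== undefined);
}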
@@ -223,125 +269,112 @@
         }
 
         let done = false;
-        let chunksToSkip = term.range.start - fetchInfo.range.start;
-        let chunksToRead = term.range.end - term.range.start;
-        let bytesToSkip = 0;
+        let chunkIndex = fetchInfo.range.start;
+        const ranges = rangeList.getRanges(fetchInfo.range.start, fetchInfo.range.end);
 
         let leftoverBytes: Uint8Array | undefined = undefined;
 
-        readChunks: while (!done && totalBytesRead < maxBytes) {
+        while (!done) {
          const result = await reader.read();
          done = result.done;
-          if (result.value) {
-            while (totalBytesRead < maxBytes && chunksToRead) {
-              if (bytesToSkip) {
-                if (bytesToSkip >= result.value.length) {
-                  bytesToSkip -= result.value.length;
-                  continue readChunks;
-                }
-                result.value = result.value.slice(bytesToSkip);
-                bytesToSkip = 0;
-              }
-              if (leftoverBytes) {
-                result.value = new Uint8Array([...leftoverBytes, ...result.value]);
-                leftoverBytes = undefined;
-              }
 
-              if (result.value.length < 8) {
-                // We need 8 bytes to parse the chunk header
-                leftoverBytes = result.value;
-                continue readChunks;
-              }
+          if (!result.value) {
+            continue;
+          }
 
-              const header = new DataView(result.value.buffer, result.value.byteOffset, CHUNK_HEADER_BYTES);
-              const chunkHeader: ChunkHeader = {
-                version: header.getUint8(0),
-                compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
-                compression_scheme: header.getUint8(4),
-                uncompressed_length: header.getUint8(5) | (header.getUint8(6) << 8) | (header.getUint8(7) << 16),
-              };
+          if (leftoverBytes) {
+            result.value = new Uint8Array([...leftoverBytes, ...result.value]);
+            leftoverBytes = undefined;
+          }
 
-              if (chunkHeader.version !== 0) {
-                throw new Error(`Unsupported chunk version ${chunkHeader.version}`);
-              }
+          if (result.value.length < 8) {
+            // We need 8 bytes to parse the chunk header
+            leftoverBytes = result.value;
+            continue;
+          }
 
-              if (
-                chunkHeader.compression_scheme !== CompressionScheme.None &&
-                chunkHeader.compression_scheme !== CompressionScheme.LZ4 &&
-                chunkHeader.compression_scheme !== CompressionScheme.ByteGroupingLZ4
-              ) {
-                throw new Error(
-                  `Unsupported compression scheme ${
-                    compressionSchemeLabels[chunkHeader.compression_scheme] ?? chunkHeader.compression_scheme
-                  }`
-                );
-              }
+          const header = new DataView(result.value.buffer, result.value.byteOffset, CHUNK_HEADER_BYTES);
+          const chunkHeader: ChunkHeader = {
+            version: header.getUint8(0),
+            compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
+            compression_scheme: header.getUint8(4),
+            uncompressed_length: header.getUint8(5) | (header.getUint8(6) << 8) | (header.getUint8(7) << 16),
+          };
 
-              if (chunksToSkip) {
-                chunksToSkip--;
-                result.value = result.value.slice(CHUNK_HEADER_BYTES);
-                bytesToSkip = chunkHeader.compressed_length;
-                continue;
-              }
+          if (chunkHeader.version !== 0) {
+            throw new Error(`Unsupported chunk version ${chunkHeader.version}`);
+          }
 
-              if (readBytesToSkip >= chunkHeader.uncompressed_length) {
-                readBytesToSkip -= chunkHeader.uncompressed_length;
-                result.value = result.value.slice(CHUNK_HEADER_BYTES);
-                bytesToSkip = chunkHeader.compressed_length;
-                chunksToRead--;
-                continue;
-              }
+          if (
+            chunkHeader.compression_scheme !== CompressionScheme.None &&
+            chunkHeader.compression_scheme !== CompressionScheme.LZ4 &&
+            chunkHeader.compression_scheme !== CompressionScheme.ByteGroupingLZ4
+          ) {
+            throw new Error(
+              `Unsupported compression scheme ${
+                compressionSchemeLabels[chunkHeader.compression_scheme] ?? chunkHeader.compression_scheme
+              }`
+            );
+          }
 
-              if (result.value.length < chunkHeader.compressed_length + CHUNK_HEADER_BYTES) {
-                // We need more data to read the full chunk
-                leftoverBytes = result.value;
-                continue readChunks;
-              }
+          if (result.value.length < chunkHeader.compressed_length + CHUNK_HEADER_BYTES) {
+            // We need more data to read the full chunk
+            leftoverBytes = result.value;
+            continue;
+          }
 
-              result.value = result.value.slice(CHUNK_HEADER_BYTES);
+          result.value = result.value.slice(CHUNK_HEADER_BYTES);
 
-              const uncompressed =
-                chunkHeader.compression_scheme === CompressionScheme.LZ4
-                  ? lz4_decompress(
+          let uncompressed =
+            chunkHeader.compression_scheme === CompressionScheme.LZ4
+              ? lz4_decompress(result.value.slice(0, chunkHeader.compressed_length), chunkHeader.uncompressed_length)
+              : chunkHeader.compression_scheme === CompressionScheme.ByteGroupingLZ4
+                ? bg4_regoup_bytes(
+                    lz4_decompress(
                       result.value.slice(0, chunkHeader.compressed_length),
                       chunkHeader.uncompressed_length
-                    )
-                  : chunkHeader.compression_scheme === CompressionScheme.ByteGroupingLZ4
-                    ? bg4_regoup_bytes(
-                        lz4_decompress(
-                          result.value.slice(0, chunkHeader.compressed_length),
-                          chunkHeader.uncompressed_length
-                        )
-                      )
-                    : result.value.slice(0, chunkHeader.compressed_length);
-
-              let bytesToYield: Uint8Array;
-              if (readBytesToSkip) {
-                const remainingBytes = Math.min(uncompressed.length - readBytesToSkip, maxBytes - totalBytesRead);
-                bytesToYield = uncompressed.slice(readBytesToSkip, readBytesToSkip + remainingBytes);
-                readBytesToSkip = 0;
-              } else {
-                bytesToYield = uncompressed.slice(0, Math.min(uncompressed.length, maxBytes - totalBytesRead));
-              }
+                    )
+                  )
+                : result.value.slice(0, chunkHeader.compressed_length);
+
+          const range = ranges.find((range) => chunkIndex >= range.start && chunkIndex < range.end);
+          const shouldYield = chunkIndex >= term.range.start && chunkIndex < term.range.end;
+          const minRefCountToStore = shouldYield ? 2 : 1;
+          let stored = false;
+
+          // Assuming non-overlapping fetch_info ranges for the same hash
+          if (range && range.refCount >= minRefCountToStore) {
+            range.data ??= [];
+            range.data.push(uncompressed);
+            stored = true;
+          }
 
-              totalBytesRead += bytesToYield.length;
-              yield bytesToYield;
-              chunksToRead--;
+          if (shouldYield) {
+            if (readBytesToSkip) {
+              const skipped = Math.min(readBytesToSkip, uncompressed.length);
+              uncompressed = uncompressed.slice(readBytesToSkip);
+              readBytesToSkip -= skipped;
+            }
+
+            if (uncompressed.length > maxBytes - totalBytesRead) {
+              uncompressed = uncompressed.slice(0, maxBytes - totalBytesRead);
+            }
 
-              result.value = result.value.slice(chunkHeader.compressed_length);
+            if (uncompressed.length) {
+              totalBytesRead += uncompressed.length;
+              yield stored ? uncompressed.slice() : uncompressed;
            }
          }
+
+          chunkIndex++;
+          result.value = result.value.slice(chunkHeader.compressed_length);
        }
 
        // Release the reader
        await reader.cancel();
      }
    }
 
-    if (!this.reconstructionInfo) {
-      throw new Error("Failed to load reconstruction info");
-    }
-
    const iterator = readData(
      this.reconstructionInfo,
      this.fetch,
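For reference, both the old and the new loop parse the same 8-byte chunk header: version (1 byte), compressed_length (3 bytes, little-endian), compression_scheme (1 byte), uncompressed_length (3 bytes, little-endian). A standalone sketch of that parsing, with parseChunkHeader as a hypothetical helper name:

const CHUNK_HEADER_BYTES = 8;

interface ChunkHeader {
  version: number;
  compressed_length: number;
  compression_scheme: number;
  uncompressed_length: number;
}

// Hypothetical standalone version of the inline header parsing above.
function parseChunkHeader(buffer: Uint8Array): ChunkHeader {
  if (buffer.length < CHUNK_HEADER_BYTES) {
    throw new Error("Need at least 8 bytes to parse a chunk header");
  }
  const view = new DataView(buffer.buffer, buffer.byteOffset, CHUNK_HEADER_BYTES);
  return {
    version: view.getUint8(0),
    compressed_length: view.getUint8(1) | (view.getUint8(2) << 8) | (view.getUint8(3) << 16),
    compression_scheme: view.getUint8(4),
    uncompressed_length: view.getUint8(5) | (view.getUint8(6) << 8) | (view.getUint8(7) << 16),
  };
}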
