Skip to content

Commit 5166f33

Browse files
authored
fix: Improve retry mechanisms for split pdf logic (#108)
1 parent f8a3dbe commit 5166f33

File tree

4 files changed

+123
-19
lines changed

4 files changed

+123
-19
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ check:
3434
## test-unit: run unit tests
3535
.PHONY: test-unit
3636
test-unit:
37-
npx jest --detectOpenHandles --config jest.config.js test/unit
37+
npx jest --verbose --detectOpenHandles --config jest.config.js test/unit
3838

3939
## test-integration: run integration tests
4040
.PHONY: test-integration
4141
test-integration:
42-
npx jest --detectOpenHandles --config jest.config.js test/integration
42+
npx jest --verbose --detectOpenHandles --config jest.config.js test/integration --forceExit
4343

4444
## test: run all tests
4545
.PHONY: test

src/hooks/custom/SplitPdfHook.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import async from "async";
22

3-
import { HTTPClient } from "../../lib/http.js";
43
import {
54
AfterErrorContext,
65
AfterErrorHook,
@@ -25,10 +24,12 @@ import {
2524
stringToBoolean,
2625
} from "./utils/index.js";
2726
import {
27+
HTTPClientExtension,
2828
MIN_PAGES_PER_THREAD,
2929
PARTITION_FORM_FILES_KEY,
3030
PARTITION_FORM_SPLIT_PDF_PAGE_KEY,
3131
} from "./common.js";
32+
import {retry, RetryConfig} from "../../lib/retries";
3233

3334
/**
3435
* Represents a hook for splitting and sending PDF files as per page requests.
@@ -39,8 +40,7 @@ export class SplitPdfHook
3940
/**
4041
* The HTTP client used for making requests.
4142
*/
42-
client: HTTPClient | undefined;
43-
43+
client: HTTPClientExtension | undefined;
4444

4545
/**
4646
* Keeps the strict-mode setting for splitPdfPage feature.
@@ -68,9 +68,16 @@ export class SplitPdfHook
6868
* @returns The initialized SDK options.
6969
*/
7070
sdkInit(opts: SDKInitOptions): SDKInitOptions {
71-
const { baseURL, client } = opts;
72-
this.client = client;
73-
return { baseURL: baseURL, client: client };
71+
const { baseURL } = opts;
72+
this.client = new HTTPClientExtension();
73+
74+
this.client.addHook("response", (res) => {
75+
if (res.status != 200) {
76+
console.error("Request failed with status code", `${res.status}`);
77+
}
78+
});
79+
80+
return { baseURL: baseURL, client: this.client };
7481
}
7582

7683
/**
@@ -178,23 +185,48 @@ export class SplitPdfHook
178185
file.name,
179186
firstPageNumber
180187
);
188+
const timeoutInMs = 60 * 10 * 1000;
181189
const req = new Request(requestClone, {
182190
headers,
183191
body,
192+
signal: AbortSignal.timeout(timeoutInMs)
184193
});
185194
requests.push(req);
186195
setIndex+=1;
187196
}
188197

189198
this.partitionSuccessfulResponses[operationID] = new Array(requests.length);
199+
this.partitionFailedResponses[operationID] = new Array(requests.length);
190200

191201
const allowFailed = this.allowFailed;
192202

203+
// These are the retry values from our api spec
204+
// We need to hardcode them here until we're able to reuse the SDK
205+
// from within this hook
206+
const oneSecond = 1000;
207+
const oneMinute = 1000 * 60;
208+
const retryConfig = {
209+
strategy: "backoff",
210+
backoff: {
211+
initialInterval: oneSecond * 3,
212+
maxInterval: oneMinute * 12,
213+
exponent: 1.88,
214+
maxElapsedTime: oneMinute * 30,
215+
},
216+
} as RetryConfig;
217+
218+
const retryCodes = ["502", "503", "504"];
219+
193220
this.partitionRequests[operationID] = async.parallelLimit(
194-
requests.slice(0, -1).map((req, pageIndex) => async () => {
221+
requests.map((req, pageIndex) => async () => {
195222
const pageNumber = pageIndex + startingPageNumber;
196223
try {
197-
const response = await this.client!.request(req);
224+
const response = await retry(
225+
async () => {
226+
return await this.client!.request(req.clone());
227+
},
228+
{ config: retryConfig, statusCodes: retryCodes }
229+
);
198230
if (response.status === 200) {
199231
(this.partitionSuccessfulResponses[operationID] as Response[])[pageIndex] =
200232
response.clone();
@@ -206,7 +238,7 @@ export class SplitPdfHook
206238
}
207239
}
208240
} catch (e) {
209-
console.error(`Failed to send request for page ${pageNumber}.`);
241+
console.error(`Failed to send request for page ${pageNumber}.`, e);
210242
if (!allowFailed) {
211243
throw e;
212244
}
@@ -215,7 +247,8 @@ export class SplitPdfHook
215247
concurrencyLevel
216248
);
217249

218-
return requests.at(-1) as Request;
250+
const dummyRequest = new Request("https://no-op/");
251+
return dummyRequest;
219252
}
220253

221254
/**
@@ -230,28 +263,39 @@ export class SplitPdfHook
230263
successfulResponses: Response[],
231264
failedResponses: Response[]
232265
): Promise<Response> {
266+
let realResponse = response.clone();
267+
const firstSuccessfulResponse = successfulResponses.at(0);
268+
const isFakeResponse = response.headers.has("fake-response");
269+
if (firstSuccessfulResponse !== undefined && isFakeResponse) {
270+
realResponse = firstSuccessfulResponse.clone();
271+
}
272+
233273
let responseBody, responseStatus, responseStatusText;
234274
const numFailedResponses = failedResponses?.length ?? 0;
235-
const headers = prepareResponseHeaders(response);
275+
const headers = prepareResponseHeaders(realResponse);
236276

237277
if (!this.allowFailed && failedResponses && failedResponses.length > 0) {
238278
const failedResponse = failedResponses[0]?.clone();
239279
if (failedResponse) {
240280
responseBody = await failedResponse.text();
241-
responseStatus = failedResponse.status;
242281
responseStatusText = failedResponse.statusText;
243282
} else {
244283
responseBody = JSON.stringify({"details:": "Unknown error"});
245-
responseStatus = 503
246284
responseStatusText = "Unknown error"
247285
}
286+
// if the response status is unknown or was 502, 503, 504, set back to 500 to ensure we don't cause more retries
287+
responseStatus = 500;
248288
console.warn(
249289
`${numFailedResponses} requests failed. The partition operation is cancelled.`
250290
);
251291
} else {
252-
responseBody = await prepareResponseBody([...successfulResponses, response]);
253-
responseStatus = response.status
254-
responseStatusText = response.statusText
292+
if (isFakeResponse) {
293+
responseBody = await prepareResponseBody([...successfulResponses]);
294+
} else {
295+
responseBody = await prepareResponseBody([...successfulResponses, response]);
296+
}
297+
responseStatus = realResponse.status
298+
responseStatusText = realResponse.statusText
255299
if (numFailedResponses > 0) {
256300
console.warn(
257301
`${numFailedResponses} requests failed. The results might miss some pages.`

src/hooks/custom/common.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import {HTTPClient} from "../../lib/http";
2+
13
/**
24
* Regular expression pattern for matching base hostnames in the form of "*.unstructuredapp.io".
35
*/
@@ -23,3 +25,22 @@ export const MAX_NUMBER_OF_PARALLEL_REQUESTS = 15;
2325

2426
export const MIN_PAGES_PER_THREAD = 2;
2527
export const MAX_PAGES_PER_THREAD = 20;
28+
29+
export class HTTPClientExtension extends HTTPClient {
30+
constructor() {
31+
super();
32+
}
33+
34+
override async request(request: Request): Promise<Response> {
35+
if (request.url === "https://no-op/") {
36+
return new Response('{}', {
37+
headers: [
38+
["fake-response", "fake-response"]
39+
],
40+
status: 200,
41+
statusText: 'OK_NO_OP'
42+
});
43+
}
44+
return super.request(request);
45+
}
46+
}

test/integration/SplitPdfHook.test.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { UnstructuredClient } from "../../src";
44
import { PartitionResponse } from "../../src/sdk/models/operations";
55
import { PartitionParameters, Strategy } from "../../src/sdk/models/shared";
66

7-
const localServer = "http://localhost:8000"
7+
const localServer = "http://localhost:8000";
88

99
describe("SplitPdfHook integration tests check splitted file is same as not splitted", () => {
1010
const FAKE_API_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
@@ -354,3 +354,42 @@ describe("SplitPdfHook integration tests page range parameter", () => {
354354
300000
355355
);
356356
});
357+
358+
359+
describe("SplitPDF succeeds for large PDF with high concurrency", () => {
360+
const FAKE_API_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
361+
362+
it.each([
363+
`${localServer}/general/v0/general`,
364+
])("succeed", async (serverURL) => {
365+
const client = new UnstructuredClient({
366+
serverURL: serverURL,
367+
security: {
368+
apiKeyAuth: FAKE_API_KEY,
369+
},
370+
});
371+
372+
const file = {
373+
content: readFileSync("test/data/layout-parser-paper.pdf"),
374+
fileName: "test/data/layout-parser-paper.pdf"
375+
};
376+
377+
const requestParams: PartitionParameters = {
378+
files: file,
379+
splitPdfPage: true,
380+
strategy: Strategy.HiRes,
381+
splitPdfAllowFailed: false,
382+
splitPdfConcurrencyLevel: 15
383+
};
384+
385+
const res: PartitionResponse = await client.general.partition({
386+
partitionParameters: {
387+
...requestParams
388+
},
389+
});
390+
391+
expect(res.statusCode).toEqual(200);
392+
expect(res.elements?.length).toBeGreaterThan(0);
393+
},
394+
300000);
395+
});

0 commit comments

Comments
 (0)