Skip to content

fix: Improve retry mechanisms for split pdf logic #108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ check:
## test-unit: run unit tests
.PHONY: test-unit
test-unit:
npx jest --detectOpenHandles --config jest.config.js test/unit
npx jest --verbose --detectOpenHandles --config jest.config.js test/unit

## test-integration: run integration tests
.PHONY: test-integration
test-integration:
npx jest --detectOpenHandles --config jest.config.js test/integration
npx jest --verbose --detectOpenHandles --config jest.config.js test/integration --forceExit

## test: run all tests
.PHONY: test
Expand Down
76 changes: 60 additions & 16 deletions src/hooks/custom/SplitPdfHook.ts
Copy link

@dennison33 dennison33 Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on line 213 wont this assignment fail:
(this.partitionSuccessfulResponses[operationID] as Response[])[pageIndex][pageIndex] =
response.clone();
because this.partitionSuccessfulResponses[operationID] has not been initialized to an array?

Have a look at line 192, the success responses is initialized, but the fail responses is not:
this.partitionSuccessfulResponses[operationID] = new Array(requests.length);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Thanks

Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import async from "async";

import { HTTPClient } from "../../lib/http.js";
import {
AfterErrorContext,
AfterErrorHook,
Expand All @@ -25,10 +24,12 @@ import {
stringToBoolean,
} from "./utils/index.js";
import {
HTTPClientExtension,
MIN_PAGES_PER_THREAD,
PARTITION_FORM_FILES_KEY,
PARTITION_FORM_SPLIT_PDF_PAGE_KEY,
} from "./common.js";
import {retry, RetryConfig} from "../../lib/retries";

/**
* Represents a hook for splitting and sending PDF files as per page requests.
Expand All @@ -39,8 +40,7 @@ export class SplitPdfHook
/**
* The HTTP client used for making requests.
*/
client: HTTPClient | undefined;

client: HTTPClientExtension | undefined;

/**
* Keeps the strict-mode setting for splitPdfPage feature.
Expand Down Expand Up @@ -68,9 +68,16 @@ export class SplitPdfHook
* @returns The initialized SDK options.
*/
sdkInit(opts: SDKInitOptions): SDKInitOptions {
const { baseURL, client } = opts;
this.client = client;
return { baseURL: baseURL, client: client };
const { baseURL } = opts;
this.client = new HTTPClientExtension();

this.client.addHook("response", (res) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could move this into the HTTPClientExtension class? not necessary though

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me!

if (res.status != 200) {
console.error("Request failed with status code", `${res.status}`);
}
});

return { baseURL: baseURL, client: this.client };
}

/**
Expand Down Expand Up @@ -178,23 +185,48 @@ export class SplitPdfHook
file.name,
firstPageNumber
);
const timeoutInMs = 60 * 10 * 1000;
const req = new Request(requestClone, {
headers,
body,
signal: AbortSignal.timeout(timeoutInMs)
});
requests.push(req);
setIndex+=1;
}

this.partitionSuccessfulResponses[operationID] = new Array(requests.length);
this.partitionFailedResponses[operationID] = new Array(requests.length);

const allowFailed = this.allowFailed;

// These are the retry values from our api spec
// We need to hardcode them here until we're able to reuse the SDK
// from within this hook
const oneSecond = 1000;
const oneMinute = 1000 * 60;
const retryConfig = {
strategy: "backoff",
backoff: {
initialInterval: oneSecond * 3,
maxInterval: oneMinute * 12,
exponent: 1.88,
maxElapsedTime: oneMinute * 30,
},
} as RetryConfig;

const retryCodes = ["502", "503", "504"];

this.partitionRequests[operationID] = async.parallelLimit(
requests.slice(0, -1).map((req, pageIndex) => async () => {
requests.map((req, pageIndex) => async () => {
const pageNumber = pageIndex + startingPageNumber;
try {
const response = await this.client!.request(req);
const response = await retry(
async () => {
return await this.client!.request(req.clone());
},
{ config: retryConfig, statusCodes: retryCodes }
);
if (response.status === 200) {
(this.partitionSuccessfulResponses[operationID] as Response[])[pageIndex] =
response.clone();
Expand All @@ -206,7 +238,7 @@ export class SplitPdfHook
}
}
} catch (e) {
console.error(`Failed to send request for page ${pageNumber}.`);
console.error(`Failed to send request for page ${pageNumber}.`, e);
if (!allowFailed) {
throw e;
}
Expand All @@ -215,7 +247,8 @@ export class SplitPdfHook
concurrencyLevel
);

return requests.at(-1) as Request;
const dummyRequest = new Request("https://no-op/");
return dummyRequest;
}

/**
Expand All @@ -230,28 +263,39 @@ export class SplitPdfHook
successfulResponses: Response[],
failedResponses: Response[]
): Promise<Response> {
let realResponse = response.clone();
const firstSuccessfulResponse = successfulResponses.at(0);
const isFakeResponse = response.headers.has("fake-response");
if (firstSuccessfulResponse !== undefined && isFakeResponse) {
realResponse = firstSuccessfulResponse.clone();
}

let responseBody, responseStatus, responseStatusText;
const numFailedResponses = failedResponses?.length ?? 0;
const headers = prepareResponseHeaders(response);
const headers = prepareResponseHeaders(realResponse);

if (!this.allowFailed && failedResponses && failedResponses.length > 0) {
const failedResponse = failedResponses[0]?.clone();
if (failedResponse) {
responseBody = await failedResponse.text();
responseStatus = failedResponse.status;
responseStatusText = failedResponse.statusText;
} else {
responseBody = JSON.stringify({"details:": "Unknown error"});
responseStatus = 503
responseStatusText = "Unknown error"
}
// if the response status is unknown or was 502, 503, 504, set back to 500 to ensure we don't cause more retries
responseStatus = 500;
console.warn(
`${numFailedResponses} requests failed. The partition operation is cancelled.`
);
} else {
responseBody = await prepareResponseBody([...successfulResponses, response]);
responseStatus = response.status
responseStatusText = response.statusText
if (isFakeResponse) {
responseBody = await prepareResponseBody([...successfulResponses]);
} else {
responseBody = await prepareResponseBody([...successfulResponses, response]);
}
responseStatus = realResponse.status
responseStatusText = realResponse.statusText
if (numFailedResponses > 0) {
console.warn(
`${numFailedResponses} requests failed. The results might miss some pages.`
Expand Down
21 changes: 21 additions & 0 deletions src/hooks/custom/common.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import {HTTPClient} from "../../lib/http";

/**
* Regular expression pattern for matching base hostnames in the form of "*.unstructuredapp.io".
*/
Expand All @@ -23,3 +25,22 @@ export const MAX_NUMBER_OF_PARALLEL_REQUESTS = 15;

export const MIN_PAGES_PER_THREAD = 2;
export const MAX_PAGES_PER_THREAD = 20;

export class HTTPClientExtension extends HTTPClient {
constructor() {
super();
}

override async request(request: Request): Promise<Response> {
if (request.url === "https://no-op/") {
return new Response('{}', {
headers: [
["fake-response", "fake-response"]
],
status: 200,
statusText: 'OK_NO_OP'
});
}
return super.request(request);
}
}
41 changes: 40 additions & 1 deletion test/integration/SplitPdfHook.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { UnstructuredClient } from "../../src";
import { PartitionResponse } from "../../src/sdk/models/operations";
import { PartitionParameters, Strategy } from "../../src/sdk/models/shared";

const localServer = "http://localhost:8000"
const localServer = "http://localhost:8000";

describe("SplitPdfHook integration tests check splitted file is same as not splitted", () => {
const FAKE_API_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
Expand Down Expand Up @@ -354,3 +354,42 @@ describe("SplitPdfHook integration tests page range parameter", () => {
300000
);
});


describe("SplitPDF succeeds for large PDF with high concurrency", () => {
const FAKE_API_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";

it.each([
`${localServer}/general/v0/general`,
])("succeed", async (serverURL) => {
const client = new UnstructuredClient({
serverURL: serverURL,
security: {
apiKeyAuth: FAKE_API_KEY,
},
});

const file = {
content: readFileSync("test/data/layout-parser-paper.pdf"),
fileName: "test/data/layout-parser-paper.pdf"
};

const requestParams: PartitionParameters = {
files: file,
splitPdfPage: true,
strategy: Strategy.HiRes,
splitPdfAllowFailed: false,
splitPdfConcurrencyLevel: 15
};

const res: PartitionResponse = await client.general.partition({
partitionParameters: {
...requestParams
},
});

expect(res.statusCode).toEqual(200);
expect(res.elements?.length).toBeGreaterThan(0);
},
300000);
});
Loading