fix: Allow split page logic to process files concurrently #175

Merged
merged 4 commits into from Sep 17, 2024
Changes from 3 commits
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,8 @@
## 0.26.0-dev

### Enhancements

### Features
* Add `partition_async` for a non-blocking alternative to `partition` (see the usage sketch below)

### Fixes
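A minimal, hypothetical sketch of how the new non-blocking `partition_async` call might be used, based on the integration test added in this PR. The client construction and the `api_key_auth`/file-name values are assumptions for illustration, not part of this diff.

import asyncio

from unstructured_client import UnstructuredClient
from unstructured_client.models import operations, shared


async def main():
    # Hypothetical client setup; the auth argument here is an assumption
    client = UnstructuredClient(api_key_auth="YOUR_API_KEY")

    with open("layout-parser-paper.pdf", "rb") as f:
        files = shared.Files(content=f.read(), file_name="layout-parser-paper.pdf")

    req = operations.PartitionRequest(
        partition_parameters=shared.PartitionParameters(files=files, strategy="fast")
    )

    # Awaiting instead of blocking lets several such requests share one event loop
    res = await client.general.partition_async(request=req)
    print(len(res.elements))


asyncio.run(main())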
65 changes: 65 additions & 0 deletions _test_unstructured_client/integration/test_integration_freemium.py
@@ -4,6 +4,7 @@
from pathlib import Path

import pytest
from deepdiff import DeepDiff
from unstructured_client import UnstructuredClient
from unstructured_client.models import shared, operations
from unstructured_client.models.errors import SDKError, ServerError, HTTPValidationError
@@ -125,6 +126,70 @@ async def test_partition_async_returns_elements(client, doc_path):
    assert len(response.elements)


@pytest.mark.asyncio
async def test_partition_async_processes_concurrent_files(client, doc_path):
    """
    Assert that partition_async can be used to send multiple files concurrently.
    Send two separate portions of the test doc, serially and then using asyncio.gather.
    The results for both runs should match.
    """
    filename = "layout-parser-paper.pdf"

    with open(doc_path / filename, "rb") as f:
        files = shared.Files(
            content=f.read(),
            file_name=filename,
        )

    # Set up two SDK requests
    # For different page ranges
    requests = [
        operations.PartitionRequest(
            partition_parameters=shared.PartitionParameters(
                files=files,
                strategy="fast",
                languages=["eng"],
                split_pdf_page=True,
                split_pdf_page_range=[1, 3],
            )
        ),
        operations.PartitionRequest(
            partition_parameters=shared.PartitionParameters(
                files=files,
                strategy="fast",
                languages=["eng"],
                split_pdf_page=True,
                split_pdf_page_range=[10, 12],
            )
        )
    ]

    serial_responses = []
    for req in requests:
        res = await client.general.partition_async(request=req)

        assert res.status_code == 200
        serial_responses.append(res.elements)

    concurrent_responses = []
    results = await asyncio.gather(
        client.general.partition_async(request=requests[0]),
        client.general.partition_async(request=requests[1])
    )

    for res in results:
        assert res.status_code == 200
        concurrent_responses.append(res.elements)

    diff = DeepDiff(
        t1=serial_responses,
        t2=concurrent_responses,
        ignore_order=True,
    )

    assert len(diff) == 0


def test_uvloop_partitions_without_errors(client, doc_path):
    async def call_api():
        filename = "layout-parser-paper-fast.pdf"
24 changes: 18 additions & 6 deletions src/unstructured_client/_hooks/custom/split_pdf_hook.py
@@ -5,6 +5,7 @@
import logging
import os
import math
import uuid
from collections.abc import Awaitable
from typing import Any, Coroutine, Optional, Tuple, Union, cast

@@ -198,7 +199,12 @@ def before_request(
        # This allows us to use an event loop in an env with an existing loop
        # Temporary fix until we can improve the async splitting behavior
        nest_asyncio.apply()
        operation_id = hook_ctx.operation_id

        # This is our key into coroutines_to_execute
        # We need to pass it on to after_success so
        # we know which results are ours
        operation_id = str(uuid.uuid4())

        content_type = request.headers.get("Content-Type")

        request_content = request.read()
@@ -329,14 +335,20 @@ async def call_api_partial(page):
            self.coroutines_to_execute[operation_id].append(coroutine)
            set_index += 1


        # Return a dummy request for the SDK to use
        # This allows us to skip right to the AfterRequestHook and await all the calls
        # Also, pass the operation_id so after_success can await the right results

        # Note: We need access to the async_client from the sdk_init hook in order to set
        # up a mock request like this.
        # For now, just make an extra request against our api, which should return 200.
        # dummy_request = httpx.Request("GET", "http://no-op")

        dummy_request = httpx.Request("GET", "https://api.unstructuredapp.io/general/docs")
        dummy_request = httpx.Request(
            "GET",
            "https://api.unstructuredapp.io/general/docs",
            headers={"request_id": operation_id},
        )

        return dummy_request

Expand Down Expand Up @@ -407,9 +419,9 @@ def after_success(
        combined response object; otherwise, the original response. Can return
        exception if it occurred during the execution.
        """
        operation_id = hook_ctx.operation_id
        # Because in `before_request` method we skipped sending last page in parallel
        # we need to pass response, which contains last page, to `_await_elements` method
        # Grab the correct id out of the dummy request
        operation_id = response.request.headers.get("request_id")

        elements = self._await_elements(operation_id)

        # if fails are disallowed, return the first failed response
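For reference, a simplified, hypothetical sketch of the key-passing pattern this PR introduces: before_request registers the page coroutines under a fresh UUID, carries that UUID out in a header on the dummy request, and after_success reads the header back off response.request so it awaits only its own results. The class and method shapes below are illustrative stand-ins, not the real hook in split_pdf_hook.py.

import asyncio
import uuid

import httpx


class ConcurrencySketch:
    """Illustrative only: mirrors the uuid-keyed registry idea from this PR."""

    def __init__(self):
        self.coroutines_to_execute: dict[str, list] = {}

    def before_request(self, coroutines) -> httpx.Request:
        # A per-call UUID keys this request's coroutines, so concurrent calls
        # to the same SDK operation no longer collide on a shared operation_id
        operation_id = str(uuid.uuid4())
        self.coroutines_to_execute[operation_id] = list(coroutines)

        # Carry the key on the dummy request so it comes back with the response
        return httpx.Request(
            "GET",
            "https://api.unstructuredapp.io/general/docs",
            headers={"request_id": operation_id},
        )

    async def after_success(self, response: httpx.Response):
        # Recover the key from the request attached to the response,
        # then await only the coroutines that belong to this call
        operation_id = response.request.headers.get("request_id")
        tasks = self.coroutines_to_execute.pop(operation_id, [])
        return await asyncio.gather(*tasks)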