Skip to content

Allow upserting data via bulk transaction requests #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 30, 2023
Merged
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased]

- Add `method` parameter to Bulk Transactions requests in order to support upserting bulk data ([#64](https://github.com/stac-utils/stac-fastapi-pgstac/pull/64))

## [2.4.10] - 2023-08-18

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
"orjson",
"pydantic[dotenv]>=1.10.8", # https://github.com/pydantic/pydantic/issues/5821
"stac_pydantic==2.0.*",
"stac-fastapi.types~=2.4.8",
"stac-fastapi.api~=2.4.8",
"stac-fastapi.extensions~=2.4.8",
"stac-fastapi.types~=2.4.9",
"stac-fastapi.api~=2.4.9",
"stac-fastapi.extensions~=2.4.9",
"asyncpg",
"buildpg",
"brotli_asgi",
Expand Down
15 changes: 11 additions & 4 deletions stac_fastapi/pgstac/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from fastapi import HTTPException, Request
from stac_fastapi.extensions.third_party.bulk_transactions import (
AsyncBaseBulkTransactionsClient,
BulkTransactionMethod,
Items,
)
from stac_fastapi.types import stac as stac_types
Expand Down Expand Up @@ -180,9 +181,15 @@ class BulkTransactionsClient(AsyncBaseBulkTransactionsClient):

async def bulk_item_insert(self, items: Items, request: Request, **kwargs) -> str:
"""Bulk item insertion using pgstac."""
items = list(items.items.values())
async with request.app.state.get_connection(request, "w") as conn:
await dbfunc(conn, "create_items", items)
items_to_insert = list(items.items.values())

return_msg = f"Successfully added {len(items)} items."
async with request.app.state.get_connection(request, "w") as conn:
if items.method == BulkTransactionMethod.INSERT:
method_verb = "added"
await dbfunc(conn, "create_items", items_to_insert)
elif items.method == BulkTransactionMethod.UPSERT:
method_verb = "upserted"
await dbfunc(conn, "upsert_items", items_to_insert)

return_msg = f"Successfully {method_verb} {len(items_to_insert)} items."
return return_msg
70 changes: 70 additions & 0 deletions tests/clients/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,76 @@ async def test_create_bulk_items(
assert resp.status_code == 200


async def test_create_bulk_items_already_exist_insert(
app_client, load_test_data: Callable, load_test_collection
):
coll = load_test_collection
item = load_test_data("test_item.json")

items = {}
for _ in range(2):
_item = deepcopy(item)
_item["id"] = str(uuid.uuid4())
items[_item["id"]] = _item

payload = {"items": items, "method": "insert"}

resp = await app_client.post(
f"/collections/{coll.id}/bulk_items",
json=payload,
)
assert resp.status_code == 200
assert resp.text == '"Successfully added 2 items."'

for item_id in items.keys():
resp = await app_client.get(f"/collections/{coll.id}/items/{item_id}")
assert resp.status_code == 200

# Try creating the same items again.
# This should fail with the default insert behavior.
resp = await app_client.post(
f"/collections/{coll.id}/bulk_items",
json=payload,
)
assert resp.status_code == 409


async def test_create_bulk_items_already_exist_upsert(
app_client, load_test_data: Callable, load_test_collection
):
coll = load_test_collection
item = load_test_data("test_item.json")

items = {}
for _ in range(2):
_item = deepcopy(item)
_item["id"] = str(uuid.uuid4())
items[_item["id"]] = _item

payload = {"items": items, "method": "insert"}

resp = await app_client.post(
f"/collections/{coll.id}/bulk_items",
json=payload,
)
assert resp.status_code == 200
assert resp.text == '"Successfully added 2 items."'

for item_id in items.keys():
resp = await app_client.get(f"/collections/{coll.id}/items/{item_id}")
assert resp.status_code == 200

# Try creating the same items again, but using upsert.
# This should succeed.
payload["method"] = "upsert"
resp = await app_client.post(
f"/collections/{coll.id}/bulk_items",
json=payload,
)
assert resp.status_code == 200
assert resp.text == '"Successfully upserted 2 items."'


# TODO since right now puts implement upsert
# test_create_collection_already_exists
# test create_item_already_exists
Expand Down