Skip to content

Commit f8d4c85

Browse files
authored
Allow upserting data via bulk transaction requests (#64)
* Support method parameter on bulk transactions route to allow upserting bulk data * Temporarily switch stac-fastapi dependencies to fork with support for bulk transaction method param * Get bulk transactions method from items object. Avoid reassigning items variable. * Restore stac-fastapi requirements * Fixed formatting issues flagged by pre-commit * Updated stac-fastapi dependency to version supporting bulk transaction method argument * Updated Changelog with bulk transactions update * Add tests to confirm that upsert method results in success when posting the same items twice
1 parent d7902f6 commit f8d4c85

File tree

4 files changed

+86
-7
lines changed

4 files changed

+86
-7
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
- Add `method` parameter to Bulk Transactions requests in order to support upserting bulk data ([#64](https://github.com/stac-utils/stac-fastapi-pgstac/pull/64))
6+
57
## [2.4.10] - 2023-08-18
68

79
### Fixed

setup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
"orjson",
1111
"pydantic[dotenv]>=1.10.8", # https://github.com/pydantic/pydantic/issues/5821
1212
"stac_pydantic==2.0.*",
13-
"stac-fastapi.types~=2.4.8",
14-
"stac-fastapi.api~=2.4.8",
15-
"stac-fastapi.extensions~=2.4.8",
13+
"stac-fastapi.types~=2.4.9",
14+
"stac-fastapi.api~=2.4.9",
15+
"stac-fastapi.extensions~=2.4.9",
1616
"asyncpg",
1717
"buildpg",
1818
"brotli_asgi",

stac_fastapi/pgstac/transactions.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from fastapi import HTTPException, Request
1010
from stac_fastapi.extensions.third_party.bulk_transactions import (
1111
AsyncBaseBulkTransactionsClient,
12+
BulkTransactionMethod,
1213
Items,
1314
)
1415
from stac_fastapi.types import stac as stac_types
@@ -180,9 +181,15 @@ class BulkTransactionsClient(AsyncBaseBulkTransactionsClient):
180181

181182
async def bulk_item_insert(self, items: Items, request: Request, **kwargs) -> str:
182183
"""Bulk item insertion using pgstac."""
183-
items = list(items.items.values())
184-
async with request.app.state.get_connection(request, "w") as conn:
185-
await dbfunc(conn, "create_items", items)
184+
items_to_insert = list(items.items.values())
186185

187-
return_msg = f"Successfully added {len(items)} items."
186+
async with request.app.state.get_connection(request, "w") as conn:
187+
if items.method == BulkTransactionMethod.INSERT:
188+
method_verb = "added"
189+
await dbfunc(conn, "create_items", items_to_insert)
190+
elif items.method == BulkTransactionMethod.UPSERT:
191+
method_verb = "upserted"
192+
await dbfunc(conn, "upsert_items", items_to_insert)
193+
194+
return_msg = f"Successfully {method_verb} {len(items_to_insert)} items."
188195
return return_msg

tests/clients/test_postgres.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,76 @@ async def test_create_bulk_items(
326326
assert resp.status_code == 200
327327

328328

329+
async def test_create_bulk_items_already_exist_insert(
330+
app_client, load_test_data: Callable, load_test_collection
331+
):
332+
coll = load_test_collection
333+
item = load_test_data("test_item.json")
334+
335+
items = {}
336+
for _ in range(2):
337+
_item = deepcopy(item)
338+
_item["id"] = str(uuid.uuid4())
339+
items[_item["id"]] = _item
340+
341+
payload = {"items": items, "method": "insert"}
342+
343+
resp = await app_client.post(
344+
f"/collections/{coll.id}/bulk_items",
345+
json=payload,
346+
)
347+
assert resp.status_code == 200
348+
assert resp.text == '"Successfully added 2 items."'
349+
350+
for item_id in items.keys():
351+
resp = await app_client.get(f"/collections/{coll.id}/items/{item_id}")
352+
assert resp.status_code == 200
353+
354+
# Try creating the same items again.
355+
# This should fail with the default insert behavior.
356+
resp = await app_client.post(
357+
f"/collections/{coll.id}/bulk_items",
358+
json=payload,
359+
)
360+
assert resp.status_code == 409
361+
362+
363+
async def test_create_bulk_items_already_exist_upsert(
364+
app_client, load_test_data: Callable, load_test_collection
365+
):
366+
coll = load_test_collection
367+
item = load_test_data("test_item.json")
368+
369+
items = {}
370+
for _ in range(2):
371+
_item = deepcopy(item)
372+
_item["id"] = str(uuid.uuid4())
373+
items[_item["id"]] = _item
374+
375+
payload = {"items": items, "method": "insert"}
376+
377+
resp = await app_client.post(
378+
f"/collections/{coll.id}/bulk_items",
379+
json=payload,
380+
)
381+
assert resp.status_code == 200
382+
assert resp.text == '"Successfully added 2 items."'
383+
384+
for item_id in items.keys():
385+
resp = await app_client.get(f"/collections/{coll.id}/items/{item_id}")
386+
assert resp.status_code == 200
387+
388+
# Try creating the same items again, but using upsert.
389+
# This should succeed.
390+
payload["method"] = "upsert"
391+
resp = await app_client.post(
392+
f"/collections/{coll.id}/bulk_items",
393+
json=payload,
394+
)
395+
assert resp.status_code == 200
396+
assert resp.text == '"Successfully upserted 2 items."'
397+
398+
329399
# TODO since right now puts implement upsert
330400
# test_create_collection_already_exists
331401
# test create_item_already_exists

0 commit comments

Comments
 (0)