|
10 | 10 | from stac_fastapi.extensions.third_party.bulk_transactions import (
|
11 | 11 | AsyncBaseBulkTransactionsClient,
|
12 | 12 | Items,
|
| 13 | + BulkTransactionMethod, |
13 | 14 | )
|
14 | 15 | from stac_fastapi.types import stac as stac_types
|
15 | 16 | from stac_fastapi.types.core import AsyncBaseTransactionsClient
|
@@ -178,11 +179,17 @@ async def delete_collection(
|
178 | 179 | class BulkTransactionsClient(AsyncBaseBulkTransactionsClient):
|
179 | 180 | """Postgres bulk transactions."""
|
180 | 181 |
|
181 |
| - async def bulk_item_insert(self, items: Items, request: Request, **kwargs) -> str: |
| 182 | + async def bulk_item_insert( |
| 183 | + self, items: Items, request: Request, method: BulkTransactionMethod, **kwargs |
| 184 | + ) -> str: |
182 | 185 | """Bulk item insertion using pgstac."""
|
183 | 186 | items = list(items.items.values())
|
| 187 | + |
184 | 188 | async with request.app.state.get_connection(request, "w") as conn:
|
185 |
| - await dbfunc(conn, "create_items", items) |
| 189 | + if method == BulkTransactionMethod.INSERT: |
| 190 | + await dbfunc(conn, "create_items", items) |
| 191 | + elif method == BulkTransactionMethod.UPSERT: |
| 192 | + await dbfunc(conn, "upsert_items", items) |
186 | 193 |
|
187 | 194 | return_msg = f"Successfully added {len(items)} items."
|
188 | 195 | return return_msg
|
0 commit comments