Skip to content

Allow specifying insert method for bulk transactions #614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Forward `x-forwarded-host` ([#586](https://github.com/stac-utils/stac-fastapi/pull/586))
* Add CQL2-json to filter conformance class ([#611](https://github.com/stac-utils/stac-fastapi/issues/611))
* Add `method` parameter to Bulk Transactions extension in order to support upserting bulk data ([#614](https://github.com/stac-utils/stac-fastapi/pull/614))

## [2.4.8] - 2023-06-07

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Bulk transactions extension."""
import abc
from enum import Enum
from typing import Any, Dict, List, Optional, Union

import attr
Expand All @@ -11,10 +12,18 @@
from stac_fastapi.types.extension import ApiExtension


class BulkTransactionMethod(str, Enum):
"""Bulk Transaction Methods."""

INSERT = "insert"
UPSERT = "upsert"


class Items(BaseModel):
"""A group of STAC Item objects, in the form of a dictionary from Item.id -> Item."""

items: Dict[str, Any]
method: BulkTransactionMethod = BulkTransactionMethod.INSERT

def __iter__(self):
"""Return an iterable of STAC Item objects."""
Expand All @@ -36,7 +45,10 @@ def _chunks(lst, n):

@abc.abstractmethod
def bulk_item_insert(
self, items: Items, chunk_size: Optional[int] = None, **kwargs
self,
items: Items,
chunk_size: Optional[int] = None,
**kwargs,
) -> str:
"""Bulk creation of items.

Expand All @@ -55,7 +67,11 @@ class AsyncBaseBulkTransactionsClient(abc.ABC):
"""BulkTransactionsClient."""

@abc.abstractmethod
async def bulk_item_insert(self, items: Items, **kwargs) -> str:
async def bulk_item_insert(
self,
items: Items,
**kwargs,
) -> str:
"""Bulk creation of items.

Args:
Expand All @@ -77,11 +93,19 @@ class BulkTransactionExtension(ApiExtension):
attribute "items", that has a value that is an object with a group of
attributes that are the ids of each Item, and the value is the Item entity.

Optionally, clients can specify a "method" attribute that is either "insert"
or "upsert". If "insert", then the items will be inserted if they do not
exist, and an error will be returned if they do. If "upsert", then the items
will be inserted if they do not exist, and updated if they do. This defaults
to "insert".

{
"items": {
"id1": { "type": "Feature", ... },
"id2": { "type": "Feature", ... },
"id3": { "type": "Feature", ... }
"items": {
"id1": { "type": "Feature", ... },
"id2": { "type": "Feature", ... },
"id3": { "type": "Feature", ... }
},
"method": "insert"
}
"""

Expand Down