Skip to content

move classes in transactions.py to core.py, to align with parent class organization #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ For changes, see the [Changelog](CHANGELOG.md).

## Development Environment Setup

To install the classes in your local Python env, run:

```shell
cd stac_fastapi/elasticsearch
pip install -e '.[dev]'
```

### Pre-commit

Install [pre-commit](https://pre-commit.com/#install).

Prior to commit, run:
Expand All @@ -16,10 +25,6 @@ Prior to commit, run:
pre-commit run --all-files`
```

```shell
cd stac_fastapi/elasticsearch
pip install .[dev]
```

## Building

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ services:
discovery.type: single-node
network.host: 0.0.0.0
http.port: 9200
ES_JAVA_OPTS: -Xms512m -Xmx512m
ES_JAVA_OPTS: -Xms512m -Xmx1g
ports:
- "9200:9200"
1 change: 1 addition & 0 deletions stac_fastapi/elasticsearch/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
desc = f.read()

install_requires = [
"fastapi",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was used explicitly in one of the classes, but wasn't declared as a dependency (it's transitive from stac_fastapi)

"attrs",
"pydantic[dotenv]",
"stac_pydantic==2.0.*",
Expand Down
10 changes: 5 additions & 5 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
from stac_fastapi.api.app import StacApi
from stac_fastapi.api.models import create_get_request_model, create_post_request_model
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.elasticsearch.core import CoreCrudClient
from stac_fastapi.elasticsearch.extensions import QueryExtension
from stac_fastapi.elasticsearch.indexes import IndexesClient
from stac_fastapi.elasticsearch.session import Session
from stac_fastapi.elasticsearch.transactions import (
from stac_fastapi.elasticsearch.core import (
BulkTransactionsClient,
CoreCrudClient,
TransactionsClient,
)
from stac_fastapi.elasticsearch.extensions import QueryExtension
from stac_fastapi.elasticsearch.indexes import IndexesClient
from stac_fastapi.elasticsearch.session import Session
from stac_fastapi.extensions.core import (
ContextExtension,
FieldsExtension,
Expand Down
135 changes: 130 additions & 5 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,29 @@
import json
import logging
from datetime import datetime as datetime_type
from datetime import timezone
from typing import List, Optional, Type, Union
from urllib.parse import urljoin

import attr
from fastapi import HTTPException
from overrides import overrides

# from geojson_pydantic.geometries import Polygon
from pydantic import ValidationError
from stac_pydantic.links import Relations
from stac_pydantic.shared import MimeTypes

from stac_fastapi.elasticsearch import serializers
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.elasticsearch.database_logic import DatabaseLogic
from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer
from stac_fastapi.elasticsearch.session import Session

# from stac_fastapi.elasticsearch.types.error_checks import ErrorChecks
from stac_fastapi.types.core import BaseCoreClient
from stac_fastapi.extensions.third_party.bulk_transactions import (
BaseBulkTransactionsClient,
Items,
)
from stac_fastapi.types import stac as stac_types
from stac_fastapi.types.core import BaseCoreClient, BaseTransactionsClient
from stac_fastapi.types.links import CollectionLinks
from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -291,3 +296,123 @@ def post_search(self, search_request, **kwargs) -> ItemCollection:
links=links,
context=context_obj,
)


@attr.s
class TransactionsClient(BaseTransactionsClient):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no changes to these classes other than fixing up references and using datetime_type rather than datetime

"""Transactions extension specific CRUD operations."""

session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
database = DatabaseLogic()

@overrides
def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
"""Create item."""
base_url = str(kwargs["request"].base_url)

# If a feature collection is posted
if item["type"] == "FeatureCollection":
bulk_client = BulkTransactionsClient()
processed_items = [
bulk_client.preprocess_item(item, base_url) for item in item["features"]
]
return_msg = f"Successfully added {len(processed_items)} items."
self.database.bulk_sync(processed_items)

return return_msg
else:
item = self.database.prep_create_item(item=item, base_url=base_url)
self.database.create_item(item=item, base_url=base_url)
return item

@overrides
def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
"""Update item."""
base_url = str(kwargs["request"].base_url)
now = datetime_type.now(timezone.utc).isoformat().replace("+00:00", "Z")
item["properties"]["updated"] = str(now)

self.database.check_collection_exists(collection_id=item["collection"])
# todo: index instead of delete and create
self.delete_item(item_id=item["id"], collection_id=item["collection"])
self.create_item(item=item, **kwargs)

return ItemSerializer.db_to_stac(item, base_url)

@overrides
def delete_item(
self, item_id: str, collection_id: str, **kwargs
) -> stac_types.Item:
"""Delete item."""
self.database.delete_item(item_id=item_id, collection_id=collection_id)
return None

@overrides
def create_collection(
self, collection: stac_types.Collection, **kwargs
) -> stac_types.Collection:
"""Create collection."""
base_url = str(kwargs["request"].base_url)
collection_links = CollectionLinks(
collection_id=collection["id"], base_url=base_url
).create_links()
collection["links"] = collection_links
self.database.create_collection(collection=collection)

return CollectionSerializer.db_to_stac(collection, base_url)

@overrides
def update_collection(
self, collection: stac_types.Collection, **kwargs
) -> stac_types.Collection:
"""Update collection."""
base_url = str(kwargs["request"].base_url)

self.database.find_collection(collection_id=collection["id"])
self.delete_collection(collection["id"])
self.create_collection(collection, **kwargs)

return CollectionSerializer.db_to_stac(collection, base_url)

@overrides
def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collection:
"""Delete collection."""
self.database.delete_collection(collection_id=collection_id)
return None


@attr.s
class BulkTransactionsClient(BaseBulkTransactionsClient):
"""Postgres bulk transactions."""

session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
database = DatabaseLogic()

def __attrs_post_init__(self):
"""Create es engine."""
settings = ElasticsearchSettings()
self.client = settings.create_client

def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item:
"""Preprocess items to match data model."""
item = self.database.prep_create_item(item=item, base_url=base_url)
return item

@overrides
def bulk_item_insert(
self, items: Items, chunk_size: Optional[int] = None, **kwargs
) -> str:
"""Bulk item insertion using es."""
request = kwargs.get("request")
if request:
base_url = str(request.base_url)
else:
base_url = ""

processed_items = [
self.preprocess_item(item, base_url) for item in items.items.values()
]

self.database.bulk_sync(processed_items)

return f"Successfully added {len(processed_items)} Items."
142 changes: 0 additions & 142 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from tests.conftest import MockStarletteRequest

from stac_fastapi.api.app import StacApi
from stac_fastapi.elasticsearch.core import CoreCrudClient
from stac_fastapi.elasticsearch.transactions import (
from stac_fastapi.elasticsearch.core import (
BulkTransactionsClient,
CoreCrudClient,
TransactionsClient,
)
from stac_fastapi.extensions.third_party.bulk_transactions import Items
Expand Down
8 changes: 4 additions & 4 deletions stac_fastapi/elasticsearch/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
from stac_fastapi.api.app import StacApi
from stac_fastapi.api.models import create_request_model
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.elasticsearch.core import CoreCrudClient
from stac_fastapi.elasticsearch.extensions import QueryExtension
from stac_fastapi.elasticsearch.indexes import IndexesClient
from stac_fastapi.elasticsearch.transactions import (
from stac_fastapi.elasticsearch.core import (
BulkTransactionsClient,
CoreCrudClient,
TransactionsClient,
)
from stac_fastapi.elasticsearch.extensions import QueryExtension
from stac_fastapi.elasticsearch.indexes import IndexesClient
from stac_fastapi.extensions.core import (
ContextExtension,
FieldsExtension,
Expand Down