Skip to content

Commit 1cbd869

Browse files
committed
implement bulk sync
1 parent f078628 commit 1cbd869

File tree

1 file changed

+84
-36
lines changed

1 file changed

+84
-36
lines changed

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py

Lines changed: 84 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,26 @@ class TransactionsClient(BaseTransactionsClient):
3131
settings = ElasticsearchSettings()
3232
client = settings.create_client
3333

34+
def _create_item_index(self):
35+
mapping = {
36+
"mappings": {
37+
"properties": {
38+
"geometry": {"type": "geo_shape"},
39+
"id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
40+
"properties__datetime": {
41+
"type": "text",
42+
"fields": {"keyword": {"type": "keyword"}},
43+
},
44+
}
45+
}
46+
}
47+
48+
_ = self.client.indices.create(
49+
index="stac_items",
50+
body=mapping,
51+
ignore=400, # ignore 400 already exists code
52+
)
53+
3454
def create_item(self, model: stac_types.Item, **kwargs):
3555
"""Create item."""
3656
base_url = str(kwargs["request"].base_url)
@@ -59,24 +79,7 @@ def create_item(self, model: stac_types.Item, **kwargs):
5979
if "created" not in model["properties"]:
6080
model["properties"]["created"] = str(now)
6181

62-
mapping = {
63-
"mappings": {
64-
"properties": {
65-
"geometry": {"type": "geo_shape"},
66-
"id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
67-
"properties__datetime": {
68-
"type": "text",
69-
"fields": {"keyword": {"type": "keyword"}},
70-
},
71-
}
72-
}
73-
}
74-
75-
_ = self.client.indices.create(
76-
index="stac_items",
77-
body=mapping,
78-
ignore=400, # ignore 400 already exists code
79-
)
82+
self._create_item_index()
8083

8184
self.client.index(
8285
index="stac_items", doc_type="_doc", id=model["id"], document=model
@@ -91,6 +94,8 @@ def create_collection(self, model: stac_types.Collection, **kwargs):
9194
).create_links()
9295
model["links"] = collection_links
9396

97+
self._create_item_index()
98+
9499
if self.client.exists(index="stac_collections", id=model["id"]):
95100
raise ConflictError(f"Collection {model['id']} already exists")
96101
self.client.index(
@@ -155,39 +160,82 @@ def __attrs_post_init__(self):
155160
settings = ElasticsearchSettings()
156161
self.client = settings.create_client
157162

163+
def _create_item_index(self):
164+
mapping = {
165+
"mappings": {
166+
"properties": {
167+
"geometry": {"type": "geo_shape"},
168+
"id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
169+
"properties__datetime": {
170+
"type": "text",
171+
"fields": {"keyword": {"type": "keyword"}},
172+
},
173+
}
174+
}
175+
}
176+
177+
_ = self.client.indices.create(
178+
index="stac_items",
179+
body=mapping,
180+
ignore=400, # ignore 400 already exists code
181+
)
182+
158183
def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
159184
"""Preprocess items to match data model."""
160185
item_links = ItemLinks(
161186
collection_id=model["collection"], item_id=model["id"], base_url=base_url
162187
).create_links()
163188
model["links"] = item_links
164189

165-
# with self.client.start_session(causal_consistency=True) as session:
166-
# error_check = ErrorChecks(session=session, client=self.client)
167-
# error_check._check_collection_foreign_key(model)
168-
# error_check._check_item_conflict(model)
169-
# now = datetime.utcnow().strftime(DATETIME_RFC339)
170-
# if "created" not in model["properties"]:
171-
# model["properties"]["created"] = str(now)
172-
# return model
190+
if not self.client.exists(index="stac_collections", id=model["collection"]):
191+
raise ForeignKeyError(f"Collection {model['collection']} does not exist")
192+
193+
if self.client.exists(index="stac_items", id=model["id"]):
194+
raise ConflictError(
195+
f"Item {model['id']} in collection {model['collection']} already exists"
196+
)
197+
198+
now = datetime.utcnow().strftime(DATETIME_RFC339)
199+
if "created" not in model["properties"]:
200+
model["properties"]["created"] = str(now)
173201

202+
# elasticsearch doesn't like the fact that some values are float and some were int
203+
if "eo:bands" in model["properties"]:
204+
for wave in model["properties"]["eo:bands"]:
205+
for k, v in wave.items():
206+
if type(v) != str:
207+
v = float(v)
208+
wave.update({k: v})
209+
return model
210+
174211
def bulk_item_insert(self, items: Items, **kwargs) -> str:
175212
"""Bulk item insertion using es."""
213+
self._create_item_index()
176214
try:
177215
base_url = str(kwargs["request"].base_url)
178216
except Exception:
179217
base_url = ""
180218
processed_items = [self._preprocess_item(item, base_url) for item in items]
181219
return_msg = f"Successfully added {len(processed_items)} items."
182-
# with self.client.start_session(causal_consistency=True) as session:
183-
# self.item_table.insert_many(processed_items, session=session)
184-
# return return_msg
185220

186-
helpers.bulk(
187-
self.client,
188-
processed_items,
189-
index="stac_items",
190-
doc_type="_doc",
191-
request_timeout=200,
192-
)
221+
# helpers.bulk(
222+
# self.client,
223+
# processed_items,
224+
# index="stac_items",
225+
# doc_type="_doc",
226+
# request_timeout=200,
227+
# )
228+
229+
def bulk_sync(processed_items):
230+
actions = [
231+
{
232+
"_index": "stac_items",
233+
"_source": item
234+
} for item in processed_items
235+
]
236+
237+
helpers.bulk(self.client, actions)
238+
239+
bulk_sync(processed_items)
240+
193241
return return_msg

0 commit comments

Comments
 (0)