@@ -146,33 +146,8 @@ def __attrs_post_init__(self):
146
146
settings = ElasticsearchSettings ()
147
147
self .client = settings .create_client
148
148
149
- # def _create_item_index(self):
150
- # mapping = {
151
- # "mappings": {
152
- # "properties": {
153
- # "geometry": {"type": "geo_shape"},
154
- # "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
155
- # "properties__datetime": {
156
- # "type": "text",
157
- # "fields": {"keyword": {"type": "keyword"}},
158
- # },
159
- # }
160
- # }
161
- # }
162
-
163
- # _ = self.client.indices.create(
164
- # index="stac_items",
165
- # body=mapping,
166
- # ignore=400, # ignore 400 already exists code
167
- # )
168
-
169
149
def _preprocess_item (self , model : stac_types .Item , base_url ) -> stac_types .Item :
170
150
"""Preprocess items to match data model."""
171
- item_links = ItemLinks (
172
- collection_id = model ["collection" ], item_id = model ["id" ], base_url = base_url
173
- ).create_links ()
174
- model ["links" ] = item_links
175
-
176
151
if not self .client .exists (index = "stac_collections" , id = model ["collection" ]):
177
152
raise ForeignKeyError (f"Collection { model ['collection' ]} does not exist" )
178
153
@@ -181,18 +156,8 @@ def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
181
156
f"Item { model ['id' ]} in collection { model ['collection' ]} already exists"
182
157
)
183
158
184
- now = datetime .utcnow ().strftime (DATETIME_RFC339 )
185
- if "created" not in model ["properties" ]:
186
- model ["properties" ]["created" ] = str (now )
187
-
188
- # elasticsearch doesn't like the fact that some values are float and some were int
189
- if "eo:bands" in model ["properties" ]:
190
- for wave in model ["properties" ]["eo:bands" ]:
191
- for k , v in wave .items ():
192
- if type (v ) != str :
193
- v = float (v )
194
- wave .update ({k : v })
195
- return model
159
+ item = ItemSerializer .stac_to_db (model , base_url )
160
+ return item
196
161
197
162
def bulk_item_insert (self , items : Items , ** kwargs ) -> str :
198
163
"""Bulk item insertion using es."""
@@ -207,14 +172,6 @@ def bulk_item_insert(self, items: Items, **kwargs) -> str:
207
172
]
208
173
return_msg = f"Successfully added { len (processed_items )} items."
209
174
210
- # helpers.bulk(
211
- # self.client,
212
- # processed_items,
213
- # index="stac_items",
214
- # doc_type="_doc",
215
- # request_timeout=200,
216
- # )
217
-
218
175
def bulk_sync (processed_items ):
219
176
actions = [
220
177
{"_index" : "stac_items" , "_source" : item } for item in processed_items
0 commit comments