18
18
from stac_fastapi .types import stac as stac_types
19
19
from stac_fastapi .types .core import BaseTransactionsClient
20
20
from stac_fastapi .types .errors import ConflictError , ForeignKeyError , NotFoundError
21
- from stac_fastapi .types .links import CollectionLinks , ItemLinks
21
+ from stac_fastapi .types .links import CollectionLinks
22
22
23
23
logger = logging .getLogger (__name__ )
24
24
@@ -69,19 +69,6 @@ def create_item(self, model: stac_types.Item, **kwargs):
69
69
return return_msg
70
70
71
71
# If a single item is posted
72
- item_links = ItemLinks (
73
- collection_id = model ["collection" ], item_id = model ["id" ], base_url = base_url
74
- ).create_links ()
75
- model ["links" ] = item_links
76
-
77
- # elasticsearch doesn't like the fact that some values are float and some were int
78
- if "eo:bands" in model ["properties" ]:
79
- for wave in model ["properties" ]["eo:bands" ]:
80
- for k , v in wave .items ():
81
- if type (v ) != str :
82
- v = float (v )
83
- wave .update ({k : v })
84
-
85
72
if not self .client .exists (index = "stac_collections" , id = model ["collection" ]):
86
73
raise ForeignKeyError (f"Collection { model ['collection' ]} does not exist" )
87
74
@@ -90,12 +77,10 @@ def create_item(self, model: stac_types.Item, **kwargs):
90
77
f"Item { model ['id' ]} in collection { model ['collection' ]} already exists"
91
78
)
92
79
93
- now = datetime .utcnow ().strftime (DATETIME_RFC339 )
94
- if "created" not in model ["properties" ]:
95
- model ["properties" ]["created" ] = str (now )
80
+ data = ItemSerializer .stac_to_db (model , base_url )
96
81
97
82
self .client .index (
98
- index = "stac_items" , doc_type = "_doc" , id = model ["id" ], document = model
83
+ index = "stac_items" , doc_type = "_doc" , id = model ["id" ], document = data
99
84
)
100
85
return ItemSerializer .db_to_stac (model , base_url )
101
86
@@ -174,33 +159,8 @@ def __attrs_post_init__(self):
174
159
settings = ElasticsearchSettings ()
175
160
self .client = settings .create_client
176
161
177
- def _create_item_index (self ):
178
- mapping = {
179
- "mappings" : {
180
- "properties" : {
181
- "geometry" : {"type" : "geo_shape" },
182
- "id" : {"type" : "text" , "fields" : {"keyword" : {"type" : "keyword" }}},
183
- "properties__datetime" : {
184
- "type" : "text" ,
185
- "fields" : {"keyword" : {"type" : "keyword" }},
186
- },
187
- }
188
- }
189
- }
190
-
191
- _ = self .client .indices .create (
192
- index = "stac_items" ,
193
- body = mapping ,
194
- ignore = 400 , # ignore 400 already exists code
195
- )
196
-
197
162
def _preprocess_item (self , model : stac_types .Item , base_url ) -> stac_types .Item :
198
163
"""Preprocess items to match data model."""
199
- item_links = ItemLinks (
200
- collection_id = model ["collection" ], item_id = model ["id" ], base_url = base_url
201
- ).create_links ()
202
- model ["links" ] = item_links
203
-
204
164
if not self .client .exists (index = "stac_collections" , id = model ["collection" ]):
205
165
raise ForeignKeyError (f"Collection { model ['collection' ]} does not exist" )
206
166
@@ -209,18 +169,8 @@ def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
209
169
f"Item { model ['id' ]} in collection { model ['collection' ]} already exists"
210
170
)
211
171
212
- now = datetime .utcnow ().strftime (DATETIME_RFC339 )
213
- if "created" not in model ["properties" ]:
214
- model ["properties" ]["created" ] = str (now )
215
-
216
- # elasticsearch doesn't like the fact that some values are float and some were int
217
- if "eo:bands" in model ["properties" ]:
218
- for wave in model ["properties" ]["eo:bands" ]:
219
- for k , v in wave .items ():
220
- if type (v ) != str :
221
- v = float (v )
222
- wave .update ({k : v })
223
- return model
172
+ item = ItemSerializer .stac_to_db (model , base_url )
173
+ return item
224
174
225
175
def bulk_sync (self , processed_items ):
226
176
"""Elasticsearch bulk insertion."""
@@ -231,7 +181,8 @@ def bulk_sync(self, processed_items):
231
181
232
182
def bulk_item_insert (self , items : Items , ** kwargs ) -> str :
233
183
"""Bulk item insertion using es."""
234
- self ._create_item_index ()
184
+ transactions_client = TransactionsClient ()
185
+ transactions_client ._create_item_index ()
235
186
try :
236
187
base_url = str (kwargs ["request" ].base_url )
237
188
except Exception :
0 commit comments