@@ -241,7 +241,7 @@ def wait(self, timeout=None):
241
241
results = None
242
242
try :
243
243
results = self ._async_result .get (timeout = timeout )
244
- except KeyboardInterrupt or NotImplementedError as i :
244
+ except KeyboardInterrupt as i :
245
245
# terminate workers abruptly on keyboard interrupt.
246
246
self ._processing_pool .terminate ()
247
247
self ._processing_pool .close ()
@@ -252,20 +252,18 @@ def wait(self, timeout=None):
252
252
self ._processing_pool .close ()
253
253
self ._processing_pool .clear ()
254
254
255
- if results == None :
256
- return
257
- else :
255
+ if results :
258
256
self ._failed_indices = [
259
257
failed_index for failed_indices in results for failed_index in failed_indices
260
258
]
261
259
262
- if len (self ._failed_indices ) > 0 :
263
- raise IngestionError (
264
- self ._failed_indices ,
265
- f"Failed to ingest some data into FeatureGroup { self .feature_group_name } " ,
266
- )
260
+ if len (self ._failed_indices ) > 0 :
261
+ raise IngestionError (
262
+ self ._failed_indices ,
263
+ f"Failed to ingest some data into FeatureGroup { self .feature_group_name } " ,
264
+ )
267
265
268
- def _run_multi_process (self , data_frame : DataFrame , wait = True , timeout = None ): # Not in use
266
+ def _run_multi_process (self , data_frame : DataFrame , wait = True , timeout = None ): # Not in use
269
267
"""Start the ingestion process with the specified number of processes.
270
268
271
269
Args:
@@ -348,7 +346,7 @@ def run(self, data_frame: DataFrame, wait=True, timeout=None):
348
346
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
349
347
if timeout is reached.
350
348
"""
351
- #self._run_multi_process(data_frame=data_frame, wait=wait, timeout=timeout)
349
+ # self._run_multi_process(data_frame=data_frame, wait=wait, timeout=timeout)
352
350
self ._run_multi_threaded (data_frame = data_frame , timeout = timeout )
353
351
354
352
0 commit comments