Skip to content

Commit 5a0f291

Browse files
exception handling for provided coroutines
1 parent d212b81 commit 5a0f291

File tree

3 files changed

+64
-11
lines changed

3 files changed

+64
-11
lines changed

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ ubwa.create_stream(channels=['trade', 'kline_1m'],
8181
process_stream_data=process_new_receives)
8282
```
8383

84-
### Or await the webstream data in an asyncio task:
84+
### Or await the webstream data in an asyncio coroutine:
8585

8686
This is the recommended method for processing data from web streams.
8787

@@ -109,8 +109,7 @@ with BinanceWebSocketApiManager(exchange='binance.com') as ubwa:
109109
except KeyboardInterrupt:
110110
print("\r\nGracefully stopping ...")
111111
except Exception as error_msg:
112-
print(f"\r\nERROR: {error_msg}")
113-
print("Gracefully stopping ...")
112+
print(f"\r\nERROR: {error_msg}\r\nGracefully stopping ...")
114113
```
115114

116115
Basically that's it, but there are more options.

dev/test_async_queue.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ async def process_asyncio_queue_global(self, stream_id=None):
3030
current_update_id = {}
3131
while self.ubwa.is_stop_request(stream_id=stream_id) is False:
3232
data = await self.ubwa.get_stream_data_from_asyncio_queue(stream_id)
33+
b = data['lalaal']
3334
if data.get('data'):
3435
market = str(data.get('stream').split('@')[0]).lower()
3536
current_update_id[market] = data.get('data').get('lastUpdateId')
@@ -44,6 +45,7 @@ async def process_asyncio_queue_specific(self, stream_id=None):
4445
print(f"Start processing data of {stream_id} ...")
4546
while self.ubwa.is_stop_request(stream_id=stream_id) is False:
4647
data = await self.ubwa.get_stream_data_from_asyncio_queue(stream_id)
48+
b = data['2222']
4749
# print(data)
4850
self.ubwa.asyncio_queue_task_done(stream_id)
4951

@@ -62,7 +64,7 @@ async def start(self):
6264
self.ubwa.create_stream(markets='arr', channels='!userData',
6365
api_key="api_key", api_secret="api_secret")
6466
while self.ubwa.is_manager_stopping() is False:
65-
self.ubwa.print_summary()
67+
#self.ubwa.print_summary()
6668
await asyncio.sleep(5)
6769

6870

unicorn_binance_websocket_api/manager.py

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,38 @@ def __exit__(self, exc_type, exc_value, error_traceback):
456456
if exc_type:
457457
logger.critical(f"An exception occurred: {exc_type} - {exc_value} - {error_traceback}")
458458

459+
async def _run_process_asyncio_queue(self, scope=None, stream_id=None) -> bool:
460+
""" Execute a provided coroutine within the loop and process the exception results asynchronously."""
461+
if stream_id is None:
462+
return False
463+
stream_label = self.get_stream_label(stream_id=stream_id)
464+
if stream_label is None:
465+
stream_label = ""
466+
else:
467+
stream_label = f" ({stream_label})"
468+
try:
469+
if scope == "global":
470+
await self.process_asyncio_queue(stream_id=stream_id)
471+
elif scope == "specific":
472+
await self.specific_process_asyncio_queue[stream_id](stream_id=stream_id)
473+
else:
474+
return False
475+
logger.debug(f"`process_asyncio_queue` of stream_id {stream_id}{stream_label} completed successfully.")
476+
return True
477+
except Exception as error_msg:
478+
error_msg_wrapper = (f"Exception within to UBWA`s provided `process_asyncio_queue`-coroutine of stream "
479+
f"'{stream_id}'{stream_label}: "
480+
f"\033[1m\033[31m{type(error_msg).__name__} - {error_msg}\033[0m\r\n"
481+
f"{traceback.format_exc()}")
482+
print(f"\r\n{error_msg_wrapper}")
483+
error_msg_wrapper = (f"Exception within to UBWA`s provided `process_asyncio_queue`-coroutine of stream "
484+
f"'{stream_id}'{stream_label}: "
485+
f"{type(error_msg).__name__} - {error_msg}\r\n"
486+
f"{traceback.format_exc()}")
487+
logger.critical(error_msg_wrapper)
488+
self._crash_stream(stream_id=stream_id, error_msg=error_msg_wrapper)
489+
return False
490+
459491
async def _shutdown_asyncgens(self, stream_id=None, loop=None) -> bool:
460492
if loop is None:
461493
return False
@@ -595,6 +627,10 @@ def send_stream_signal(self, signal_type=None, stream_id=None, data_record=None,
595627
"""
596628
Send a stream signal
597629
"""
630+
if str(error_msg).startswith("Stream with stream_id="):
631+
match = re.search(r'Reason:\s*(.*)', error_msg)
632+
if match:
633+
error_msg = match.group(1)
598634
self.process_stream_signals(signal_type=signal_type,
599635
stream_id=stream_id,
600636
data_record=data_record,
@@ -740,6 +776,7 @@ def _add_stream_to_stream_list(self,
740776
'last_heartbeat': None,
741777
'stop_request': False,
742778
'crash_request': False,
779+
'crash_request_reason': None,
743780
'loop_is_closing': False,
744781
'seconds_since_has_stopped': None,
745782
'has_stopped': None,
@@ -753,7 +790,8 @@ def _add_stream_to_stream_list(self,
753790
'last_received_data_record': None,
754791
'processed_receives_statistic': {},
755792
'transfer_rate_per_second': {'bytes': {}, 'speed': 0},
756-
'websocket_uri': None}
793+
'websocket_uri': None,
794+
'3rd-party-future': None}
757795
logger.info("BinanceWebSocketApiManager._add_stream_to_stream_list(" +
758796
str(stream_id) + ", " + str(channels) + ", " + str(markets) + ", " + str(stream_label) + ", "
759797
+ str(stream_buffer_name) + ", " + str(stream_buffer_maxlen) + ", " + str(symbols) + ")")
@@ -816,6 +854,11 @@ def _create_stream_thread(self,
816854
except RuntimeError as error_msg:
817855
logger.debug(f"BinanceWebSocketApiManager._create_stream_thread() stream_id={str(stream_id)} "
818856
f" - RuntimeError `error: 12` - error_msg: {str(error_msg)}")
857+
except Exception as error_msg:
858+
logger.critical(f"BinanceWebSocketApiManager._create_stream_thread({str(stream_id)} - Unknown Exception - "
859+
f"Please report this issue if your stream does not restart: "
860+
f"https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/issues/new/choose"
861+
f" - error_msg: {str(error_msg)}")
819862
finally:
820863
logger.debug(f"Finally closing the loop stream_id={str(stream_id)}")
821864
try:
@@ -1694,7 +1737,8 @@ def create_stream(self,
16941737
logger.debug(f"BinanceWebSocketApiManager.create_stream({stream_id} - Adding "
16951738
f"`specific_process_asyncio_queue[{stream_id}]()` to asyncio loop ...")
16961739
if self.get_event_loop_by_stream_id(stream_id=stream_id) is not None:
1697-
asyncio.run_coroutine_threadsafe(self.specific_process_asyncio_queue[stream_id](stream_id=stream_id),
1740+
asyncio.run_coroutine_threadsafe(self._run_process_asyncio_queue(scope="specific",
1741+
stream_id=stream_id),
16981742
self.get_event_loop_by_stream_id(stream_id=stream_id))
16991743
else:
17001744
logger.error(f"BinanceWebSocketApiManager.create_stream({stream_id} - No valid asyncio loop!")
@@ -1705,7 +1749,8 @@ def create_stream(self,
17051749
logger.debug(f"BinanceWebSocketApiManager.create_stream({stream_id} - "
17061750
f"Adding `process_asyncio_queue()` to asyncio loop ...")
17071751
if self.get_event_loop_by_stream_id(stream_id=stream_id) is not None:
1708-
asyncio.run_coroutine_threadsafe(self.process_asyncio_queue(stream_id=stream_id),
1752+
asyncio.run_coroutine_threadsafe(self._run_process_asyncio_queue(scope="global",
1753+
stream_id=stream_id),
17091754
self.get_event_loop_by_stream_id(stream_id=stream_id))
17101755
else:
17111756
logger.error(f"BinanceWebSocketApiManager.create_stream({stream_id} - No valid asyncio loop!")
@@ -2305,7 +2350,8 @@ def get_latest_release_info():
23052350
:return: dict or None
23062351
"""
23072352
try:
2308-
respond = requests.get('https://api.github.com/repos/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/releases/latest')
2353+
respond = requests.get(f'https://api.github.com/repos/LUCIT-Systems-and-Development/'
2354+
f'unicorn-binance-websocket-api/releases/latest')
23092355
latest_release_info = respond.json()
23102356
return latest_release_info
23112357
except Exception:
@@ -4128,18 +4174,21 @@ def stop_stream(self, stream_id, delete_listen_key: bool = True):
41284174
f"requests.exceptions.ConnectionError: {error_msg}")
41294175
return True
41304176

4131-
def _crash_stream(self, stream_id):
4177+
def _crash_stream(self, stream_id, error_msg=None):
41324178
"""
41334179
Loop inside: Stop a specific stream with 'crashed' status
41344180
41354181
:param stream_id: id of a stream
41364182
:type stream_id: str
4183+
:param error_msg: Reason
4184+
:type error_msg: str
41374185
:return: bool
41384186
"""
41394187
# stop a specific stream by stream_id
41404188
logger.critical(f"BinanceWebSocketApiManager._crash_stream({stream_id}){self.get_debug_log()}")
41414189
try:
41424190
self.stream_list[stream_id]['crash_request'] = True
4191+
self.stream_list[stream_id]['crash_request_reason'] = error_msg
41434192
except KeyError:
41444193
return False
41454194
return True
@@ -4160,7 +4209,10 @@ def _stream_is_crashing(self, stream_id: str = None, error_msg: str = None) -> b
41604209
self.set_socket_is_ready(stream_id)
41614210
if error_msg is not None:
41624211
self.stream_list[stream_id]['status'] += " - " + str(error_msg)
4163-
self.send_stream_signal(stream_id=stream_id, signal_type="STREAM_UNREPAIRABLE")
4212+
else:
4213+
if self.stream_list[stream_id]['crash_request_reason'] is not None:
4214+
error_msg = self.stream_list[stream_id]['crash_request_reason']
4215+
self.send_stream_signal(stream_id=stream_id, signal_type="STREAM_UNREPAIRABLE", error_msg=error_msg)
41644216
return True
41654217

41664218
def _stream_is_restarting(self, stream_id, error_msg=None):
@@ -4259,7 +4311,7 @@ def subscribe_to_stream(self, stream_id, channels=[], markets=[]):
42594311
"been exceeded!"
42604312
logger.critical(f"BinanceWebSocketApiManager.subscribe_to_stream({str(stream_id)}) "
42614313
f"Info: {str(error_msg)}")
4262-
self._crash_stream(stream_id)
4314+
self._crash_stream(stream_id, error_msg=error_msg)
42634315
return False
42644316

42654317
try:

0 commit comments

Comments
 (0)