|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | + |
| 4 | +from unicorn_binance_websocket_api import BinanceWebSocketApiManager |
| 5 | +import asyncio |
| 6 | +import logging |
| 7 | +import os |
| 8 | + |
| 9 | +logging.getLogger("unicorn_binance_websocket_api") |
| 10 | +logging.basicConfig(level=logging.INFO, |
| 11 | + filename=os.path.basename(__file__) + '.log', |
| 12 | + format="{asctime} [{levelname:8}] {process} {thread} {module}: {message}", |
| 13 | + style="{") |
| 14 | + |
| 15 | + |
| 16 | +class BinanceDataProcessor: |
| 17 | + def __init__(self, print_new_data=False, start_multiplex=True, start_userdata_a=True): |
| 18 | + self.api_key = "YOUR_BINANCE_API_KEY" |
| 19 | + self.api_secret = "YOUR_BINANCE_API_SECRET" |
| 20 | + self.example_database = [] |
| 21 | + self.print_new_data = print_new_data |
| 22 | + self.start_multiplex = start_multiplex |
| 23 | + self.start_userdata_a = start_userdata_a |
| 24 | + self.ubwa = BinanceWebSocketApiManager(exchange='binance.com', |
| 25 | + auto_data_cleanup_stopped_streams=True, |
| 26 | + enable_stream_signal_buffer=True, |
| 27 | + output_default='UnicornFy', |
| 28 | + process_stream_signals=self.receive_stream_signal) |
| 29 | + |
| 30 | + async def main(self): |
| 31 | + if self.start_multiplex is True: |
| 32 | + self.ubwa.create_stream(channels=['trade', 'kline_1m', 'depth5'], |
| 33 | + markets=['btcusdt', 'ethusdt', 'bnbusdt'], |
| 34 | + process_asyncio_queue=self.processing_of_new_data, |
| 35 | + stream_label="multiplex") |
| 36 | + |
| 37 | + if self.start_userdata_a is True: |
| 38 | + self.ubwa.create_stream(api_key=self.api_key, api_secret=self.api_secret, |
| 39 | + channels=["arr"], markets=["!userData"], |
| 40 | + process_asyncio_queue=self.processing_of_new_data, |
| 41 | + stream_label="userData_A") |
| 42 | + |
| 43 | + while self.ubwa.is_manager_stopping() is False: |
| 44 | + await asyncio.sleep(1) |
| 45 | + stream_info = \ |
| 46 | + {'multiplex': self.ubwa.get_stream_info(stream_id=self.ubwa.get_stream_id_by_label('multiplex')), |
| 47 | + 'userData_A': self.ubwa.get_stream_info(stream_id=self.ubwa.get_stream_id_by_label('userData_A'))} |
| 48 | + status_text = "" |
| 49 | + if self.start_multiplex is True: |
| 50 | + status_text += (f"\tStream 'multiplex' is {stream_info['multiplex']['status']} " |
| 51 | + f"(last_stream_signal={stream_info['multiplex']['last_stream_signal']})\r\n") |
| 52 | + if self.start_userdata_a is True: |
| 53 | + status_text += (f"\tStream 'userData_A' is {stream_info['userData_A']['status']} " |
| 54 | + f"(last_stream_signal={stream_info['userData_A']['last_stream_signal']})\r\n") |
| 55 | + print(f"Status:\r\n\tStored {len(self.example_database)} data records in `self.example_database`\r\n" |
| 56 | + f"{status_text}") |
| 57 | + |
| 58 | + async def processing_of_new_data(self, stream_id=None): |
| 59 | + print(f"Saving the data from webstream {self.ubwa.get_stream_label(stream_id=stream_id)} to the database ...") |
| 60 | + while self.ubwa.is_stop_request(stream_id=stream_id) is False: |
| 61 | + data = await self.ubwa.get_stream_data_from_asyncio_queue(stream_id) |
| 62 | + self.example_database.append(data) |
| 63 | + if self.print_new_data is True: |
| 64 | + print(f"Data record received and added to the database: {data}") |
| 65 | + self.ubwa.asyncio_queue_task_done(stream_id) |
| 66 | + |
| 67 | + def receive_stream_signal(self, signal_type=None, stream_id=None, data_record=None, error_msg=None): |
| 68 | + # More info about `stream_signals`: |
| 69 | + # https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/wiki/%60stream_signals%60 |
| 70 | + print(f"Received stream_signal for stream '{self.ubwa.get_stream_label(stream_id=stream_id)}': " |
| 71 | + f"{signal_type} - {stream_id} - {data_record} - {error_msg}") |
| 72 | + |
| 73 | + |
| 74 | +if __name__ == "__main__": |
| 75 | + bdp = BinanceDataProcessor(print_new_data=False, start_multiplex=True, start_userdata_a=True) |
| 76 | + try: |
| 77 | + asyncio.run(bdp.main()) |
| 78 | + except KeyboardInterrupt: |
| 79 | + print("\r\nGracefully stopping ...") |
| 80 | + except Exception as e: |
| 81 | + print(f"\r\nERROR: {e}") |
| 82 | + print("Gracefully stopping ...") |
| 83 | + bdp.ubwa.stop_manager() |
0 commit comments