Skip to content

Commit 2e92470

Browse files
send_with_stream()
1 parent 8b43429 commit 2e92470

File tree

9 files changed

+318
-44
lines changed

9 files changed

+318
-44
lines changed

CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p
99

1010
[How to upgrade to the latest version!](https://unicorn-binance-websocket-api.docs.lucit.tech/readme.html#installation-and-upgrade)
1111

12-
## 2.5.0.dev (development stage/unreleased/unstable)
12+
## 2.6.0.dev (development stage/unreleased/unstable)
13+
14+
## 2.6.0
1315
### Added
1416
- Better Logging to investigate [issue#374](https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/issues/374)
17+
- `send_with_stream()` - Send a payload with a specific stream.
18+
### Changed
19+
- Replaced all calls of `add_payload_to_stream()` in `manager.py`, `api.py` with `send_with_stream()`
1520

1621
## 2.5.0
1722
Functionally, nothing changes with this update. However, there are now sensible error messages if errors occur in the

dev_websocket_api_speed.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
#
4+
# File: example_websocket_api.py
5+
#
6+
# Part of ‘UNICORN Binance WebSocket API’
7+
# Project website: https://www.lucit.tech/unicorn-binance-websocket-api.html
8+
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api
9+
# Documentation: https://unicorn-binance-websocket-api.docs.lucit.tech
10+
# PyPI: https://pypi.org/project/unicorn-binance-websocket-api
11+
# LUCIT Online Shop: https://shop.lucit.services/software
12+
#
13+
# Author: LUCIT Systems and Development
14+
#
15+
# Copyright (c) 2019-2024, LUCIT Systems and Development (https://www.lucit.tech)
16+
# All rights reserved.
17+
#
18+
# Permission is hereby granted, free of charge, to any person obtaining a
19+
# copy of this software and associated documentation files (the
20+
# "Software"), to deal in the Software without restriction, including
21+
# without limitation the rights to use, copy, modify, merge, publish, dis-
22+
# tribute, sublicense, and/or sell copies of the Software, and to permit
23+
# persons to whom the Software is furnished to do so, subject to the fol-
24+
# lowing conditions:
25+
#
26+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
27+
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
28+
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
29+
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
30+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
31+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
32+
# IN THE SOFTWARE.
33+
34+
from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager
35+
from unicorn_binance_rest_api import BinanceRestApiManager
36+
import asyncio
37+
import logging
38+
import os
39+
import time
40+
41+
api_key = ""
42+
api_secret = ""
43+
44+
45+
async def binance_stream(ubwa):
46+
async def handle_socket_message(stream_id=None):
47+
while ubwa.is_stop_request(stream_id=stream_id) is False:
48+
data = await ubwa.get_stream_data_from_asyncio_queue(stream_id=stream_id)
49+
try:
50+
if data['result']['serverTime'] is not None:
51+
print(f"UBWA: Time between request and answer: {(time.time() - start_time_get_server_time_ubwa)} "
52+
f"seconds\r\nreceived data:\r\n{data}\r\n")
53+
except KeyError:
54+
print(f"received data:\r\n{data}\r\n")
55+
except TypeError:
56+
print(f"received data:\r\n{data}\r\n")
57+
58+
api_stream = ubwa.create_stream(api=True, api_key=api_key, api_secret=api_secret,
59+
stream_label="Bobs Websocket API",
60+
process_asyncio_queue=handle_socket_message)
61+
time.sleep(5)
62+
for i in range(10):
63+
print(f"##################################################################")
64+
start_time_get_server_time_ubwa = time.time()
65+
ubwa.api.get_server_time(stream_id=api_stream)
66+
ubwa.api.get_server_time(stream_id=api_stream)
67+
ubwa_server_time = ubwa.api.get_server_time(stream_id=api_stream, return_response=True)
68+
print(f"UBRA: Time between request and answer: {(time.time() - start_time_get_server_time_ubwa)} "
69+
f"seconds\r\nreceived data:\r\n{ubwa_server_time}\r\n")
70+
print(f"Finished! Waiting for responses ...")
71+
time.sleep(2)
72+
print(f"------------------------------------------------------------------")
73+
start_time_get_server_time_ubra = time.time()
74+
ubra_server_time = ubra.get_server_time()
75+
print(f"UBRA: Time between request and answer: {(time.time() - start_time_get_server_time_ubra)} "
76+
f"seconds\r\nreceived data:\r\n{ubra_server_time}\r\n")
77+
ubra_server_time = ubra.get_server_time()
78+
print(f"UBRA: Time between request and answer: {(time.time() - start_time_get_server_time_ubra)} "
79+
f"seconds\r\nreceived data:\r\n{ubra_server_time}\r\n")
80+
ubra_server_time = ubra.get_server_time()
81+
print(f"UBRA: Time between request and answer: {(time.time() - start_time_get_server_time_ubra)} "
82+
f"seconds\r\nreceived data:\r\n{ubra_server_time}\r\n")
83+
print(f"Stopping!")
84+
ubwa.stop_manager()
85+
ubra.stop_manager()
86+
87+
if __name__ == "__main__":
88+
logging.getLogger("unicorn_binance_websocket_api")
89+
logging.basicConfig(level=logging.DEBUG,
90+
filename=os.path.basename(__file__) + '.log',
91+
format="{asctime} [{levelname:8}] {process} {thread} {module}: {message}",
92+
style="{")
93+
94+
ubwa = BinanceWebSocketApiManager(exchange='binance.com', output_default="dict")
95+
ubra = BinanceRestApiManager(exchange='binance.com')
96+
try:
97+
asyncio.run(binance_stream(ubwa))
98+
except KeyboardInterrupt:
99+
print("\r\nGracefully stopping ...")
100+
ubwa.stop_manager()
101+
ubra.stop_manager()

examples/binance_websocket_spot_userdata_async/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ To run modules of the *UNICORN Binance Suite* you need a [valid license](https:/
1616
## Usage
1717
### Running the Script:
1818
```bash
19-
python binance_websocket_spot_userdata_async.py
19+
python binance_websocket_subscribe_unsubscribe.py
2020
```
2121

2222
### Graceful Shutdown:
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Binance Spot userData WebSocket Asynchronously
2+
## Overview
3+
How do subscriptions work?
4+
5+
## Prerequisites
6+
Ensure you have Python 3.7+ installed on your system.
7+
8+
Before running the provided script, install the required Python packages:
9+
```bash
10+
pip install -r requirements.txt
11+
```
12+
## Get a UNICORN Binance Suite License
13+
To run modules of the *UNICORN Binance Suite* you need a [valid license](https://shop.lucit.services)!
14+
15+
## Usage
16+
### Running the Script:
17+
```bash
18+
python binance_websocket_subscribe_unsubscribe.py
19+
```
20+
21+
### Graceful Shutdown:
22+
The script is designed to handle a graceful shutdown upon receiving a KeyboardInterrupt (e.g., Ctrl+C) or encountering
23+
an unexpected exception.
24+
25+
## Logging
26+
The script employs logging to provide insights into its operation and to assist in troubleshooting. Logs are saved to a
27+
file named after the script with a .log extension.
28+
29+
For further assistance or to report issues, please [contact our support team](https://www.lucit.tech/get-support.html)
30+
or [visit our GitHub repository](https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api).
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
4+
from unicorn_binance_websocket_api import BinanceWebSocketApiManager
5+
import asyncio
6+
import logging
7+
import os
8+
import time
9+
10+
logging.getLogger("unicorn_binance_websocket_api")
11+
logging.basicConfig(level=logging.INFO,
12+
filename=os.path.basename(__file__) + '.log',
13+
format="{asctime} [{levelname:8}] {process} {thread} {module}: {message}",
14+
style="{")
15+
16+
17+
class BinanceDataProcessor:
18+
def __init__(self):
19+
self.ubwa = BinanceWebSocketApiManager(enable_stream_signal_buffer=True,
20+
process_stream_signals=self.receive_stream_signal,
21+
output_default="UnicornFy")
22+
self.show_time_diff = None
23+
24+
async def main(self):
25+
print(f"Creating a stream and subscribing to trades of market 'ethusdt' ...")
26+
stream_id = self.ubwa.create_stream(channels="trade", markets="ethusdt", stream_label="MyStream",
27+
process_asyncio_queue=self.process_data)
28+
time.sleep(1)
29+
time_start = time.time()
30+
self.show_time_diff = time_start
31+
print(f"Subscribing to market 'btcusdt' ...", end="")
32+
self.ubwa.subscribe_to_stream(stream_id=stream_id, markets="btcusdt")
33+
print(f" ({(time.time() - time_start)})")
34+
time_start = time.time()
35+
print(f"Unsubscribing from market 'ethusdt' ...", end="")
36+
self.ubwa.unsubscribe_from_stream(stream_id=stream_id, markets="ethusdt")
37+
print(f" ({(time.time() - time_start)})")
38+
time.sleep(3)
39+
self.ubwa.stop_manager()
40+
41+
async def process_data(self, stream_id=None):
42+
print(f"Processing data of '{self.ubwa.get_stream_label(stream_id=stream_id)}' ...")
43+
while self.ubwa.is_stop_request(stream_id=stream_id) is False:
44+
data = await self.ubwa.get_stream_data_from_asyncio_queue(stream_id)
45+
if self.show_time_diff is not None:
46+
if data.get("symbol") == "BTCUSDT":
47+
print(f"Time between subscription and first BTCUSDT receive: ({(time.time() - self.show_time_diff)})")
48+
self.show_time_diff = None
49+
try:
50+
if data['result'] is None:
51+
print(f"Result: {data}")
52+
except KeyError:
53+
pass
54+
try:
55+
if data['error'] is None:
56+
print(f"Error: {data}")
57+
except KeyError:
58+
pass
59+
if data.get('event_type') == "trade":
60+
print(f"Data: {data}")
61+
self.ubwa.asyncio_queue_task_done(stream_id)
62+
63+
def receive_stream_signal(self, signal_type=None, stream_id=None, data_record=None, error_msg=None):
64+
print(f"Received stream_signal for stream '{self.ubwa.get_stream_label(stream_id=stream_id)}': "
65+
f"{signal_type} - {stream_id} - {data_record} - {error_msg}")
66+
67+
68+
if __name__ == "__main__":
69+
bdp = BinanceDataProcessor()
70+
try:
71+
asyncio.run(bdp.main())
72+
except KeyboardInterrupt:
73+
print("\r\n")
74+
print("Gracefully stopping ...")
75+
bdp.ubwa.stop_manager()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
unicorn-binance-websocket-api

0 commit comments

Comments
 (0)