Skip to content

fix: Reduce idle CPU consumption of websocket server #1040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: ros2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions rosbridge_server/scripts/rosbridge_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.


import asyncio
import signal
import sys
import threading
import time

import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile
from rosbridge_library.capabilities.advertise import Advertise
Expand All @@ -46,26 +49,18 @@
from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService
from std_msgs.msg import Int32
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.netutil import bind_sockets
from tornado.web import Application

from rosbridge_server import ClientManager, RosbridgeWebSocket


def start_hook():
IOLoop.instance().start()


def shutdown_hook():
IOLoop.instance().stop()


class RosbridgeWebsocketNode(Node):
def __init__(self):
super().__init__("rosbridge_websocket")

RosbridgeWebSocket.node_handle = self
RosbridgeWebSocket.event_loop = asyncio.get_event_loop()

##################################################
# Parameter handling #
Expand Down Expand Up @@ -332,31 +327,42 @@ def protocol_parameter_handling(self):
CallService.services_glob = RosbridgeWebSocket.services_glob


def main(args=None):
if args is None:
args = sys.argv
async def async_main():
rclpy.init(args=sys.argv, signal_handler_options=rclpy.signals.SignalHandlerOptions.NO)

rclpy.init(args=args)
node = RosbridgeWebsocketNode()

executor = rclpy.executors.SingleThreadedExecutor()
executor = SingleThreadedExecutor()
executor.add_node(node)

def spin_ros():
executor.spin_once(timeout_sec=0.01)
if not rclpy.ok():
shutdown_hook()

spin_callback = PeriodicCallback(spin_ros, 1)
spin_callback.start()
try:
start_hook()
node.destroy_node()
rclpy.shutdown()
except KeyboardInterrupt:
spin_thread = threading.Thread(target=executor.spin)
spin_thread.start()

loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
signal_handled = False

def handle_signal():
nonlocal signal_handled
if signal_handled:
return
print("Exiting due to SIGINT")
finally:
shutdown_hook() # shutdown hook to stop the server
stop_event.set()
executor.shutdown()
signal_handled = True

for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_signal)

await stop_event.wait()
spin_thread.join()

node.destroy_node()
rclpy.shutdown()


def main():
asyncio.run(async_main())


if __name__ == "__main__":
Expand Down
11 changes: 6 additions & 5 deletions rosbridge_server/src/rosbridge_server/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,19 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import asyncio
import sys
import threading
import traceback
import uuid
from collections import deque
from functools import partial, wraps
from functools import wraps

from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
from rosbridge_library.util import bson
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.websocket import WebSocketClosedError, WebSocketHandler

_io_loop = IOLoop.instance()


def _log_exception():
"""Log the most recent exception to ROS."""
Expand Down Expand Up @@ -124,6 +122,7 @@ class RosbridgeWebSocket(WebSocketHandler):
unregister_timeout = 10.0 # seconds
bson_only_mode = False
node_handle = None
event_loop = None

@log_exceptions
def open(self):
Expand Down Expand Up @@ -174,14 +173,16 @@ def on_close(self):
self.incoming_queue.finish()

def send_message(self, message, compression="none"):
cls = self.__class__

if isinstance(message, bson.BSON):
binary = True
elif compression in ["cbor", "cbor-raw"]:
binary = True
else:
binary = False

_io_loop.add_callback(partial(self.prewrite_message, message, binary))
asyncio.run_coroutine_threadsafe(self.prewrite_message(message, binary), cls.event_loop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If event_loop is None (its default value), will this error? Should we check?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably will, the same with node_handle. I'm not happy with passing arguments through class variables but I just tried to adhere to how it is currently done.


async def prewrite_message(self, message, binary):
cls = self.__class__
Expand Down
Loading