Skip to content

Commit c3b162d

Browse files
committed
Add broadcast to the new asyncio implementation.
1 parent 5eafbe4 commit c3b162d

File tree

15 files changed

+341
-80
lines changed

15 files changed

+341
-80
lines changed

docs/faq/server.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ Record all connections in a global variable::
102102
finally:
103103
CONNECTIONS.remove(websocket)
104104

105-
Then, call :func:`~websockets.broadcast`::
105+
Then, call :func:`~asyncio.connection.broadcast`::
106106

107107
import websockets
108108

docs/howto/upgrade.rst

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,6 @@ Missing features
7070
If your application relies on one of them, you should stick to the original
7171
implementation until the new implementation supports it in a future release.
7272

73-
Broadcast
74-
.........
75-
76-
The new implementation doesn't support :doc:`broadcasting messages
77-
<../topics/broadcast>` yet.
78-
7973
Keepalive
8074
.........
8175

@@ -178,8 +172,8 @@ Server APIs
178172
| :class:`websockets.server.WebSocketServerProtocol` |br| | |
179173
| ``websockets.legacy.server.WebSocketServerProtocol`` | |
180174
+-------------------------------------------------------------------+-----------------------------------------------------+
181-
| :func:`websockets.broadcast` |br| | *not available yet* |
182-
| ``websockets.legacy.protocol.broadcast()`` | |
175+
| ``websockets.broadcast`` |br| | :func:`websockets.asyncio.connection.broadcast` |
176+
| :func:`websockets.legacy.protocol.broadcast()` | |
183177
+-------------------------------------------------------------------+-----------------------------------------------------+
184178
| ``websockets.BasicAuthWebSocketServerProtocol`` |br| | *not available yet* |
185179
| :class:`websockets.auth.BasicAuthWebSocketServerProtocol` |br| | |

docs/intro/tutorial2.rst

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ you're using this pattern:
482482
...
483483
484484
Since this is a very common pattern in WebSocket servers, websockets provides
485-
the :func:`broadcast` helper for this purpose:
485+
the :func:`~legacy.protocol.broadcast` helper for this purpose:
486486

487487
.. code-block:: python
488488
@@ -494,13 +494,14 @@ the :func:`broadcast` helper for this purpose:
494494
495495
...
496496
497-
Calling :func:`broadcast` once is more efficient than
497+
Calling :func:`legacy.protocol.broadcast` once is more efficient than
498498
calling :meth:`~legacy.protocol.WebSocketCommonProtocol.send` in a loop.
499499

500-
However, there's a subtle difference in behavior. Did you notice that there's
501-
no ``await`` in the second version? Indeed, :func:`broadcast` is a function,
502-
not a coroutine like :meth:`~legacy.protocol.WebSocketCommonProtocol.send`
503-
or :meth:`~legacy.protocol.WebSocketCommonProtocol.recv`.
500+
However, there's a subtle difference in behavior. Did you notice that there's no
501+
``await`` in the second version? Indeed, :func:`legacy.protocol.broadcast` is a
502+
function, not a coroutine like
503+
:meth:`~legacy.protocol.WebSocketCommonProtocol.send` or
504+
:meth:`~legacy.protocol.WebSocketCommonProtocol.recv`.
504505

505506
It's quite obvious why :meth:`~legacy.protocol.WebSocketCommonProtocol.recv`
506507
is a coroutine. When you want to receive the next message, you have to wait
@@ -521,7 +522,8 @@ That said, when you're sending the same messages to many clients in a loop,
521522
applying backpressure in this way can become counterproductive. When you're
522523
broadcasting, you don't want to slow down everyone to the pace of the slowest
523524
clients; you want to drop clients that cannot keep up with the data stream.
524-
That's why :func:`broadcast` doesn't wait until write buffers drain.
525+
That's why :func:`legacy.protocol.broadcast` doesn't wait until write buffers
526+
drain.
525527

526528
For our Connect Four game, there's no difference in practice: the total amount
527529
of data sent on a connection for a game of Connect Four is less than 64 KB,

docs/project/changelog.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ Improvements
212212

213213
* Added platform-independent wheels.
214214

215-
* Improved error handling in :func:`~websockets.broadcast`.
215+
* Improved error handling in :func:`~legacy.protocol.broadcast`.
216216

217217
* Set ``server_hostname`` automatically on TLS connections when providing a
218218
``sock`` argument to :func:`~sync.client.connect`.
@@ -402,7 +402,7 @@ New features
402402

403403
* Added compatibility with Python 3.10.
404404

405-
* Added :func:`~websockets.broadcast` to send a message to many clients.
405+
* Added :func:`~legacy.protocol.broadcast` to send a message to many clients.
406406

407407
* Added support for reconnecting automatically by using
408408
:func:`~client.connect` as an asynchronous iterator.

docs/reference/asyncio/server.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,4 @@ websockets supports HTTP Basic Authentication according to
110110
Broadcast
111111
---------
112112

113-
.. autofunction:: websockets.broadcast
113+
.. autofunction:: websockets.legacy.protocol.broadcast

docs/reference/new-asyncio/server.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,8 @@ Using a connection
7070
.. autoattribute:: response
7171

7272
.. autoproperty:: subprotocol
73+
74+
Broadcast
75+
---------
76+
77+
.. autofunction:: websockets.asyncio.connection.broadcast

docs/topics/broadcast.rst

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
1-
Broadcasting messages
2-
=====================
1+
Broadcasting
2+
============
33

44
.. currentmodule:: websockets
55

6-
7-
.. admonition:: If you just want to send a message to all connected clients,
8-
use :func:`broadcast`.
6+
.. admonition:: If you want to send a message to all connected clients,
7+
use :func:`~asyncio.connection.broadcast`.
98
:class: tip
109

11-
If you want to learn about its design in depth, continue reading this
12-
document.
10+
If you want to learn about its design, continue reading this document.
11+
12+
For the legacy :mod:`asyncio` implementation, use
13+
:func:`~legacy.protocol.broadcast`.
1314

1415
WebSocket servers often send the same message to all connected clients or to a
1516
subset of clients for which the message is relevant.
1617

17-
Let's explore options for broadcasting a message, explain the design
18-
of :func:`broadcast`, and discuss alternatives.
18+
Let's explore options for broadcasting a message, explain the design of
19+
:func:`~asyncio.connection.broadcast`, and discuss alternatives.
1920

2021
For each option, we'll provide a connection handler called ``handler()`` and a
2122
function or coroutine called ``broadcast()`` that sends a message to all
@@ -24,7 +25,7 @@ connected clients.
2425
Integrating them is left as an exercise for the reader. You could start with::
2526

2627
import asyncio
27-
import websockets
28+
from websockets.asyncio.server import serve
2829

2930
async def handler(websocket):
3031
...
@@ -39,7 +40,7 @@ Integrating them is left as an exercise for the reader. You could start with::
3940
await broadcast(message)
4041

4142
async def main():
42-
async with websockets.serve(handler, "localhost", 8765):
43+
async with serve(handler, "localhost", 8765):
4344
await broadcast_messages() # runs forever
4445

4546
if __name__ == "__main__":
@@ -82,11 +83,13 @@ to::
8283

8384
Here's a coroutine that broadcasts a message to all clients::
8485

86+
from websockets import ConnectionClosed
87+
8588
async def broadcast(message):
8689
for websocket in CLIENTS.copy():
8790
try:
8891
await websocket.send(message)
89-
except websockets.ConnectionClosed:
92+
except ConnectionClosed:
9093
pass
9194

9295
There are two tricks in this version of ``broadcast()``.
@@ -117,11 +120,11 @@ which is usually outside of the control of the server.
117120

118121
If you know for sure that you will never write more than ``write_limit`` bytes
119122
within ``ping_interval + ping_timeout``, then websockets will terminate slow
120-
connections before the write buffer has time to fill up.
123+
connections before the write buffer can fill up.
121124

122-
Don't set extreme ``write_limit``, ``ping_interval``, and ``ping_timeout``
123-
values to ensure that this condition holds. Set reasonable values and use the
124-
built-in :func:`broadcast` function instead.
125+
Don't set extreme values of ``write_limit``, ``ping_interval``, or
126+
``ping_timeout`` to ensure that this condition holds! Instead, set reasonable
127+
values and use the built-in :func:`~asyncio.connection.broadcast` function.
125128

126129
The concurrent way
127130
------------------
@@ -134,7 +137,7 @@ Let's modify ``broadcast()`` to send messages concurrently::
134137
async def send(websocket, message):
135138
try:
136139
await websocket.send(message)
137-
except websockets.ConnectionClosed:
140+
except ConnectionClosed:
138141
pass
139142

140143
def broadcast(message):
@@ -179,20 +182,20 @@ doesn't work well when broadcasting a message to thousands of clients.
179182

180183
When you're sending messages to a single client, you don't want to send them
181184
faster than the network can transfer them and the client accept them. This is
182-
why :meth:`~server.WebSocketServerProtocol.send` checks if the write buffer
183-
is full and, if it is, waits until it drain, giving the network and the
184-
client time to catch up. This provides backpressure.
185+
why :meth:`~asyncio.server.ServerConnection.send` checks if the write buffer is
186+
above the high-water mark and, if it is, waits until it drains, giving the
187+
network and the client time to catch up. This provides backpressure.
185188

186189
Without backpressure, you could pile up data in the write buffer until the
187190
server process runs out of memory and the operating system kills it.
188191

189-
The :meth:`~server.WebSocketServerProtocol.send` API is designed to enforce
192+
The :meth:`~asyncio.server.ServerConnection.send` API is designed to enforce
190193
backpressure by default. This helps users of websockets write robust programs
191194
even if they never heard about backpressure.
192195

193196
For comparison, :class:`asyncio.StreamWriter` requires users to understand
194-
backpressure and to await :meth:`~asyncio.StreamWriter.drain` explicitly
195-
after each :meth:`~asyncio.StreamWriter.write`.
197+
backpressure and to await :meth:`~asyncio.StreamWriter.drain` after each
198+
:meth:`~asyncio.StreamWriter.write` — or at least sufficiently frequently.
196199

197200
When broadcasting messages, backpressure consists in slowing down all clients
198201
in an attempt to let the slowest client catch up. With thousands of clients,
@@ -203,14 +206,14 @@ How do we avoid running out of memory when slow clients can't keep up with the
203206
broadcast rate, then? The most straightforward option is to disconnect them.
204207

205208
If a client gets too far behind, eventually it reaches the limit defined by
206-
``ping_timeout`` and websockets terminates the connection. You can read the
207-
discussion of :doc:`keepalive and timeouts <./timeouts>` for details.
209+
``ping_timeout`` and websockets terminates the connection. You can refer to
210+
the discussion of :doc:`keepalive and timeouts <timeouts>` for details.
208211

209-
How :func:`broadcast` works
210-
---------------------------
212+
How :func:`~asyncio.connection.broadcast` works
213+
-----------------------------------------------
211214

212-
The built-in :func:`broadcast` function is similar to the naive way. The main
213-
difference is that it doesn't apply backpressure.
215+
The built-in :func:`~asyncio.connection.broadcast` function is similar to the
216+
naive way. The main difference is that it doesn't apply backpressure.
214217

215218
This provides the best performance by avoiding the overhead of scheduling and
216219
running one task per client.
@@ -321,9 +324,9 @@ the asynchronous iterator returned by ``subscribe()``.
321324
Performance considerations
322325
--------------------------
323326

324-
The built-in :func:`broadcast` function sends all messages without yielding
325-
control to the event loop. So does the naive way when the network and clients
326-
are fast and reliable.
327+
The built-in :func:`~asyncio.connection.broadcast` function sends all messages
328+
without yielding control to the event loop. So does the naive way when the
329+
network and clients are fast and reliable.
327330

328331
For each client, a WebSocket frame is prepared and sent to the network. This
329332
is the minimum amount of work required to broadcast a message.
@@ -343,7 +346,7 @@ However, this isn't possible in general for two reasons:
343346

344347
All other patterns discussed above yield control to the event loop once per
345348
client because messages are sent by different tasks. This makes them slower
346-
than the built-in :func:`broadcast` function.
349+
than the built-in :func:`~asyncio.connection.broadcast` function.
347350

348351
There is no major difference between the performance of per-client queues and
349352
publish–subscribe.

docs/topics/logging.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ Here's what websockets logs at each level.
220220
``WARNING``
221221
...........
222222

223-
* Failures in :func:`~websockets.broadcast`
223+
* Failures in :func:`~asyncio.connection.broadcast`
224224

225225
``INFO``
226226
........

docs/topics/performance.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
Performance
22
===========
33

4+
.. currentmodule:: websockets
5+
46
Here are tips to optimize performance.
57

68
uvloop
@@ -16,5 +18,5 @@ application.)
1618
broadcast
1719
---------
1820

19-
:func:`~websockets.broadcast` is the most efficient way to send a message to
20-
many clients.
21+
:func:`~asyncio.connection.broadcast` is the most efficient way to send a
22+
message to many clients.

experiments/broadcast/server.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import sys
77
import time
88

9-
import websockets
9+
from websockets import ConnectionClosed
10+
from websockets.asyncio.server import serve
11+
from websockets.asyncio.connection import broadcast
1012

1113

1214
CLIENTS = set()
@@ -15,7 +17,7 @@
1517
async def send(websocket, message):
1618
try:
1719
await websocket.send(message)
18-
except websockets.ConnectionClosed:
20+
except ConnectionClosed:
1921
pass
2022

2123

@@ -43,9 +45,6 @@ async def subscribe(self):
4345
__aiter__ = subscribe
4446

4547

46-
PUBSUB = PubSub()
47-
48-
4948
async def handler(websocket, method=None):
5049
if method in ["default", "naive", "task", "wait"]:
5150
CLIENTS.add(websocket)
@@ -63,14 +62,18 @@ async def handler(websocket, method=None):
6362
CLIENTS.remove(queue)
6463
relay_task.cancel()
6564
elif method == "pubsub":
65+
global PUBSUB
6666
async for message in PUBSUB:
6767
await websocket.send(message)
6868
else:
6969
raise NotImplementedError(f"unsupported method: {method}")
7070

7171

72-
async def broadcast(method, size, delay):
72+
async def broadcast_messages(method, size, delay):
7373
"""Broadcast messages at regular intervals."""
74+
if method == "pubsub":
75+
global PUBSUB
76+
PUBSUB = PubSub()
7477
load_average = 0
7578
time_average = 0
7679
pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
@@ -90,7 +93,7 @@ async def broadcast(method, size, delay):
9093
message = str(time.time_ns()).encode() + b" " + os.urandom(size - 20)
9194

9295
if method == "default":
93-
websockets.broadcast(CLIENTS, message)
96+
broadcast(CLIENTS, message)
9497
elif method == "naive":
9598
# Since the loop can yield control, make a copy of CLIENTS
9699
# to avoid: RuntimeError: Set changed size during iteration
@@ -128,14 +131,14 @@ async def broadcast(method, size, delay):
128131

129132

130133
async def main(method, size, delay):
131-
async with websockets.serve(
134+
async with serve(
132135
functools.partial(handler, method=method),
133136
"localhost",
134137
8765,
135138
compression=None,
136139
ping_timeout=None,
137140
):
138-
await broadcast(method, size, delay)
141+
await broadcast_messages(method, size, delay)
139142

140143

141144
if __name__ == "__main__":

0 commit comments

Comments
 (0)