Skip to content

Commit 7b19e79

Browse files
committed
Add keepalive to the new asyncio implementation.
1 parent 7c1d1d9 commit 7b19e79

File tree

17 files changed

+274
-51
lines changed

17 files changed

+274
-51
lines changed

docs/faq/common.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ There are two main reasons why latency may increase:
9797
* Poor network connectivity.
9898
* More traffic than the recipient can handle.
9999

100-
See the discussion of :doc:`timeouts <../topics/timeouts>` for details.
100+
See the discussion of :doc:`keepalive <../topics/keepalive>` for details.
101101

102102
If websockets' default timeout of 20 seconds is too short for your use case,
103103
you can adjust it with the ``ping_timeout`` argument.
@@ -146,7 +146,7 @@ It closes the connection if it doesn't get a pong within 20 seconds.
146146

147147
You can adjust this behavior with ``ping_interval`` and ``ping_timeout``.
148148

149-
See :doc:`../topics/timeouts` for details.
149+
See :doc:`../topics/keepalive` for details.
150150

151151
How do I respond to pings?
152152
--------------------------

docs/howto/upgrade.rst

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,6 @@ Missing features
7070
If your application relies on one of them, you should stick to the original
7171
implementation until the new implementation supports it in a future release.
7272

73-
Keepalive
74-
.........
75-
76-
The new implementation doesn't provide a :ref:`keepalive mechanism <keepalive>`
77-
yet.
78-
79-
As a consequence, :func:`~asyncio.client.connect` and
80-
:func:`~asyncio.server.serve` don't accept the ``ping_interval`` and
81-
``ping_timeout`` arguments and the
82-
:attr:`~legacy.protocol.WebSocketCommonProtocol.latency` property doesn't exist.
83-
8473
HTTP Basic Authentication
8574
.........................
8675

docs/reference/features.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ Both sides
5353
+------------------------------------+--------+--------+--------+--------+
5454
| Send a pong |||||
5555
+------------------------------------+--------+--------+--------+--------+
56-
| Keepalive | ||||
56+
| Keepalive | ||||
5757
+------------------------------------+--------+--------+--------+--------+
58-
| Heartbeat | ||||
58+
| Heartbeat | ||||
5959
+------------------------------------+--------+--------+--------+--------+
6060
| Perform the closing handshake |||||
6161
+------------------------------------+--------+--------+--------+--------+

docs/reference/new-asyncio/client.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ Using a connection
4343

4444
.. autoproperty:: remote_address
4545

46+
.. autoattribute:: latency
47+
4648
.. autoproperty:: state
4749

4850
The following attributes are available after the opening handshake,

docs/reference/new-asyncio/common.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ Both sides (new :mod:`asyncio`)
3333

3434
.. autoproperty:: remote_address
3535

36+
.. autoattribute:: latency
37+
3638
.. autoproperty:: state
3739

3840
The following attributes are available after the opening handshake,

docs/reference/new-asyncio/server.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ Using a connection
6464

6565
.. autoproperty:: remote_address
6666

67+
.. autoattribute:: latency
68+
6769
.. autoproperty:: state
6870

6971
The following attributes are available after the opening handshake,

docs/topics/broadcast.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ How do we avoid running out of memory when slow clients can't keep up with the
206206
broadcast rate, then? The most straightforward option is to disconnect them.
207207

208208
If a client gets too far behind, eventually it reaches the limit defined by
209-
``ping_timeout`` and websockets terminates the connection. You can refer to
210-
the discussion of :doc:`keepalive and timeouts <timeouts>` for details.
209+
``ping_timeout`` and websockets terminates the connection. You can refer to the
210+
discussion of :doc:`keepalive <keepalive>` for details.
211211

212212
How :func:`~asyncio.connection.broadcast` works
213213
-----------------------------------------------

docs/topics/index.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Get a deeper understanding of how websockets is built and why.
1111
authentication
1212
broadcast
1313
compression
14-
timeouts
14+
keepalive
1515
design
1616
memory
1717
security

docs/topics/timeouts.rst renamed to docs/topics/keepalive.rst

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
Timeouts
2-
========
1+
Keepalive and latency
2+
=====================
33

44
.. currentmodule:: websockets
55

@@ -49,9 +49,9 @@ This mechanism serves two purposes:
4949
application gets a :exc:`~exceptions.ConnectionClosed` exception.
5050

5151
Timings are configurable with the ``ping_interval`` and ``ping_timeout``
52-
arguments of :func:`~client.connect` and :func:`~server.serve`. Shorter values
53-
will detect connection drops faster but they will increase network traffic and
54-
they will be more sensitive to latency.
52+
arguments of :func:`~asyncio.client.connect` and :func:`~asyncio.server.serve`.
53+
Shorter values will detect connection drops faster but they will increase
54+
network traffic and they will be more sensitive to latency.
5555

5656
Setting ``ping_interval`` to :obj:`None` disables the whole keepalive and
5757
heartbeat mechanism.
@@ -111,6 +111,6 @@ Latency between a client and a server may increase for two reasons:
111111
than the client can accept.
112112

113113
The latency measured during the last exchange of Ping and Pong frames is
114-
available in the :attr:`~legacy.protocol.WebSocketCommonProtocol.latency`
115-
attribute. Alternatively, you can measure the latency at any time with the
116-
:attr:`~legacy.protocol.WebSocketCommonProtocol.ping` method.
114+
available in the :attr:`~asyncio.connection.Connection.latency` attribute.
115+
Alternatively, you can measure the latency at any time with the
116+
:attr:`~asyncio.connection.Connection.ping` method.

src/websockets/asyncio/client.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,29 @@ class ClientConnection(Connection):
3737
:exc:`~websockets.exceptions.ConnectionClosedError` when the connection is
3838
closed with any other code.
3939
40+
The ``ping_interval``, ``ping_timeout``, ``close_timeout``, ``max_queue``,
41+
and ``write_limit`` arguments the same meaning as in :func:`connect`.
42+
4043
Args:
4144
protocol: Sans-I/O connection.
42-
close_timeout: Timeout for closing the connection in seconds.
43-
:obj:`None` disables the timeout.
4445
4546
"""
4647

4748
def __init__(
4849
self,
4950
protocol: ClientProtocol,
5051
*,
52+
ping_interval: float | None = 20,
53+
ping_timeout: float | None = 20,
5154
close_timeout: float | None = 10,
5255
max_queue: int | tuple[int, int | None] = 16,
5356
write_limit: int | tuple[int, int | None] = 2**15,
5457
) -> None:
5558
self.protocol: ClientProtocol
5659
super().__init__(
5760
protocol,
61+
ping_interval=ping_interval,
62+
ping_timeout=ping_timeout,
5863
close_timeout=close_timeout,
5964
max_queue=max_queue,
6065
write_limit=write_limit,
@@ -84,7 +89,9 @@ async def handshake(
8489
if self.response is None:
8590
raise ConnectionError("connection closed during handshake")
8691

87-
if self.protocol.handshake_exc is not None:
92+
if self.protocol.handshake_exc is None:
93+
self.start_keepalive()
94+
else:
8895
try:
8996
async with asyncio_timeout(self.close_timeout):
9097
await self.connection_lost_waiter
@@ -146,6 +153,10 @@ class connect:
146153
:doc:`compression guide <../../topics/compression>` for details.
147154
open_timeout: Timeout for opening the connection in seconds.
148155
:obj:`None` disables the timeout.
156+
ping_interval: Interval between keepalive pings in seconds.
157+
:obj:`None` disables keepalive.
158+
ping_timeout: Timeout for keepalive pings in seconds.
159+
:obj:`None` disables timeouts.
149160
close_timeout: Timeout for closing the connection in seconds.
150161
:obj:`None` disables the timeout.
151162
max_size: Maximum size of incoming messages in bytes.
@@ -208,6 +219,8 @@ def __init__(
208219
compression: str | None = "deflate",
209220
# Timeouts
210221
open_timeout: float | None = 10,
222+
ping_interval: float | None = 20,
223+
ping_timeout: float | None = 20,
211224
close_timeout: float | None = 10,
212225
# Limits
213226
max_size: int | None = 2**20,
@@ -256,6 +269,8 @@ def factory() -> ClientConnection:
256269
# This is a connection in websockets and a protocol in asyncio.
257270
connection = create_connection(
258271
protocol,
272+
ping_interval=ping_interval,
273+
ping_timeout=ping_timeout,
259274
close_timeout=close_timeout,
260275
max_queue=max_queue,
261276
write_limit=write_limit,

src/websockets/asyncio/connection.py

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@
2424
from ..http11 import Request, Response
2525
from ..protocol import CLOSED, OPEN, Event, Protocol, State
2626
from ..typing import Data, LoggerLike, Subprotocol
27-
from .compatibility import TimeoutError, aiter, anext, asyncio_timeout_at
27+
from .compatibility import (
28+
TimeoutError,
29+
aiter,
30+
anext,
31+
asyncio_timeout,
32+
asyncio_timeout_at,
33+
)
2834
from .messages import Assembler
2935

3036

@@ -48,11 +54,15 @@ def __init__(
4854
self,
4955
protocol: Protocol,
5056
*,
57+
ping_interval: float | None = 20,
58+
ping_timeout: float | None = 20,
5159
close_timeout: float | None = 10,
5260
max_queue: int | tuple[int, int | None] = 16,
5361
write_limit: int | tuple[int, int | None] = 2**15,
5462
) -> None:
5563
self.protocol = protocol
64+
self.ping_interval = ping_interval
65+
self.ping_timeout = ping_timeout
5666
self.close_timeout = close_timeout
5767
if isinstance(max_queue, int):
5868
max_queue = (max_queue, None)
@@ -95,6 +105,21 @@ def __init__(
95105
# Mapping of ping IDs to pong waiters, in chronological order.
96106
self.pong_waiters: dict[bytes, tuple[asyncio.Future[float], float]] = {}
97107

108+
self.latency: float = 0
109+
"""
110+
Latency of the connection, in seconds.
111+
112+
This value is updated after sending a ping frame and receiving a
113+
matching pong frame. Before the first ping, :attr:`latency` is ``0``.
114+
115+
By default, websockets enables a :ref:`keepalive <keepalive>` mechanism
116+
that sends ping frames automatically at regular intervals. You can also
117+
send ping frames and measure latency with :meth:`ping`.
118+
"""
119+
120+
# Task that sends keepalive pings. None when ping_interval is None.
121+
self.keepalive_task: asyncio.Task[None] | None = None
122+
98123
# Exception raised while reading from the connection, to be chained to
99124
# ConnectionClosed in order to show why the TCP connection dropped.
100125
self.recv_exc: BaseException | None = None
@@ -144,7 +169,8 @@ def state(self) -> State:
144169
145170
This attribute is provided for completeness. Typical applications
146171
shouldn't check its value. Instead, they should call :meth:`~recv` or
147-
:meth:`send` and handle :exc:`~exceptions.ConnectionClosed` exceptions.
172+
:meth:`send` and handle :exc:`~websockets.exceptions.ConnectionClosed`
173+
exceptions.
148174
149175
"""
150176
return self.protocol.state
@@ -540,7 +566,7 @@ async def wait_closed(self) -> None:
540566
"""
541567
await asyncio.shield(self.connection_lost_waiter)
542568

543-
async def ping(self, data: Data | None = None) -> Awaitable[None]:
569+
async def ping(self, data: Data | None = None) -> Awaitable[float]:
544570
"""
545571
Send a Ping_.
546572
@@ -643,8 +669,10 @@ def acknowledge_pings(self, data: bytes) -> None:
643669
ping_ids = []
644670
for ping_id, (pong_waiter, ping_timestamp) in self.pong_waiters.items():
645671
ping_ids.append(ping_id)
646-
pong_waiter.set_result(pong_timestamp - ping_timestamp)
672+
latency = pong_timestamp - ping_timestamp
673+
pong_waiter.set_result(latency)
647674
if ping_id == data:
675+
self.latency = latency
648676
break
649677
else:
650678
raise AssertionError("solicited pong not found in pings")
@@ -664,7 +692,8 @@ def abort_pings(self) -> None:
664692
exc = self.protocol.close_exc
665693

666694
for pong_waiter, _ping_timestamp in self.pong_waiters.values():
667-
pong_waiter.set_exception(exc)
695+
if not pong_waiter.done():
696+
pong_waiter.set_exception(exc)
668697
# If the exception is never retrieved, it will be logged when ping
669698
# is garbage-collected. This is confusing for users.
670699
# Given that ping is done (with an exception), canceling it does
@@ -673,6 +702,50 @@ def abort_pings(self) -> None:
673702

674703
self.pong_waiters.clear()
675704

705+
async def keepalive(self) -> None:
706+
"""
707+
Send a Ping frame and wait for a Pong frame at regular intervals.
708+
709+
"""
710+
assert self.ping_interval is not None
711+
latency = 0.0
712+
try:
713+
while True:
714+
# If self.ping_timeout > latency > self.ping_interval, pings
715+
# will be sent immediately after receiving pongs. The period
716+
# will be longer than self.ping_interval.
717+
await asyncio.sleep(self.ping_interval - latency)
718+
719+
self.logger.debug("% sending keepalive ping")
720+
pong_waiter = await self.ping()
721+
722+
if self.ping_timeout is not None:
723+
try:
724+
async with asyncio_timeout(self.ping_timeout):
725+
latency = await pong_waiter
726+
self.logger.debug("% received keepalive pong")
727+
except asyncio.TimeoutError:
728+
if self.debug:
729+
self.logger.debug("! timed out waiting for keepalive pong")
730+
async with self.send_context():
731+
self.protocol.fail(
732+
CloseCode.INTERNAL_ERROR,
733+
"keepalive ping timeout",
734+
)
735+
break
736+
except ConnectionClosed:
737+
pass
738+
except Exception:
739+
self.logger.error("keepalive ping failed", exc_info=True)
740+
741+
def start_keepalive(self) -> None:
742+
"""
743+
Run :meth:`keepalive` in a task, unless keepalive is disabled.
744+
745+
"""
746+
if self.ping_interval is not None:
747+
self.keepalive_task = self.loop.create_task(self.keepalive())
748+
676749
@contextlib.asynccontextmanager
677750
async def send_context(
678751
self,
@@ -835,11 +908,15 @@ def connection_lost(self, exc: Exception | None) -> None:
835908
self.protocol.receive_eof() # receive_eof is idempotent
836909
self.recv_messages.close()
837910
self.set_recv_exc(exc)
911+
self.abort_pings()
912+
# If keepalive() was waiting for a pong, abort_pings() terminated it.
913+
# If it was sleeping until the next ping, we need to cancel it now
914+
if self.keepalive_task is not None:
915+
self.keepalive_task.cancel()
838916
# If self.connection_lost_waiter isn't pending, that's a bug, because:
839917
# - it's set only here in connection_lost() which is called only once;
840918
# - it must never be canceled.
841919
self.connection_lost_waiter.set_result(None)
842-
self.abort_pings()
843920

844921
# Adapted from asyncio.streams.FlowControlMixin
845922
if self.paused: # pragma: no cover

0 commit comments

Comments
 (0)