Skip to content

Commit c6967ab

Browse files
NoahStappJibola
andauthored
PYTHON-3472 - Add log messages to SDAM spec (mongodb#1771)
Co-authored-by: Jib <[email protected]>
1 parent 28697df commit c6967ab

17 files changed

+2064
-46
lines changed

pymongo/asynchronous/monitor.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
import atexit
20+
import logging
2021
import time
2122
import weakref
2223
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
@@ -28,6 +29,7 @@
2829
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
2930
from pymongo.hello import Hello
3031
from pymongo.lock import _create_lock
32+
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
3133
from pymongo.pool_options import _is_faas
3234
from pymongo.read_preferences import MovingAverage
3335
from pymongo.server_description import ServerDescription
@@ -257,10 +259,21 @@ async def _check_server(self) -> ServerDescription:
257259
sd = self._server_description
258260
address = sd.address
259261
duration = _monotonic_duration(start)
262+
awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version)
260263
if self._publish:
261-
awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version)
262264
assert self._listeners is not None
263265
self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited)
266+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
267+
_debug_log(
268+
_SDAM_LOGGER,
269+
topologyId=self._topology._topology_id,
270+
serverHost=address[0],
271+
serverPort=address[1],
272+
awaited=awaited,
273+
durationMS=duration * 1000,
274+
failure=error,
275+
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
276+
)
264277
await self._reset_connection()
265278
if isinstance(error, _OperationCancelled):
266279
raise
@@ -274,22 +287,32 @@ async def _check_once(self) -> ServerDescription:
274287
Returns a ServerDescription, or raises an exception.
275288
"""
276289
address = self._server_description.address
290+
sd = self._server_description
291+
292+
# XXX: "awaited" could be incorrectly set to True in the rare case
293+
# the pool checkout closes and recreates a connection.
294+
awaited = bool(
295+
self._pool.conns and self._stream and sd.is_server_type_known and sd.topology_version
296+
)
277297
if self._publish:
278298
assert self._listeners is not None
279-
sd = self._server_description
280-
# XXX: "awaited" could be incorrectly set to True in the rare case
281-
# the pool checkout closes and recreates a connection.
282-
awaited = bool(
283-
self._pool.conns
284-
and self._stream
285-
and sd.is_server_type_known
286-
and sd.topology_version
287-
)
288299
self._listeners.publish_server_heartbeat_started(address, awaited)
289300

290301
if self._cancel_context and self._cancel_context.cancelled:
291302
await self._reset_connection()
292303
async with self._pool.checkout() as conn:
304+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
305+
_debug_log(
306+
_SDAM_LOGGER,
307+
topologyId=self._topology._topology_id,
308+
driverConnectionId=conn.id,
309+
serverConnectionId=conn.server_connection_id,
310+
serverHost=address[0],
311+
serverPort=address[1],
312+
awaited=awaited,
313+
message=_SDAMStatusMessage.HEARTBEAT_START,
314+
)
315+
293316
self._cancel_context = conn.cancel_context
294317
response, round_trip_time = await self._check_with_socket(conn)
295318
if not response.awaitable:
@@ -302,6 +325,19 @@ async def _check_once(self) -> ServerDescription:
302325
self._listeners.publish_server_heartbeat_succeeded(
303326
address, round_trip_time, response, response.awaitable
304327
)
328+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
329+
_debug_log(
330+
_SDAM_LOGGER,
331+
topologyId=self._topology._topology_id,
332+
driverConnectionId=conn.id,
333+
serverConnectionId=conn.server_connection_id,
334+
serverHost=address[0],
335+
serverPort=address[1],
336+
awaited=awaited,
337+
durationMS=round_trip_time * 1000,
338+
reply=response.document,
339+
message=_SDAMStatusMessage.HEARTBEAT_SUCCESS,
340+
)
305341
return sd
306342

307343
async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]:

pymongo/asynchronous/server.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@
3030
from pymongo.asynchronous.helpers import _handle_reauth
3131
from pymongo.errors import NotPrimaryError, OperationFailure
3232
from pymongo.helpers_shared import _check_command_response
33-
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
33+
from pymongo.logger import (
34+
_COMMAND_LOGGER,
35+
_SDAM_LOGGER,
36+
_CommandStatusMessage,
37+
_debug_log,
38+
_SDAMStatusMessage,
39+
)
3440
from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query
3541
from pymongo.response import PinnedResponse, Response
3642

@@ -99,6 +105,15 @@ async def close(self) -> None:
99105
(self._description.address, self._topology_id),
100106
)
101107
)
108+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
109+
_debug_log(
110+
_SDAM_LOGGER,
111+
topologyId=self._topology_id,
112+
serverHost=self._description.address[0],
113+
serverPort=self._description.address[1],
114+
message=_SDAMStatusMessage.STOP_SERVER,
115+
)
116+
102117
await self._monitor.close()
103118
await self._pool.close()
104119

pymongo/asynchronous/topology.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@
4646
from pymongo.hello import Hello
4747
from pymongo.lock import _ACondition, _ALock, _create_lock
4848
from pymongo.logger import (
49+
_SDAM_LOGGER,
4950
_SERVER_SELECTION_LOGGER,
5051
_debug_log,
52+
_SDAMStatusMessage,
5153
_ServerSelectionStatusMessage,
5254
)
5355
from pymongo.pool_options import PoolOptions
@@ -110,6 +112,13 @@ def __init__(self, topology_settings: TopologySettings):
110112
if self._publish_server or self._publish_tp:
111113
self._events = queue.Queue(maxsize=100)
112114

115+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
116+
_debug_log(
117+
_SDAM_LOGGER,
118+
topologyId=self._topology_id,
119+
message=_SDAMStatusMessage.START_TOPOLOGY,
120+
)
121+
113122
if self._publish_tp:
114123
assert self._events is not None
115124
self._events.put((self._listeners.publish_topology_opened, (self._topology_id,)))
@@ -124,22 +133,38 @@ def __init__(self, topology_settings: TopologySettings):
124133
)
125134

126135
self._description = topology_description
136+
initial_td = TopologyDescription(
137+
TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings
138+
)
127139
if self._publish_tp:
128140
assert self._events is not None
129-
initial_td = TopologyDescription(
130-
TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings
131-
)
132141
self._events.put(
133142
(
134143
self._listeners.publish_topology_description_changed,
135144
(initial_td, self._description, self._topology_id),
136145
)
137146
)
147+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
148+
_debug_log(
149+
_SDAM_LOGGER,
150+
topologyId=self._topology_id,
151+
previousDescription=initial_td,
152+
newDescription=self._description,
153+
message=_SDAMStatusMessage.TOPOLOGY_CHANGE,
154+
)
138155

139156
for seed in topology_settings.seeds:
140157
if self._publish_server:
141158
assert self._events is not None
142159
self._events.put((self._listeners.publish_server_opened, (seed, self._topology_id)))
160+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
161+
_debug_log(
162+
_SDAM_LOGGER,
163+
topologyId=self._topology_id,
164+
serverHost=seed[0],
165+
serverPort=seed[1],
166+
message=_SDAMStatusMessage.START_SERVER,
167+
)
143168

144169
# Store the seed list to help diagnose errors in _error_message().
145170
self._seed_addresses = list(topology_description.server_descriptions())
@@ -472,6 +497,14 @@ async def _process_change(
472497
(td_old, self._description, self._topology_id),
473498
)
474499
)
500+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
501+
_debug_log(
502+
_SDAM_LOGGER,
503+
topologyId=self._topology_id,
504+
previousDescription=td_old,
505+
newDescription=self._description,
506+
message=_SDAMStatusMessage.TOPOLOGY_CHANGE,
507+
)
475508

476509
# Shutdown SRV polling for unsupported cluster types.
477510
# This is only applicable if the old topology was Unknown, and the
@@ -530,6 +563,14 @@ async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
530563
(td_old, self._description, self._topology_id),
531564
)
532565
)
566+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
567+
_debug_log(
568+
_SDAM_LOGGER,
569+
topologyId=self._topology_id,
570+
previousDescription=td_old,
571+
newDescription=self._description,
572+
message=_SDAMStatusMessage.TOPOLOGY_CHANGE,
573+
)
533574

534575
async def on_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
535576
"""Process a new list of nodes obtained from scanning SRV records."""
@@ -684,6 +725,18 @@ async def close(self) -> None:
684725
)
685726
)
686727
self._events.put((self._listeners.publish_topology_closed, (self._topology_id,)))
728+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
729+
_debug_log(
730+
_SDAM_LOGGER,
731+
topologyId=self._topology_id,
732+
previousDescription=old_td,
733+
newDescription=self._description,
734+
message=_SDAMStatusMessage.TOPOLOGY_CHANGE,
735+
)
736+
_debug_log(
737+
_SDAM_LOGGER, topologyId=self._topology_id, message=_SDAMStatusMessage.STOP_TOPOLOGY
738+
)
739+
687740
if self._publish_server or self._publish_tp:
688741
# Make sure the events executor thread is fully closed before publishing the remaining events
689742
self.__events_executor.close()

pymongo/logger.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ class _ConnectionStatusMessage(str, enum.Enum):
5353
CHECKEDIN = "Connection checked in"
5454

5555

56+
class _SDAMStatusMessage(str, enum.Enum):
57+
START_TOPOLOGY = "Starting topology monitoring"
58+
STOP_TOPOLOGY = "Stopped topology monitoring"
59+
START_SERVER = "Starting server monitoring"
60+
STOP_SERVER = "Stopped server monitoring"
61+
TOPOLOGY_CHANGE = "Topology description changed"
62+
HEARTBEAT_START = "Server heartbeat started"
63+
HEARTBEAT_SUCCESS = "Server heartbeat succeeded"
64+
HEARTBEAT_FAIL = "Server heartbeat failed"
65+
66+
5667
_DEFAULT_DOCUMENT_LENGTH = 1000
5768
_SENSITIVE_COMMANDS = [
5869
"authenticate",
@@ -73,6 +84,7 @@ class _ConnectionStatusMessage(str, enum.Enum):
7384
_CONNECTION_LOGGER = logging.getLogger("pymongo.connection")
7485
_SERVER_SELECTION_LOGGER = logging.getLogger("pymongo.serverSelection")
7586
_CLIENT_LOGGER = logging.getLogger("pymongo.client")
87+
_SDAM_LOGGER = logging.getLogger("pymongo.topology")
7688
_VERBOSE_CONNECTION_ERROR_REASONS = {
7789
ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed",
7890
ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed",
@@ -129,7 +141,7 @@ def _is_sensitive(self, doc_name: str) -> bool:
129141
)
130142

131143
is_sensitive_hello = (
132-
self._kwargs["commandName"] in _HELLO_COMMANDS and is_speculative_authenticate
144+
self._kwargs.get("commandName", None) in _HELLO_COMMANDS and is_speculative_authenticate
133145
)
134146

135147
return is_sensitive_command or is_sensitive_hello

pymongo/synchronous/monitor.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
import atexit
20+
import logging
2021
import time
2122
import weakref
2223
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
@@ -26,6 +27,7 @@
2627
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
2728
from pymongo.hello import Hello
2829
from pymongo.lock import _create_lock
30+
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
2931
from pymongo.pool_options import _is_faas
3032
from pymongo.read_preferences import MovingAverage
3133
from pymongo.server_description import ServerDescription
@@ -257,10 +259,21 @@ def _check_server(self) -> ServerDescription:
257259
sd = self._server_description
258260
address = sd.address
259261
duration = _monotonic_duration(start)
262+
awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version)
260263
if self._publish:
261-
awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version)
262264
assert self._listeners is not None
263265
self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited)
266+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
267+
_debug_log(
268+
_SDAM_LOGGER,
269+
topologyId=self._topology._topology_id,
270+
serverHost=address[0],
271+
serverPort=address[1],
272+
awaited=awaited,
273+
durationMS=duration * 1000,
274+
failure=error,
275+
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
276+
)
264277
self._reset_connection()
265278
if isinstance(error, _OperationCancelled):
266279
raise
@@ -274,22 +287,32 @@ def _check_once(self) -> ServerDescription:
274287
Returns a ServerDescription, or raises an exception.
275288
"""
276289
address = self._server_description.address
290+
sd = self._server_description
291+
292+
# XXX: "awaited" could be incorrectly set to True in the rare case
293+
# the pool checkout closes and recreates a connection.
294+
awaited = bool(
295+
self._pool.conns and self._stream and sd.is_server_type_known and sd.topology_version
296+
)
277297
if self._publish:
278298
assert self._listeners is not None
279-
sd = self._server_description
280-
# XXX: "awaited" could be incorrectly set to True in the rare case
281-
# the pool checkout closes and recreates a connection.
282-
awaited = bool(
283-
self._pool.conns
284-
and self._stream
285-
and sd.is_server_type_known
286-
and sd.topology_version
287-
)
288299
self._listeners.publish_server_heartbeat_started(address, awaited)
289300

290301
if self._cancel_context and self._cancel_context.cancelled:
291302
self._reset_connection()
292303
with self._pool.checkout() as conn:
304+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
305+
_debug_log(
306+
_SDAM_LOGGER,
307+
topologyId=self._topology._topology_id,
308+
driverConnectionId=conn.id,
309+
serverConnectionId=conn.server_connection_id,
310+
serverHost=address[0],
311+
serverPort=address[1],
312+
awaited=awaited,
313+
message=_SDAMStatusMessage.HEARTBEAT_START,
314+
)
315+
293316
self._cancel_context = conn.cancel_context
294317
response, round_trip_time = self._check_with_socket(conn)
295318
if not response.awaitable:
@@ -302,6 +325,19 @@ def _check_once(self) -> ServerDescription:
302325
self._listeners.publish_server_heartbeat_succeeded(
303326
address, round_trip_time, response, response.awaitable
304327
)
328+
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
329+
_debug_log(
330+
_SDAM_LOGGER,
331+
topologyId=self._topology._topology_id,
332+
driverConnectionId=conn.id,
333+
serverConnectionId=conn.server_connection_id,
334+
serverHost=address[0],
335+
serverPort=address[1],
336+
awaited=awaited,
337+
durationMS=round_trip_time * 1000,
338+
reply=response.document,
339+
message=_SDAMStatusMessage.HEARTBEAT_SUCCESS,
340+
)
305341
return sd
306342

307343
def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:

0 commit comments

Comments
 (0)