Skip to content

Commit a3f34ef

Browse files
authored
Client connection / maybe_refresh_metadata changes (#2507)
1 parent e94bd4f commit a3f34ef

File tree

2 files changed

+49
-36
lines changed

2 files changed

+49
-36
lines changed

kafka/client_async.py

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ def _should_recycle_connection(self, conn):
398398

399399
return False
400400

401-
def _maybe_connect(self, node_id):
401+
def _init_connect(self, node_id):
402402
"""Idempotent non-blocking connection attempt to the given node id.
403403
404404
Returns True if connection object exists and is connected / connecting
@@ -427,10 +427,8 @@ def _maybe_connect(self, node_id):
427427
**self.config)
428428
self._conns[node_id] = conn
429429

430-
elif conn.connected():
431-
return True
432-
433-
conn.connect()
430+
if conn.disconnected():
431+
conn.connect()
434432
return not conn.disconnected()
435433

436434
def ready(self, node_id, metadata_priority=True):
@@ -621,15 +619,18 @@ def poll(self, timeout_ms=None, future=None):
621619
if self._closed:
622620
break
623621

624-
# Send a metadata request if needed (or initiate new connection)
625-
metadata_timeout_ms = self._maybe_refresh_metadata()
626-
627622
# Attempt to complete pending connections
628623
for node_id in list(self._connecting):
629624
# False return means no more connection progress is possible
630625
# Connected nodes will update _connecting via state_change callback
631-
if not self._maybe_connect(node_id):
632-
self._connecting.remove(node_id)
626+
if not self._init_connect(node_id):
627+
# It's possible that the connection attempt triggered a state change
628+
# but if not, make sure to remove from _connecting list
629+
if node_id in self._connecting:
630+
self._connecting.remove(node_id)
631+
632+
# Send a metadata request if needed (or initiate new connection)
633+
metadata_timeout_ms = self._maybe_refresh_metadata()
633634

634635
# If we got a future that is already done, don't block in _poll
635636
if future is not None and future.is_done:
@@ -679,6 +680,8 @@ def _poll(self, timeout):
679680
self._register_send_sockets()
680681

681682
start_select = time.time()
683+
if timeout == float('inf'):
684+
timeout = None
682685
ready = self._selector.select(timeout)
683686
end_select = time.time()
684687
if self._sensors:
@@ -893,6 +896,26 @@ def _maybe_refresh_metadata(self, wakeup=False):
893896
log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms)
894897
return next_connect_ms
895898

899+
if not self._can_send_request(node_id):
900+
# If there's any connection establishment underway, wait until it completes. This prevents
901+
# the client from unnecessarily connecting to additional nodes while a previous connection
902+
# attempt has not been completed.
903+
if self._connecting:
904+
return float('inf')
905+
906+
elif self._can_connect(node_id):
907+
log.debug("Initializing connection to node %s for metadata request", node_id)
908+
self._connecting.add(node_id)
909+
if not self._init_connect(node_id):
910+
if node_id in self._connecting:
911+
self._connecting.remove(node_id)
912+
# Connection attempt failed immediately, need to retry with a different node
913+
return self.config['reconnect_backoff_ms']
914+
else:
915+
# Existing connection with max in flight requests. Wait for request to complete.
916+
return self.config['request_timeout_ms']
917+
918+
# Recheck node_id in case we were able to connect immediately above
896919
if self._can_send_request(node_id):
897920
topics = list(self._topics)
898921
if not topics and self.cluster.is_bootstrap(node_id):
@@ -917,20 +940,11 @@ def refresh_done(val_or_error):
917940
future.add_errback(refresh_done)
918941
return self.config['request_timeout_ms']
919942

920-
# If there's any connection establishment underway, wait until it completes. This prevents
921-
# the client from unnecessarily connecting to additional nodes while a previous connection
922-
# attempt has not been completed.
943+
# Should only get here if still connecting
923944
if self._connecting:
924945
return float('inf')
925-
926-
if self.maybe_connect(node_id, wakeup=wakeup):
927-
log.debug("Initializing connection to node %s for metadata request", node_id)
928-
return float('inf')
929-
930-
# connected but can't send more, OR connecting
931-
# In either case we just need to wait for a network event
932-
# to let us know the selected connection might be usable again.
933-
return float('inf')
946+
else:
947+
return self.config['reconnect_backoff_ms']
934948

935949
def get_api_versions(self):
936950
"""Return the ApiVersions map, if available.
@@ -973,7 +987,7 @@ def check_version(self, node_id=None, timeout=None, strict=False):
973987
if try_node is None:
974988
self._lock.release()
975989
raise Errors.NoBrokersAvailable()
976-
if not self._maybe_connect(try_node):
990+
if not self._init_connect(try_node):
977991
if try_node == node_id:
978992
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
979993
else:

test/test_client_async.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def test_can_connect(cli, conn):
5858
assert cli._can_connect(0)
5959

6060
# Node is connected, can't reconnect
61-
assert cli._maybe_connect(0) is True
61+
assert cli._init_connect(0) is True
6262
assert not cli._can_connect(0)
6363

6464
# Node is disconnected, can connect
@@ -70,15 +70,15 @@ def test_can_connect(cli, conn):
7070
assert not cli._can_connect(0)
7171

7272

73-
def test_maybe_connect(cli, conn):
73+
def test_init_connect(cli, conn):
7474
# Node not in metadata, return False
75-
assert not cli._maybe_connect(2)
75+
assert not cli._init_connect(2)
7676

7777
# New node_id creates a conn object
7878
assert 0 not in cli._conns
7979
conn.state = ConnectionStates.DISCONNECTED
8080
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
81-
assert cli._maybe_connect(0) is True
81+
assert cli._init_connect(0) is True
8282
assert cli._conns[0] is conn
8383

8484

@@ -122,8 +122,8 @@ def test_ready(mocker, cli, conn):
122122

123123

124124
def test_is_ready(mocker, cli, conn):
125-
cli._maybe_connect(0)
126-
cli._maybe_connect(1)
125+
cli._init_connect(0)
126+
cli._init_connect(1)
127127

128128
# metadata refresh blocks ready nodes
129129
assert cli.is_ready(0)
@@ -166,14 +166,14 @@ def test_close(mocker, cli, conn):
166166
assert conn.close.call_count == call_count
167167

168168
# Single node close
169-
cli._maybe_connect(0)
169+
cli._init_connect(0)
170170
assert conn.close.call_count == call_count
171171
cli.close(0)
172172
call_count += 1
173173
assert conn.close.call_count == call_count
174174

175175
# All node close
176-
cli._maybe_connect(1)
176+
cli._init_connect(1)
177177
cli.close()
178178
# +2 close: node 1, node bootstrap (node 0 already closed)
179179
call_count += 2
@@ -185,7 +185,7 @@ def test_is_disconnected(cli, conn):
185185
conn.state = ConnectionStates.DISCONNECTED
186186
assert not cli.is_disconnected(0)
187187

188-
cli._maybe_connect(0)
188+
cli._init_connect(0)
189189
assert cli.is_disconnected(0)
190190

191191
conn.state = ConnectionStates.CONNECTING
@@ -210,7 +210,7 @@ def test_send(cli, conn):
210210
assert isinstance(f.exception, Errors.NodeNotReadyError)
211211

212212
conn.state = ConnectionStates.CONNECTED
213-
cli._maybe_connect(0)
213+
cli._init_connect(0)
214214
# ProduceRequest w/ 0 required_acks -> no response
215215
request = ProduceRequest[0](0, 0, [])
216216
assert request.expect_response() is False
@@ -339,8 +339,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
339339
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
340340
mocker.patch.object(client, '_can_send_request', return_value=False)
341341
mocker.patch.object(client, '_can_connect', return_value=True)
342-
mocker.patch.object(client, '_maybe_connect', return_value=True)
343-
mocker.patch.object(client, 'maybe_connect', return_value=True)
342+
mocker.patch.object(client, '_init_connect', return_value=True)
344343

345344
now = time.time()
346345
t = mocker.patch('time.time')
@@ -349,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
349348
# first poll attempts connection
350349
client.poll(timeout_ms=12345678)
351350
client._poll.assert_called_with(12345.678)
352-
client.maybe_connect.assert_called_once_with('foobar', wakeup=False)
351+
client._init_connect.assert_called_once_with('foobar')
353352

354353
# poll while connecting should not attempt a new connection
355354
client._connecting.add('foobar')

0 commit comments

Comments
 (0)