Skip to content

Commit ccd44ce

Browse files
authored
Only refresh metadata if connection fails all dns records (#2532)
1 parent 5360d79 commit ccd44ce

File tree

2 files changed

+5
-2
lines changed

2 files changed

+5
-2
lines changed

kafka/client_async.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ def __init__(self, **configs):
238238
self._api_versions = None
239239
self._connecting = set()
240240
self._sending = set()
241-
self._refresh_on_disconnects = True
242241

243242
# Not currently used, but data is collected internally
244243
self._last_bootstrap = 0
@@ -384,7 +383,7 @@ def _conn_state_change(self, node_id, sock, conn):
384383
elif self.cluster.is_bootstrap(node_id):
385384
self._bootstrap_fails += 1
386385

387-
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
386+
elif conn.connect_failed() and not self._closed and not idle_disconnect:
388387
log.warning("Node %s connection failed -- refreshing metadata", node_id)
389388
self.cluster.request_update()
390389

kafka/conn.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,10 @@ def disconnected(self):
861861
"""Return True iff socket is closed"""
862862
return self.state is ConnectionStates.DISCONNECTED
863863

864+
def connect_failed(self):
865+
"""Return True iff connection attempt failed after attempting all dns records"""
866+
return self.disconnected() and self.last_attempt >= 0 and len(self._gai) == 0
867+
864868
def _reset_reconnect_backoff(self):
865869
self._failures = 0
866870
self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0

0 commit comments

Comments
 (0)