Skip to content

Commit 3f53f43

Browse files
committed
check that state has not changed after conn._try_api_versions_check
1 parent 5060ac7 commit 3f53f43

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

kafka/conn.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -441,16 +441,18 @@ def connect(self):
441441

442442
if self.state is ConnectionStates.API_VERSIONS:
443443
if self._try_api_versions_check():
444-
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
445-
log.debug('%s: initiating SASL authentication', self)
446-
self.state = ConnectionStates.AUTHENTICATING
447-
self.config['state_change_callback'](self.node_id, self._sock, self)
448-
else:
449-
# security_protocol PLAINTEXT
450-
log.info('%s: Connection complete.', self)
451-
self.state = ConnectionStates.CONNECTED
452-
self._reset_reconnect_backoff()
453-
self.config['state_change_callback'](self.node_id, self._sock, self)
444+
# _try_api_versions_check has side-effects: possibly disconnected on socket errors
445+
if self.state is ConnectionStates.API_VERSIONS:
446+
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
447+
log.debug('%s: initiating SASL authentication', self)
448+
self.state = ConnectionStates.AUTHENTICATING
449+
self.config['state_change_callback'](self.node_id, self._sock, self)
450+
else:
451+
# security_protocol PLAINTEXT
452+
log.info('%s: Connection complete.', self)
453+
self.state = ConnectionStates.CONNECTED
454+
self._reset_reconnect_backoff()
455+
self.config['state_change_callback'](self.node_id, self._sock, self)
454456

455457
if self.state is ConnectionStates.AUTHENTICATING:
456458
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')

0 commit comments

Comments
 (0)