Skip to content

Commit 39095a9

Browse files
committed
client _maybe_connect -> _init_connect
1 parent 6919bab commit 39095a9

File tree

2 files changed

+16
-18
lines changed

2 files changed

+16
-18
lines changed

kafka/client_async.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ def _should_recycle_connection(self, conn):
404404

405405
return False
406406

407-
def _maybe_connect(self, node_id):
407+
def _init_connect(self, node_id):
408408
"""Idempotent non-blocking connection attempt to the given node id.
409409
410410
Returns True if connection object exists and is connected / connecting
@@ -433,10 +433,8 @@ def _maybe_connect(self, node_id):
433433
**self.config)
434434
self._conns[node_id] = conn
435435

436-
elif conn.connected():
437-
return True
438-
439-
conn.connect()
436+
if conn.disconnected():
437+
conn.connect()
440438
return not conn.disconnected()
441439

442440
def ready(self, node_id, metadata_priority=True):
@@ -634,7 +632,7 @@ def poll(self, timeout_ms=None, future=None):
634632
for node_id in list(self._connecting):
635633
# False return means no more connection progress is possible
636634
# Connected nodes will update _connecting via state_change callback
637-
if not self._maybe_connect(node_id):
635+
if not self._init_connect(node_id):
638636
self._connecting.remove(node_id)
639637

640638
# If we got a future that is already done, don't block in _poll
@@ -984,7 +982,7 @@ def check_version(self, node_id=None, timeout=None, strict=False):
984982
time.sleep(sleep_time)
985983
continue
986984
log.debug('Attempting to check version with node %s', try_node)
987-
if not self._maybe_connect(try_node):
985+
if not self._init_connect(try_node):
988986
if try_node == node_id:
989987
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
990988
else:

test/test_client_async.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def test_can_connect(cli, conn):
5858
assert cli._can_connect(0)
5959

6060
# Node is connected, can't reconnect
61-
assert cli._maybe_connect(0) is True
61+
assert cli._init_connect(0) is True
6262
assert not cli._can_connect(0)
6363

6464
# Node is disconnected, can connect
@@ -70,15 +70,15 @@ def test_can_connect(cli, conn):
7070
assert not cli._can_connect(0)
7171

7272

73-
def test_maybe_connect(cli, conn):
73+
def test_init_connect(cli, conn):
7474
# Node not in metadata, return False
75-
assert not cli._maybe_connect(2)
75+
assert not cli._init_connect(2)
7676

7777
# New node_id creates a conn object
7878
assert 0 not in cli._conns
7979
conn.state = ConnectionStates.DISCONNECTED
8080
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
81-
assert cli._maybe_connect(0) is True
81+
assert cli._init_connect(0) is True
8282
assert cli._conns[0] is conn
8383

8484

@@ -122,8 +122,8 @@ def test_ready(mocker, cli, conn):
122122

123123

124124
def test_is_ready(mocker, cli, conn):
125-
cli._maybe_connect(0)
126-
cli._maybe_connect(1)
125+
cli._init_connect(0)
126+
cli._init_connect(1)
127127

128128
# metadata refresh blocks ready nodes
129129
assert cli.is_ready(0)
@@ -166,14 +166,14 @@ def test_close(mocker, cli, conn):
166166
assert conn.close.call_count == call_count
167167

168168
# Single node close
169-
cli._maybe_connect(0)
169+
cli._init_connect(0)
170170
assert conn.close.call_count == call_count
171171
cli.close(0)
172172
call_count += 1
173173
assert conn.close.call_count == call_count
174174

175175
# All node close
176-
cli._maybe_connect(1)
176+
cli._init_connect(1)
177177
cli.close()
178178
# +2 close: node 1, node bootstrap (node 0 already closed)
179179
call_count += 2
@@ -185,7 +185,7 @@ def test_is_disconnected(cli, conn):
185185
conn.state = ConnectionStates.DISCONNECTED
186186
assert not cli.is_disconnected(0)
187187

188-
cli._maybe_connect(0)
188+
cli._init_connect(0)
189189
assert cli.is_disconnected(0)
190190

191191
conn.state = ConnectionStates.CONNECTING
@@ -210,7 +210,7 @@ def test_send(cli, conn):
210210
assert isinstance(f.exception, Errors.NodeNotReadyError)
211211

212212
conn.state = ConnectionStates.CONNECTED
213-
cli._maybe_connect(0)
213+
cli._init_connect(0)
214214
# ProduceRequest w/ 0 required_acks -> no response
215215
request = ProduceRequest[0](0, 0, [])
216216
assert request.expect_response() is False
@@ -339,7 +339,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
339339
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
340340
mocker.patch.object(client, '_can_send_request', return_value=False)
341341
mocker.patch.object(client, '_can_connect', return_value=True)
342-
mocker.patch.object(client, '_maybe_connect', return_value=True)
342+
mocker.patch.object(client, '_init_connect', return_value=True)
343343
mocker.patch.object(client, 'maybe_connect', return_value=True)
344344

345345
now = time.time()

0 commit comments

Comments
 (0)