Skip to content

Commit 9f1fa84

Browse files
committed
align api_version checks in code w/ broker_api_versions
1 parent 5f13168 commit 9f1fa84

File tree

8 files changed

+38
-33
lines changed

8 files changed

+38
-33
lines changed

kafka/client_async.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ def __init__(self, **configs):
242242
self.config['api_version'] = self.check_version(timeout=check_timeout)
243243
elif self.config['api_version'] in BROKER_API_VERSIONS:
244244
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
245+
elif (self.config['api_version'] + (0,)) in BROKER_API_VERSIONS:
246+
log.warning('Configured api_version %s is ambiguous; using %s',
247+
self.config['api_version'], self.config['api_version'] + (0,))
248+
self.config['api_version'] = self.config['api_version'] + (0,)
249+
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
245250
else:
246251
compatible_version = None
247252
for v in sorted(BROKER_API_VERSIONS.keys(), reverse=True):
@@ -864,8 +869,8 @@ def _maybe_refresh_metadata(self, wakeup=False):
864869
topics = list(self.config['bootstrap_topics_filter'])
865870

866871
if self.cluster.need_all_topic_metadata or not topics:
867-
topics = [] if self.config['api_version'] < (0, 10) else None
868-
api_version = 0 if self.config['api_version'] < (0, 10) else 1
872+
topics = [] if self.config['api_version'] < (0, 10, 0) else None
873+
api_version = 0 if self.config['api_version'] < (0, 10, 0) else 1
869874
request = MetadataRequest[api_version](topics)
870875
log.debug("Sending metadata request %s to node %s", request, node_id)
871876
future = self.send(node_id, request, wakeup=wakeup)
@@ -913,7 +918,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
913918
is down and the client enters a bootstrap backoff sleep.
914919
This is only possible if node_id is None.
915920
916-
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
921+
Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc
917922
918923
Raises:
919924
NodeNotReadyError (if node_id is provided)

kafka/conn.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ class BrokerConnection(object):
165165
or other configuration forbids use of all the specified ciphers),
166166
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
167167
api_version (tuple): Specify which Kafka API version to use.
168-
Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
169-
(0, 10). Default: (0, 8, 2)
168+
Must be None or >= (0, 10, 0) to enable SASL authentication.
169+
Default: None
170170
api_version_auto_timeout_ms (int): number of milliseconds to throw a
171171
timeout exception from the constructor when checking the broker
172172
api version. Only applies if api_version is None
@@ -214,7 +214,7 @@ class BrokerConnection(object):
214214
'ssl_crlfile': None,
215215
'ssl_password': None,
216216
'ssl_ciphers': None,
217-
'api_version': (0, 8, 2), # default to most restrictive
217+
'api_version': None,
218218
'selector': selectors.DefaultSelector,
219219
'state_change_callback': lambda node_id, sock, conn: True,
220220
'metrics': None,
@@ -522,7 +522,7 @@ def _try_handshake(self):
522522
return False
523523

524524
def _try_authenticate(self):
525-
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)
525+
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0)
526526

527527
if self._sasl_auth_future is None:
528528
# Build a SaslHandShakeRequest message
@@ -1178,16 +1178,16 @@ def _infer_broker_version_from_api_versions(self, api_versions):
11781178
test_cases = [
11791179
# format (<broker version>, <needed struct>)
11801180
# Make sure to update consumer_integration test check when adding newer versions.
1181-
((2, 6, 0), DescribeClientQuotasRequest[0]),
1182-
((2, 5, 0), DescribeAclsRequest_v2),
1183-
((2, 4, 0), ProduceRequest[8]),
1184-
((2, 3, 0), FetchRequest[11]),
1185-
((2, 2, 0), OffsetRequest[5]),
1186-
((2, 1, 0), FetchRequest[10]),
1187-
((2, 0, 0), FetchRequest[8]),
1188-
((1, 1, 0), FetchRequest[7]),
1189-
((1, 0, 0), MetadataRequest[5]),
1190-
((0, 11, 0), MetadataRequest[4]),
1181+
((2, 6), DescribeClientQuotasRequest[0]),
1182+
((2, 5), DescribeAclsRequest_v2),
1183+
((2, 4), ProduceRequest[8]),
1184+
((2, 3), FetchRequest[11]),
1185+
((2, 2), OffsetRequest[5]),
1186+
((2, 1), FetchRequest[10]),
1187+
((2, 0), FetchRequest[8]),
1188+
((1, 1), FetchRequest[7]),
1189+
((1, 0), MetadataRequest[5]),
1190+
((0, 11), MetadataRequest[4]),
11911191
((0, 10, 2), OffsetFetchRequest[2]),
11921192
((0, 10, 1), MetadataRequest[2]),
11931193
]
@@ -1209,7 +1209,7 @@ def check_version(self, timeout=2, strict=False, topics=[]):
12091209
12101210
Note: This is a blocking call.
12111211
1212-
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
1212+
Returns: version tuple, i.e. (3, 9), (2, 4), etc ...
12131213
"""
12141214
timeout_at = time.time() + timeout
12151215
log.info('Probing node %s broker version', self.node_id)
@@ -1240,7 +1240,7 @@ def reset_override_configs():
12401240
test_cases = [
12411241
# All cases starting from 0.10 will be based on ApiVersionsResponse
12421242
((0, 11), ApiVersionsRequest[1]()),
1243-
((0, 10), ApiVersionsRequest[0]()),
1243+
((0, 10, 0), ApiVersionsRequest[0]()),
12441244
((0, 9), ListGroupsRequest[0]()),
12451245
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
12461246
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
@@ -1273,7 +1273,7 @@ def reset_override_configs():
12731273
selector.close()
12741274

12751275
if f.succeeded():
1276-
if version >= (0, 10):
1276+
if version >= (0, 10, 0):
12771277
# Starting from 0.10 kafka broker we determine version
12781278
# by looking at ApiVersionsResponse
12791279
api_versions = self._handle_api_versions_response(f.value)
@@ -1303,7 +1303,7 @@ def reset_override_configs():
13031303
# requests (bug...). In this case we expect to see a correlation
13041304
# id mismatch
13051305
elif (isinstance(f.exception, Errors.CorrelationIdError) and
1306-
version == (0, 10)):
1306+
version > (0, 9)):
13071307
pass
13081308
elif six.PY2:
13091309
assert isinstance(f.exception.args[0], socket.error)

kafka/consumer/fetcher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -702,11 +702,11 @@ def _create_fetch_requests(self):
702702
log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
703703
partition, node_id)
704704

705-
if self.config['api_version'] >= (0, 11, 0):
705+
if self.config['api_version'] >= (0, 11):
706706
version = 4
707707
elif self.config['api_version'] >= (0, 10, 1):
708708
version = 3
709-
elif self.config['api_version'] >= (0, 10):
709+
elif self.config['api_version'] >= (0, 10, 0):
710710
version = 2
711711
elif self.config['api_version'] == (0, 9):
712712
version = 1

kafka/coordinator/base.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ def _send_join_group_request(self):
461461
self._generation.member_id,
462462
self.protocol_type(),
463463
member_metadata)
464-
elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
464+
elif (0, 10, 1) <= self.config['api_version'] < (0, 11):
465465
request = JoinGroupRequest[1](
466466
self.group_id,
467467
self.config['session_timeout_ms'],
@@ -562,7 +562,7 @@ def _handle_join_group_response(self, future, send_time, response):
562562

563563
def _on_join_follower(self):
564564
# send follower's sync group with an empty assignment
565-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
565+
version = 0 if self.config['api_version'] < (0, 11) else 1
566566
request = SyncGroupRequest[version](
567567
self.group_id,
568568
self._generation.generation_id,
@@ -590,7 +590,7 @@ def _on_join_leader(self, response):
590590
except Exception as e:
591591
return Future().failure(e)
592592

593-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
593+
version = 0 if self.config['api_version'] < (0, 11) else 1
594594
request = SyncGroupRequest[version](
595595
self.group_id,
596596
self._generation.generation_id,
@@ -771,7 +771,7 @@ def maybe_leave_group(self):
771771
# this is a minimal effort attempt to leave the group. we do not
772772
# attempt any resending if the request fails or times out.
773773
log.info('Leaving consumer group (%s).', self.group_id)
774-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
774+
version = 0 if self.config['api_version'] < (0, 11) else 1
775775
request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
776776
future = self._client.send(self.coordinator_id, request)
777777
future.add_callback(self._handle_leave_group_response)
@@ -799,7 +799,7 @@ def _send_heartbeat_request(self):
799799
e = Errors.NodeNotReadyError(self.coordinator_id)
800800
return Future().failure(e)
801801

802-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
802+
version = 0 if self.config['api_version'] < (0, 11) else 1
803803
request = HeartbeatRequest[version](self.group_id,
804804
self._generation.generation_id,
805805
self._generation.member_id)

kafka/producer/kafka.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ def __init__(self, **configs):
393393
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
394394

395395
if self.config['compression_type'] == 'zstd':
396-
assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
396+
assert self.config['api_version'] >= (2, 1), 'Zstd Requires >= Kafka 2.1 Brokers'
397397

398398
# Check compression_type for library support
399399
ct = self.config['compression_type']
@@ -524,7 +524,7 @@ def partitions_for(self, topic):
524524
def _max_usable_produce_magic(self):
525525
if self.config['api_version'] >= (0, 11):
526526
return 2
527-
elif self.config['api_version'] >= (0, 10):
527+
elif self.config['api_version'] >= (0, 10, 0):
528528
return 1
529529
else:
530530
return 0

kafka/producer/sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def _produce_request(self, node_id, acks, timeout, batches):
313313
elif self.config['api_version'] >= (0, 11):
314314
version = 3
315315
kwargs = dict(transactional_id=None)
316-
elif self.config['api_version'] >= (0, 10):
316+
elif self.config['api_version'] >= (0, 10, 0):
317317
version = 2
318318
elif self.config['api_version'] == (0, 9):
319319
version = 1

test/test_client_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ def test_least_loaded_node():
266266
def test_set_topics(mocker):
267267
request_update = mocker.patch.object(ClusterMetadata, 'request_update')
268268
request_update.side_effect = lambda: Future()
269-
cli = KafkaClient(api_version=(0, 10))
269+
cli = KafkaClient(api_version=(0, 10, 0))
270270

271271
# replace 'empty' with 'non empty'
272272
request_update.reset_mock()

test/test_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def test_request_timeout_larger_than_connections_max_idle_ms_raises(self):
1818
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000)
1919

2020
def test_subscription_copy(self):
21-
consumer = KafkaConsumer('foo', api_version=(0, 10))
21+
consumer = KafkaConsumer('foo', api_version=(0, 10, 0))
2222
sub = consumer.subscription()
2323
assert sub is not consumer.subscription()
2424
assert sub == set(['foo'])

0 commit comments

Comments
 (0)