
Commit 57f1782
Fix lint issues via ruff check (#2522)
1 parent 02dd98f

21 files changed: +44 −47 lines

kafka/client_async.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
 from kafka.vendor import six

 from kafka.cluster import ClusterMetadata
-from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
+from kafka.conn import BrokerConnection, ConnectionStates, get_ip_port_afi
 from kafka import errors as Errors
 from kafka.future import Future
 from kafka.metrics import AnonMeasurable
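Most of the edits in this commit are mechanical fixes for a handful of ruff rules. The import removal above, like several below, is ruff's F401 (module imported but unused); a minimal illustration with a hypothetical module, not from this repo:

import json    # referenced below, so F401 stays quiet
import struct  # never referenced: ruff reports "F401 `struct` imported but unused"


def dump(obj):
    return json.dumps(obj)

Running ruff check --fix removes such imports automatically.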

kafka/conn.py

Lines changed: 12 additions & 13 deletions
@@ -14,7 +14,6 @@
 from kafka.vendor import selectors34 as selectors

 import socket
-import struct
 import threading
 import time

@@ -23,7 +22,6 @@
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
-from kafka.oauth.abstract import AbstractTokenProvider
 from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
 from kafka.protocol.api_versions import ApiVersionsRequest
 from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
@@ -36,7 +34,7 @@
 from kafka.protocol.produce import ProduceRequest
 from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest
 from kafka.protocol.sasl_handshake import SaslHandshakeRequest
-from kafka.protocol.types import Int32, Int8
+from kafka.protocol.types import Int32
 from kafka.sasl import get_sasl_mechanism
 from kafka.version import __version__

@@ -1151,19 +1149,20 @@ def timed_out_ifrs(self):
     def next_ifr_request_timeout_ms(self):
         with self._lock:
             if self.in_flight_requests:
-                get_timeout = lambda v: v[2]
+                def get_timeout(v):
+                    return v[2]
                 next_timeout = min(map(get_timeout,
                                        self.in_flight_requests.values()))
                 return max(0, (next_timeout - time.time()) * 1000)
             else:
                 return float('inf')

     def get_api_versions(self):
-        if self._api_versions is not None:
-            return self._api_versions
-
-        version = self.check_version()
-        # _api_versions is set as a side effect of check_versions()
+        # _api_versions is set as a side effect of first connection
+        # which should typically be bootstrap, but call check_version
+        # if that hasn't happened yet
+        if self._api_versions is None:
+            self.check_version()
         return self._api_versions

     def _infer_broker_version_from_api_versions(self, api_versions):
@@ -1201,11 +1200,11 @@ def _infer_broker_version_from_api_versions(self, api_versions):
         ]

         # Get the best match of test cases
-        for broker_version, struct in sorted(test_cases, reverse=True):
-            if struct.API_KEY not in api_versions:
+        for broker_version, proto_struct in sorted(test_cases, reverse=True):
+            if proto_struct.API_KEY not in api_versions:
                 continue
-            min_version, max_version = api_versions[struct.API_KEY]
-            if min_version <= struct.API_VERSION <= max_version:
+            min_version, max_version = api_versions[proto_struct.API_KEY]
+            if min_version <= proto_struct.API_VERSION <= max_version:
                 return broker_version

         # We know that ApiVersionsResponse is only supported in 0.10+
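Two of the conn.py changes are worth spelling out. Assigning a lambda to a name is ruff's E731 ("do not assign a lambda expression, use a def"), and the struct → proto_struct rename avoids reusing the name of the standard-library struct module, which this file imported until this commit. A small sketch of the E731 fix, with hypothetical in-flight-request values:

# Before, flagged by ruff E731:
#     get_timeout = lambda v: v[2]

# After: a named function behaves identically and shows up in tracebacks
def get_timeout(v):
    return v[2]

# hypothetical in-flight-request map of (request, future, timeout) tuples
in_flight = {0: ('req-a', None, 105.0), 1: ('req-b', None, 103.5)}
next_timeout = min(map(get_timeout, in_flight.values()))
assert next_timeout == 103.5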

kafka/consumer/fetcher.py

Lines changed: 2 additions & 2 deletions
@@ -501,7 +501,7 @@ def _unpack_records(self, tp, records):
             # If unpacking raises StopIteration, it is erroneously
             # caught by the generator. We want all exceptions to be raised
             # back to the user. See Issue 545
-            except StopIteration as e:
+            except StopIteration:
                 log.exception('StopIteration raised unpacking messageset')
                 raise RuntimeError('StopIteration raised unpacking messageset')

@@ -1001,7 +1001,7 @@ def build_next(self, next_partitions):
             log.debug("Built full fetch %s for node %s with %s partition(s).",
                       self.next_metadata, self.node_id, len(next_partitions))
             self.session_partitions = next_partitions
-            return FetchRequestData(next_partitions, None, self.next_metadata);
+            return FetchRequestData(next_partitions, None, self.next_metadata)

         prev_tps = set(self.session_partitions.keys())
         next_tps = set(next_partitions.keys())
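Both fetcher.py fixes are cosmetic: "except StopIteration as e" bound a name that was never used (ruff F841), and the trailing semicolon is pycodestyle E703. A minimal reproduction:

def consume():
    try:
        next(iter([]))        # raises StopIteration
    except StopIteration:     # drop "as e" when the exception object is unused
        raise RuntimeError('StopIteration raised unpacking messageset')

result = None                 # E703 would flag:  result = None;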

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 from collections import defaultdict, namedtuple
 from copy import deepcopy

-from kafka.cluster import ClusterMetadata
 from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
 from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
 from kafka.coordinator.assignors.sticky.sorted_set import SortedSet

kafka/coordinator/consumer.py

Lines changed: 1 addition & 1 deletion
@@ -796,7 +796,7 @@ def _handle_offset_fetch_response(self, future, response):
             elif error_type is Errors.GroupAuthorizationFailedError:
                 future.failure(error)
             else:
-                log.error("Unknown error fetching offsets for %s: %s", tp, error)
+                log.error("Unknown error fetching offsets: %s", error)
                 future.failure(error)
             return
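The dropped %s placeholder referenced a name (tp) that is presumably not defined in this scope; ruff reports such references as F821 (undefined name), and at runtime they would raise NameError when the log line fired. The corrected pattern, as a standalone sketch:

import logging

log = logging.getLogger(__name__)

def handle_unknown(error):
    # every %s placeholder needs a matching, in-scope argument
    log.error("Unknown error fetching offsets: %s", error)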

kafka/metrics/metric_name.py

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ def __eq__(self, other):
             return True
         if other is None:
             return False
-        return (type(self) == type(other) and
+        return (isinstance(self, type(other)) and
                 self.group == other.group and
                 self.name == other.name and
                 self.tags == other.tags)

kafka/metrics/quota.py

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ def __hash__(self):
     def __eq__(self, other):
         if self is other:
             return True
-        return (type(self) == type(other) and
+        return (isinstance(self, type(other)) and
                 self.bound == other.bound and
                 self.is_upper_bound() == other.is_upper_bound())
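The type(...) == type(...) comparisons in metric_name.py and quota.py are ruff's E721; isinstance is the idiomatic replacement. Note the two are not strictly identical: isinstance(self, type(other)) also accepts self being an instance of a subclass of other's type, which slightly loosens the equality check. A self-contained sketch:

class Quota:
    def __init__(self, bound):
        self.bound = bound

    def __eq__(self, other):
        # ruff E721 would flag:  type(self) == type(other)
        return isinstance(self, type(other)) and self.bound == other.bound


assert Quota(512) == Quota(512)
assert Quota(512) != Quota(1024)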

kafka/producer/kafka.py

Lines changed: 2 additions & 2 deletions
@@ -612,8 +612,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest

         if headers is None:
             headers = []
-        assert type(headers) == list
-        assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
+        assert isinstance(headers, list)
+        assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers)

         message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
         self._ensure_valid_record_size(message_size)
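The asserts in send() enforce that headers is a list of (str, bytes) pairs. A quick check with hypothetical header values, mirroring the rewritten asserts:

headers = [('trace-id', b'abc123'), ('source', b'billing')]

assert isinstance(headers, list)
assert all(isinstance(item, tuple) and len(item) == 2
           and isinstance(item[0], str) and isinstance(item[1], bytes)
           for item in headers)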

kafka/protocol/api_versions.py

Lines changed: 2 additions & 0 deletions
@@ -1,5 +1,7 @@
 from __future__ import absolute_import

+from io import BytesIO
+
 from kafka.protocol.api import Request, Response
 from kafka.protocol.types import Array, Int16, Int32, Schema
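The added import suggests BytesIO is referenced elsewhere in this module without having been imported, which ruff flags as F821 (undefined name). For reference, BytesIO wraps raw bytes in a file-like object that protocol decoders can consume incrementally:

from io import BytesIO

buf = BytesIO(b'\x00\x02')
assert buf.read(1) == b'\x00'   # decoders read the buffer byte by byte
assert buf.read(1) == b'\x02'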

kafka/protocol/commit.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 from __future__ import absolute_import

 from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
+from kafka.protocol.types import Array, Int16, Int32, Int64, Schema, String


 class OffsetCommitResponse_v0(Response):

kafka/protocol/find_coordinator.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 from __future__ import absolute_import

 from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
+from kafka.protocol.types import Int8, Int16, Int32, Schema, String


 class FindCoordinatorResponse_v0(Response):

kafka/protocol/offset_for_leader_epoch.py

Lines changed: 5 additions & 5 deletions
@@ -4,7 +4,7 @@
 from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Int64, Schema, String, TaggedFields


-class OffsetForLeaderEpochResponse_v0(Request):
+class OffsetForLeaderEpochResponse_v0(Response):
     API_KEY = 23
     API_VERSION = 0
     SCHEMA = Schema(
@@ -16,7 +16,7 @@ class OffsetForLeaderEpochResponse_v0(Request):
                 ('end_offset', Int64))))))


-class OffsetForLeaderEpochResponse_v1(Request):
+class OffsetForLeaderEpochResponse_v1(Response):
     API_KEY = 23
     API_VERSION = 1
     SCHEMA = Schema(
@@ -29,7 +29,7 @@ class OffsetForLeaderEpochResponse_v1(Request):
                 ('end_offset', Int64))))))


-class OffsetForLeaderEpochResponse_v2(Request):
+class OffsetForLeaderEpochResponse_v2(Response):
     API_KEY = 23
     API_VERSION = 2
     SCHEMA = Schema(
@@ -43,13 +43,13 @@ class OffsetForLeaderEpochResponse_v2(Request):
                 ('end_offset', Int64))))))


-class OffsetForLeaderEpochResponse_v3(Request):
+class OffsetForLeaderEpochResponse_v3(Response):
     API_KEY = 23
     API_VERSION = 3
     SCHEMA = OffsetForLeaderEpochResponse_v2.SCHEMA


-class OffsetForLeaderEpochResponse_v4(Request):
+class OffsetForLeaderEpochResponse_v4(Response):
     API_KEY = 23
     API_VERSION = 4
     SCHEMA = Schema(
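This is the most substantive fix in the set: five response classes subclassed Request instead of Response, possibly surfaced while cleaning up the import warnings. The convention, sketched with a hypothetical message type but the real base class and field types this file imports:

from kafka.protocol.api import Response
from kafka.protocol.types import Array, Int16, Int32, Int64, Schema, String

class ExampleResponse_v0(Response):   # hypothetical class, for illustration only
    API_KEY = 23                      # identifies the Kafka API
    API_VERSION = 0                   # version of the wire format
    SCHEMA = Schema(                  # field layout used to decode the bytes
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('error_code', Int16),
                ('partition', Int32),
                ('end_offset', Int64))))))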

kafka/protocol/sasl_authenticate.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 from __future__ import absolute_import

 from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Bytes, Int16, Int64, Schema, String
+from kafka.protocol.types import Bytes, Int16, Int64, Schema, String


 class SaslAuthenticateResponse_v0(Response):

kafka/sasl/gssapi.py

Lines changed: 3 additions & 1 deletion
@@ -1,5 +1,7 @@
 from __future__ import absolute_import

+import struct
+
 # needed for SASL_GSSAPI authentication:
 try:
     import gssapi
@@ -66,7 +68,7 @@ def receive(self, auth_bytes):
         # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
         # by the server
         message_parts = [
-            Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))),
+            struct.pack('>b', self.SASL_QOP_AUTH & struct.unpack('>b', msg[0:1])),
             msg[:1],
             self.auth_id.encode(),
         ]
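The rewrite swaps the kafka protocol Int8 helpers for the standard-library struct module. One subtlety worth noting: struct.unpack always returns a tuple, even for a single value, so the unpacked byte must be indexed (or tuple-unpacked) before it can be used in a bitwise expression. A standalone round-trip under that assumption:

import struct

SASL_QOP_AUTH = 1
msg = b'\x07...'   # hypothetical server security-layer bytes

(server_flags,) = struct.unpack('>b', msg[0:1])   # unpack returns a 1-tuple
qop = struct.pack('>b', SASL_QOP_AUTH & server_flags)
assert qop == b'\x01'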

test/test_admin.py

Lines changed: 4 additions & 4 deletions
@@ -6,7 +6,7 @@

 def test_config_resource():
     with pytest.raises(KeyError):
-        bad_resource = kafka.admin.ConfigResource('something', 'foo')
+        _bad_resource = kafka.admin.ConfigResource('something', 'foo')
     good_resource = kafka.admin.ConfigResource('broker', 'bar')
     assert good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER
     assert good_resource.name == 'bar'
@@ -59,11 +59,11 @@ def test_acl_resource():

 def test_new_topic():
     with pytest.raises(IllegalArgumentError):
-        bad_topic = kafka.admin.NewTopic('foo', -1, -1)
+        _bad_topic = kafka.admin.NewTopic('foo', -1, -1)
     with pytest.raises(IllegalArgumentError):
-        bad_topic = kafka.admin.NewTopic('foo', 1, -1)
+        _bad_topic = kafka.admin.NewTopic('foo', 1, -1)
     with pytest.raises(IllegalArgumentError):
-        bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]})
+        _bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]})
     good_topic = kafka.admin.NewTopic('foo', 1, 2)
     assert good_topic.name == 'foo'
     assert good_topic.num_partitions == 1
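ruff's unused-variable rule (F841) skips names matching its dummy-variable pattern, which by default includes leading-underscore names, so prefixing the bindings silences the warning while keeping the assignment readable:

import pytest

def test_rejects_garbage():
    with pytest.raises(ValueError):
        _bad = int('not a number')   # underscore marks the binding as intentionally unused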

test/test_admin_integration.py

Lines changed: 2 additions & 2 deletions
@@ -140,15 +140,15 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
     broker_id = "str"

     with pytest.raises(ValueError):
-        configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+        kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])


 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
 def test_describe_consumer_group_does_not_exist(kafka_admin_client):
     """Tests that the describe consumer group call fails if the group coordinator is not available
     """
     with pytest.raises(GroupCoordinatorNotAvailableError):
-        group_description = kafka_admin_client.describe_consumer_groups(['test'])
+        kafka_admin_client.describe_consumer_groups(['test'])


 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
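Here the fix goes one step further than in test_admin.py: when the return value is never inspected, the assignment can simply be dropped, since pytest.raises only cares that the call raises:

import pytest

def test_raises_without_binding():
    with pytest.raises(ValueError):
        int('not a number')   # bare call; no variable needed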

test/test_cluster.py

Lines changed: 0 additions & 2 deletions
@@ -1,8 +1,6 @@
 # pylint: skip-file
 from __future__ import absolute_import

-import pytest
-
 from kafka.cluster import ClusterMetadata
 from kafka.protocol.metadata import MetadataResponse

test/test_fetcher.py

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@

 from kafka.client_async import KafkaClient
 from kafka.consumer.fetcher import (
-    CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
+    CompletedFetch, ConsumerRecord, Fetcher
 )
 from kafka.consumer.subscription_state import SubscriptionState
 import kafka.errors as Errors

test/test_object_conversion.py

Lines changed: 2 additions & 2 deletions
@@ -207,7 +207,7 @@ def test_with_metadata_response():
     assert len(obj['topics']) == 2
     assert obj['topics'][0]['error_code'] == 0
     assert obj['topics'][0]['topic'] == 'testtopic1'
-    assert obj['topics'][0]['is_internal'] == False
+    assert obj['topics'][0]['is_internal'] is False
     assert len(obj['topics'][0]['partitions']) == 2
     assert obj['topics'][0]['partitions'][0]['error_code'] == 0
     assert obj['topics'][0]['partitions'][0]['partition'] == 0
@@ -224,7 +224,7 @@ def test_with_metadata_response():

     assert obj['topics'][1]['error_code'] == 0
     assert obj['topics'][1]['topic'] == 'other-test-topic'
-    assert obj['topics'][1]['is_internal'] == True
+    assert obj['topics'][1]['is_internal'] is True
     assert len(obj['topics'][1]['partitions']) == 1
     assert obj['topics'][1]['partitions'][0]['error_code'] == 0
     assert obj['topics'][1]['partitions'][0]['partition'] == 0
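Comparing against True or False with == is pycodestyle E712; since True and False are singletons, identity (is) is both correct and idiomatic:

is_internal = bool(0)

assert is_internal is False     # preferred
# assert is_internal == False   # flagged by E712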

test/test_protocol.py

Lines changed: 2 additions & 4 deletions
@@ -2,8 +2,6 @@
 import io
 import struct

-import pytest
-
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.fetch import FetchRequest, FetchResponse
 from kafka.protocol.find_coordinator import FindCoordinatorRequest
@@ -273,7 +271,7 @@ def test_decode_fetch_response_partial():

 def test_struct_unrecognized_kwargs():
     try:
-        mr = MetadataRequest[0](topicz='foo')
+        _mr = MetadataRequest[0](topicz='foo')
         assert False, 'Structs should not allow unrecognized kwargs'
     except ValueError:
         pass
@@ -331,6 +329,6 @@ def test_compact_data_structs():
     assert CompactBytes.decode(io.BytesIO(b'\x00')) is None
     enc = CompactBytes.encode(b'')
     assert enc == b'\x01'
-    assert CompactBytes.decode(io.BytesIO(b'\x01')) is b''
+    assert CompactBytes.decode(io.BytesIO(b'\x01')) == b''
     enc = CompactBytes.encode(b'foo')
     assert CompactBytes.decode(io.BytesIO(enc)) == b'foo'
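The last change fixes ruff F632: `is` tests object identity, and bytes or str literals are not guaranteed to be interned, so an identity comparison against a literal may pass or fail depending on the interpreter. Equality is the correct check:

value = b''.join([b''])      # builds an empty bytes object

assert value == b''          # correct: compares contents
# assert value is b''        # F632: identity against a literal is unreliable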

test/test_sender.py

Lines changed: 0 additions & 1 deletion
@@ -5,7 +5,6 @@
 import io

 from kafka.client_async import KafkaClient
-from kafka.cluster import ClusterMetadata
 from kafka.metrics import Metrics
 from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
 from kafka.protocol.produce import ProduceRequest
