Fix lint issues via ruff check #2522

Merged (1 commit) on Mar 12, 2025
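
The diff below is the kind of cleanup `ruff check` reports: unused imports and locals, lambda assignments, `type(...) ==` comparisons, identity checks against literals, and a couple of undefined names. As a rough sketch of reproducing such a report locally (the project's actual lint configuration and command line are not part of this PR, so the paths and invocation here are assumptions):

```python
# Hypothetical reproduction of the lint run. Requires `pip install ruff`;
# the selected rules may differ from whatever configuration the project uses.
import subprocess

result = subprocess.run(
    ["ruff", "check", "kafka/", "test/"],  # paths guessed from the files touched below
    capture_output=True,
    text=True,
)
print(result.stdout)  # one finding per line, e.g. "kafka/conn.py:17:8: F401 ..."
```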
2 changes: 1 addition & 1 deletion kafka/client_async.py
@@ -19,7 +19,7 @@
from kafka.vendor import six

from kafka.cluster import ClusterMetadata
-from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
+from kafka.conn import BrokerConnection, ConnectionStates, get_ip_port_afi
from kafka import errors as Errors
from kafka.future import Future
from kafka.metrics import AnonMeasurable
25 changes: 12 additions & 13 deletions kafka/conn.py
@@ -14,7 +14,6 @@
from kafka.vendor import selectors34 as selectors

import socket
-import struct
import threading
import time

@@ -23,7 +22,6 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
-from kafka.oauth.abstract import AbstractTokenProvider
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
from kafka.protocol.api_versions import ApiVersionsRequest
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
@@ -36,7 +34,7 @@
from kafka.protocol.produce import ProduceRequest
from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest
from kafka.protocol.sasl_handshake import SaslHandshakeRequest
-from kafka.protocol.types import Int32, Int8
+from kafka.protocol.types import Int32
from kafka.sasl import get_sasl_mechanism
from kafka.version import __version__

@@ -1151,19 +1149,20 @@ def timed_out_ifrs(self):
def next_ifr_request_timeout_ms(self):
with self._lock:
if self.in_flight_requests:
-get_timeout = lambda v: v[2]
+def get_timeout(v):
+    return v[2]
next_timeout = min(map(get_timeout,
self.in_flight_requests.values()))
return max(0, (next_timeout - time.time()) * 1000)
else:
return float('inf')

def get_api_versions(self):
-if self._api_versions is not None:
-    return self._api_versions
-
-version = self.check_version()
-# _api_versions is set as a side effect of check_versions()
+# _api_versions is set as a side effect of first connection
+# which should typically be bootstrap, but call check_version
+# if that hasn't happened yet
+if self._api_versions is None:
+    self.check_version()
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
@@ -1201,11 +1200,11 @@ def _infer_broker_version_from_api_versions(self, api_versions):
]

# Get the best match of test cases
-for broker_version, struct in sorted(test_cases, reverse=True):
-    if struct.API_KEY not in api_versions:
+for broker_version, proto_struct in sorted(test_cases, reverse=True):
+    if proto_struct.API_KEY not in api_versions:
        continue
-    min_version, max_version = api_versions[struct.API_KEY]
-    if min_version <= struct.API_VERSION <= max_version:
+    min_version, max_version = api_versions[proto_struct.API_KEY]
+    if min_version <= proto_struct.API_VERSION <= max_version:
return broker_version

# We know that ApiVersionsResponse is only supported in 0.10+
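Two things in this file go beyond deleting unused imports. The `get_timeout` change is ruff's E731 (don't assign a lambda to a name): a named `def` behaves identically but gets a real `__name__` and a place for a docstring. A standalone sketch of the two forms, separate from the kafka-python code:

```python
in_flight = {1: ("request", "future", 105.0), 2: ("request", "future", 99.5)}

# Flagged by E731: a lambda bound to a name.
get_timeout = lambda v: v[2]  # noqa: E731 (kept here only for contrast)

# The replacement: an equivalent named function.
def get_timeout_fn(v):
    return v[2]

assert min(map(get_timeout, in_flight.values())) == 99.5
assert min(map(get_timeout_fn, in_flight.values())) == 99.5
```

The `get_api_versions` rewrite also drops a local (`version`) that was assigned but never used (F841), and renaming the loop variable from `struct` to `proto_struct` presumably avoids confusion with the `struct` module, whose now-unused import is removed above.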
4 changes: 2 additions & 2 deletions kafka/consumer/fetcher.py
@@ -501,7 +501,7 @@ def _unpack_records(self, tp, records):
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
# back to the user. See Issue 545
-except StopIteration as e:
+except StopIteration:
log.exception('StopIteration raised unpacking messageset')
raise RuntimeError('StopIteration raised unpacking messageset')

@@ -1001,7 +1001,7 @@ def build_next(self, next_partitions):
log.debug("Built full fetch %s for node %s with %s partition(s).",
self.next_metadata, self.node_id, len(next_partitions))
self.session_partitions = next_partitions
-return FetchRequestData(next_partitions, None, self.next_metadata);
+return FetchRequestData(next_partitions, None, self.next_metadata)

prev_tps = set(self.session_partitions.keys())
next_tps = set(next_partitions.keys())
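The `except StopIteration as e:` edit just drops an unused binding (F841); the interesting part is the comment above it. A StopIteration that escapes inside a generator does not propagate like a normal exception: before PEP 479 it would silently end the iteration, and on Python 3.7+ it is converted to RuntimeError. The explicit re-raise as RuntimeError keeps that failure loud regardless of version. A standalone illustration of the 3.7+ behaviour:

```python
def unpack():
    # Simulates StopIteration escaping inside a generator body.
    raise StopIteration("raised while unpacking a record batch")
    yield  # unreachable; makes this function a generator

try:
    list(unpack())
except RuntimeError as exc:
    # PEP 479: the escaping StopIteration is replaced with RuntimeError.
    print("surfaced as:", exc)
```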
1 change: 0 additions & 1 deletion kafka/coordinator/assignors/sticky/sticky_assignor.py
@@ -2,7 +2,6 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

-from kafka.cluster import ClusterMetadata
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
2 changes: 1 addition & 1 deletion kafka/coordinator/consumer.py
@@ -796,7 +796,7 @@ def _handle_offset_fetch_response(self, future, response):
elif error_type is Errors.GroupAuthorizationFailedError:
future.failure(error)
else:
-log.error("Unknown error fetching offsets for %s: %s", tp, error)
+log.error("Unknown error fetching offsets: %s", error)
future.failure(error)
return

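Dropping `tp` from the log call reads like an undefined-name fix (F821): nothing named `tp` appears to be in scope on that error path, and the mistake would only surface as a NameError when the branch actually runs. That is an inference from the diff alone, not from the PR description. A minimal illustration of the safe form:

```python
import logging

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger("coordinator-example")

def handle_unknown_error(error):
    # No `tp` exists in this scope, so referencing it would raise NameError at
    # the moment the error path executes; logging only the error object is safe.
    log.error("Unknown error fetching offsets: %s", error)

handle_unknown_error(RuntimeError("boom"))
```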
2 changes: 1 addition & 1 deletion kafka/metrics/metric_name.py
@@ -93,7 +93,7 @@ def __eq__(self, other):
return True
if other is None:
return False
-return (type(self) == type(other) and
+return (isinstance(self, type(other)) and
self.group == other.group and
self.name == other.name and
self.tags == other.tags)
2 changes: 1 addition & 1 deletion kafka/metrics/quota.py
@@ -34,7 +34,7 @@ def __hash__(self):
def __eq__(self, other):
if self is other:
return True
-return (type(self) == type(other) and
+return (isinstance(self, type(other)) and
self.bound == other.bound and
self.is_upper_bound() == other.is_upper_bound())

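Both `__eq__` changes above (kafka/metrics/metric_name.py and kafka/metrics/quota.py) replace `type(self) == type(other)` with `isinstance(self, type(other))`, the form ruff's E721 points toward. The two are not strictly equivalent: isinstance also accepts subclasses, so an equality check against a subclass instance can now succeed where the exact-type comparison would not. A short sketch of the difference:

```python
class Quota:
    pass

class ChildQuota(Quota):
    pass

a, b = Quota(), ChildQuota()

print(type(a) == type(b))      # False: exact types differ
print(isinstance(b, type(a)))  # True:  a ChildQuota is a Quota
print(isinstance(a, type(b)))  # False: a plain Quota is not a ChildQuota
```

Whether the looser check matters here depends on whether these metrics classes are ever subclassed; in the common case the behaviour is unchanged.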
4 changes: 2 additions & 2 deletions kafka/producer/kafka.py
@@ -612,8 +612,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest

if headers is None:
headers = []
-assert type(headers) == list
-assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
+assert isinstance(headers, list)
+assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers)

message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
self._ensure_valid_record_size(message_size)
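The producer's assertions switch to isinstance but enforce the same contract: `headers` must be a list of `(str, bytes)` pairs. A sketch of a `send()` call in that shape (broker address and topic are placeholders, and a reachable broker is assumed):

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder address
producer.send(
    "example-topic",
    value=b"payload",
    headers=[("trace-id", b"abc123"), ("content-type", b"application/json")],
)
producer.flush()
```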
2 changes: 2 additions & 0 deletions kafka/protocol/api_versions.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import

+from io import BytesIO
+
from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Int16, Int32, Schema

2 changes: 1 addition & 1 deletion kafka/protocol/commit.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
+from kafka.protocol.types import Array, Int16, Int32, Int64, Schema, String


class OffsetCommitResponse_v0(Response):
2 changes: 1 addition & 1 deletion kafka/protocol/find_coordinator.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
+from kafka.protocol.types import Int8, Int16, Int32, Schema, String


class FindCoordinatorResponse_v0(Response):
10 changes: 5 additions & 5 deletions kafka/protocol/offset_for_leader_epoch.py
@@ -4,7 +4,7 @@
from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Int64, Schema, String, TaggedFields


-class OffsetForLeaderEpochResponse_v0(Request):
+class OffsetForLeaderEpochResponse_v0(Response):
API_KEY = 23
API_VERSION = 0
SCHEMA = Schema(
@@ -16,7 +16,7 @@ class OffsetForLeaderEpochResponse_v0(Request):
('end_offset', Int64))))))


-class OffsetForLeaderEpochResponse_v1(Request):
+class OffsetForLeaderEpochResponse_v1(Response):
API_KEY = 23
API_VERSION = 1
SCHEMA = Schema(
@@ -29,7 +29,7 @@ class OffsetForLeaderEpochResponse_v1(Request):
('end_offset', Int64))))))


-class OffsetForLeaderEpochResponse_v2(Request):
+class OffsetForLeaderEpochResponse_v2(Response):
API_KEY = 23
API_VERSION = 2
SCHEMA = Schema(
@@ -43,13 +43,13 @@ class OffsetForLeaderEpochResponse_v2(Request):
('end_offset', Int64))))))


-class OffsetForLeaderEpochResponse_v3(Request):
+class OffsetForLeaderEpochResponse_v3(Response):
API_KEY = 23
API_VERSION = 3
SCHEMA = OffsetForLeaderEpochResponse_v2.SCHEMA


-class OffsetForLeaderEpochResponse_v4(Request):
+class OffsetForLeaderEpochResponse_v4(Response):
API_KEY = 23
API_VERSION = 4
SCHEMA = Schema(
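This file gets more than style cleanup: the five OffsetForLeaderEpochResponse_v* classes were declared as subclasses of Request and are corrected to Response. A quick sanity check one could run after the change, using the import paths shown elsewhere in the diff:

```python
from kafka.protocol.api import Request, Response
from kafka.protocol.offset_for_leader_epoch import OffsetForLeaderEpochResponse_v0

assert issubclass(OffsetForLeaderEpochResponse_v0, Response)
assert not issubclass(OffsetForLeaderEpochResponse_v0, Request)
```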
2 changes: 1 addition & 1 deletion kafka/protocol/sasl_authenticate.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Bytes, Int16, Int64, Schema, String
+from kafka.protocol.types import Bytes, Int16, Int64, Schema, String


class SaslAuthenticateResponse_v0(Response):
4 changes: 3 additions & 1 deletion kafka/sasl/gssapi.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import

+import struct
+
# needed for SASL_GSSAPI authentication:
try:
import gssapi
@@ -66,7 +68,7 @@ def receive(self, auth_bytes):
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
message_parts = [
-Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))),
+struct.pack('>b', self.SASL_QOP_AUTH & struct.unpack('>b', msg[0:1])),
msg[:1],
self.auth_id.encode(),
]
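The gssapi.py edit swaps `Int8`/`io` (which do not appear to be imported in this module, likely the lint finding) for the standard `struct` module. One detail to note when reading the new line: `struct.unpack` always returns a tuple, so the unpacked byte normally has to be indexed out (for example `struct.unpack('>b', msg[0:1])[0]`) before it can be used in a bitwise AND. The sketch below shows the standalone pattern, not the library's exact code:

```python
import struct

SASL_QOP_AUTH = 1          # "auth" only: no integrity or confidentiality layer
server_byte = b"\x07"      # example first octet from the server: all QoP levels offered

(offered,) = struct.unpack(">b", server_byte)        # unpack returns a 1-tuple -> 7
chosen = struct.pack(">b", SASL_QOP_AUTH & offered)  # keep only the 'auth' bit -> b"\x01"
print(offered, chosen)
```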
8 changes: 4 additions & 4 deletions test/test_admin.py
@@ -6,7 +6,7 @@

def test_config_resource():
with pytest.raises(KeyError):
-    bad_resource = kafka.admin.ConfigResource('something', 'foo')
+    _bad_resource = kafka.admin.ConfigResource('something', 'foo')
good_resource = kafka.admin.ConfigResource('broker', 'bar')
assert good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER
assert good_resource.name == 'bar'
@@ -59,11 +59,11 @@ def test_acl_resource():

def test_new_topic():
with pytest.raises(IllegalArgumentError):
-    bad_topic = kafka.admin.NewTopic('foo', -1, -1)
+    _bad_topic = kafka.admin.NewTopic('foo', -1, -1)
with pytest.raises(IllegalArgumentError):
-    bad_topic = kafka.admin.NewTopic('foo', 1, -1)
+    _bad_topic = kafka.admin.NewTopic('foo', 1, -1)
with pytest.raises(IllegalArgumentError):
-    bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]})
+    _bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]})
good_topic = kafka.admin.NewTopic('foo', 1, 2)
assert good_topic.name == 'foo'
assert good_topic.num_partitions == 1
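The test fixes silence unused-variable findings (F841) in two ways, both visible in this PR: prefix the binding with an underscore, as here, or drop the assignment entirely, as in test_admin_integration.py below. A generic sketch of both styles:

```python
import pytest

def make_topic(num_partitions):
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    return {"partitions": num_partitions}

def test_invalid_topic_underscore_binding():
    with pytest.raises(ValueError):
        _topic = make_topic(0)  # underscore prefix marks it as intentionally unused

def test_invalid_topic_no_binding():
    with pytest.raises(ValueError):
        make_topic(0)  # or simply don't bind the result at all
```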
4 changes: 2 additions & 2 deletions test/test_admin_integration.py
@@ -140,15 +140,15 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
broker_id = "str"

with pytest.raises(ValueError):
-    configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+    kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
"""Tests that the describe consumer group call fails if the group coordinator is not available
"""
with pytest.raises(GroupCoordinatorNotAvailableError):
-    group_description = kafka_admin_client.describe_consumer_groups(['test'])
+    kafka_admin_client.describe_consumer_groups(['test'])


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
2 changes: 0 additions & 2 deletions test/test_cluster.py
@@ -1,8 +1,6 @@
# pylint: skip-file
from __future__ import absolute_import

-import pytest
-
from kafka.cluster import ClusterMetadata
from kafka.protocol.metadata import MetadataResponse

2 changes: 1 addition & 1 deletion test/test_fetcher.py
@@ -10,7 +10,7 @@

from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import (
-    CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
+    CompletedFetch, ConsumerRecord, Fetcher
)
from kafka.consumer.subscription_state import SubscriptionState
import kafka.errors as Errors
4 changes: 2 additions & 2 deletions test/test_object_conversion.py
@@ -207,7 +207,7 @@ def test_with_metadata_response():
assert len(obj['topics']) == 2
assert obj['topics'][0]['error_code'] == 0
assert obj['topics'][0]['topic'] == 'testtopic1'
-assert obj['topics'][0]['is_internal'] == False
+assert obj['topics'][0]['is_internal'] is False
assert len(obj['topics'][0]['partitions']) == 2
assert obj['topics'][0]['partitions'][0]['error_code'] == 0
assert obj['topics'][0]['partitions'][0]['partition'] == 0
@@ -224,7 +224,7 @@

assert obj['topics'][1]['error_code'] == 0
assert obj['topics'][1]['topic'] == 'other-test-topic'
-assert obj['topics'][1]['is_internal'] == True
+assert obj['topics'][1]['is_internal'] is True
assert len(obj['topics'][1]['partitions']) == 1
assert obj['topics'][1]['partitions'][0]['error_code'] == 0
assert obj['topics'][1]['partitions'][0]['partition'] == 0
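The `== False` / `== True` asserts are ruff's E712. The replacement compares identity against the bool singletons, which is also slightly stricter, because equality coerces across types while identity only matches an actual bool:

```python
value = 1
print(value == True)   # True:  equality coerces, 1 == True
print(value is True)   # False: only the bool singleton True passes an identity check

flag = False
assert flag is False   # the form the updated test uses
```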
6 changes: 2 additions & 4 deletions test/test_protocol.py
@@ -2,8 +2,6 @@
import io
import struct

-import pytest
-
from kafka.protocol.api import RequestHeader
from kafka.protocol.fetch import FetchRequest, FetchResponse
from kafka.protocol.find_coordinator import FindCoordinatorRequest
@@ -273,7 +271,7 @@ def test_decode_fetch_response_partial():

def test_struct_unrecognized_kwargs():
try:
-    mr = MetadataRequest[0](topicz='foo')
+    _mr = MetadataRequest[0](topicz='foo')
assert False, 'Structs should not allow unrecognized kwargs'
except ValueError:
pass
@@ -331,6 +329,6 @@ def test_compact_data_structs():
assert CompactBytes.decode(io.BytesIO(b'\x00')) is None
enc = CompactBytes.encode(b'')
assert enc == b'\x01'
-assert CompactBytes.decode(io.BytesIO(b'\x01')) is b''
+assert CompactBytes.decode(io.BytesIO(b'\x01')) == b''
enc = CompactBytes.encode(b'foo')
assert CompactBytes.decode(io.BytesIO(enc)) == b'foo'
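
The change above fixes an identity comparison against a bytes literal (F632): `is` tests object identity, and whether two equal bytes objects are the same object is an interning detail of the interpreter, so the assertion should compare by value with `==`. For example:

```python
empty = b""
built = b"".join([b"fo", b"o"])  # constructed at runtime, not a shared literal
literal = b"foo"

print(empty == b"")      # True:  value equality, which is what the test means
print(built == literal)  # True
print(built is literal)  # False: equal values but distinct objects
```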
1 change: 0 additions & 1 deletion test/test_sender.py
@@ -5,7 +5,6 @@
import io

from kafka.client_async import KafkaClient
-from kafka.cluster import ClusterMetadata
from kafka.metrics import Metrics
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.produce import ProduceRequest