
Commit 5e461a7

Patch pylint warnings so tests pass again (dpkp#184)

* stop pylint complaint for uncovered conditional flow
* add todo to revisit
* formatting makes me happy :)
* Fix errors raised by new version of Pylint so tests pass again

1 parent: 448017e

File tree: 4 files changed (+17, -3)

kafka/admin/client.py: 5 additions & 0 deletions

@@ -503,6 +503,8 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
                 topics=topics,
                 allow_auto_topic_creation=auto_topic_creation
             )
+        else:
+            raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")
 
         future = self._send_request_to_node(
             self._client.least_loaded_node(),
@@ -1010,6 +1012,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
     def _describe_consumer_groups_process_response(self, response):
         """Process a DescribeGroupsResponse into a group description."""
         if response.API_VERSION <= 3:
+            group_description = None
             assert len(response.groups) == 1
             for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
                 if isinstance(response_field, Array):
@@ -1045,6 +1048,8 @@ def _describe_consumer_groups_process_response(self, response):
                 if response.API_VERSION <=2:
                     described_group_information_list.append(None)
                 group_description = GroupInformation._make(described_group_information_list)
+        if group_description is None:
+            raise Errors.BrokerResponseError("No group description received")
         error_code = group_description.error_code
         error_type = Errors.for_code(error_code)
         # Java has the note: KAFKA-6789, we can retry based on the error code
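Both `_describe_consumer_groups_process_response` hunks are one fix: `group_description` was only assigned inside nested conditionals, so newer pylint flags the later `group_description.error_code` as possibly used before assignment. Pre-initializing the name to `None` and raising if it is still `None` makes the binding provable; the `MetadataRequest` hunk applies the same idea by making the version dispatch exhaustive. A minimal sketch of the pattern, with illustrative names rather than the kafka-python API:

```python
# Sketch only: `Response`, `api_version`, and `groups` are stand-ins,
# not kafka-python classes.
class Response:
    api_version = 3
    groups = ["my-group"]

def process_response(response):
    description = None  # pre-initialize so every path binds the name
    if response.api_version <= 3:
        for group in response.groups:
            description = (group, "Stable")
    if description is None:
        # Fail loudly on the path pylint warned about, instead of hitting
        # an UnboundLocalError when `description` is used below.
        raise RuntimeError("No group description received")
    return description[0]

print(process_response(Response()))  # my-group
```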

kafka/coordinator/consumer.py: 5 additions & 0 deletions

@@ -628,10 +628,15 @@ def _send_offset_commit_request(self, offsets):
                 ) for partition, offset in partitions.items()]
             ) for topic, partitions in offset_data.items()]
         )
+        else:
+            # TODO: We really shouldn't need this here to begin with, but I'd like to get
+            # pylint to stop complaining.
+            raise Exception(f"Unsupported Broker API: {self.config['api_version']}")
 
         log.debug("Sending offset-commit request with %s for group %s to %s",
                   offsets, self.group_id, node_id)
 
+
         future = Future()
         _f = self._client.send(node_id, request)
         _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
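Reduced to its essentials, the `_send_offset_commit_request` change looks like the sketch below; the version tuples and request stand-ins are illustrative, not kafka-python's wire types. As the TODO concedes, the branch should be unreachable in practice, but spelling it out lets pylint prove `request` is always bound and turns a latent `UnboundLocalError` into a descriptive error.

```python
# Illustrative sketch: an exhaustive version dispatch, so `request`
# is provably bound before use. Names are made up, not kafka-python's.
def build_offset_commit_request(api_version, offsets):
    if api_version >= (0, 9):
        request = ("OffsetCommitRequest_v2", offsets)
    elif api_version >= (0, 8, 2):
        request = ("OffsetCommitRequest_v1", offsets)
    else:
        # The guard pylint wanted: every path now binds `request` or raises.
        raise Exception(f"Unsupported Broker API: {api_version}")
    return request

print(build_offset_commit_request((0, 10, 1), {"topic-0": 42}))
```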

kafka/record/default_records.py: 5 additions & 3 deletions

@@ -187,12 +187,14 @@ def _maybe_uncompress(self) -> None:
             data = memoryview(self._buffer)[self._pos:]
             if compression_type == self.CODEC_GZIP:
                 uncompressed = gzip_decode(data)
-            if compression_type == self.CODEC_SNAPPY:
+            elif compression_type == self.CODEC_SNAPPY:
                 uncompressed = snappy_decode(data.tobytes())
-            if compression_type == self.CODEC_LZ4:
+            elif compression_type == self.CODEC_LZ4:
                 uncompressed = lz4_decode(data.tobytes())
-            if compression_type == self.CODEC_ZSTD:
+            elif compression_type == self.CODEC_ZSTD:
                 uncompressed = zstd_decode(data.tobytes())
+            else:
+                raise NotImplementedError(f"Compression type {compression_type} is not supported")
             self._buffer = bytearray(uncompressed)
             self._pos = 0
             self._decompressed = True
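Two things change in `_maybe_uncompress`: the independent `if` tests become an `elif` chain (the codecs are mutually exclusive, so re-testing after a match was wasted work), and the chain gains a trailing `else` so `uncompressed` is either bound or the method raises. A dependency-free sketch of the resulting dispatch; only gzip is wired up here, and the `CODEC_*` values are illustrative:

```python
import gzip

CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4, CODEC_ZSTD = 0x01, 0x02, 0x03, 0x04

def uncompress(compression_type, data):
    if compression_type == CODEC_GZIP:
        uncompressed = gzip.decompress(data)
    elif compression_type == CODEC_SNAPPY:
        raise NotImplementedError("snappy omitted from this sketch")
    elif compression_type == CODEC_LZ4:
        raise NotImplementedError("lz4 omitted from this sketch")
    elif compression_type == CODEC_ZSTD:
        raise NotImplementedError("zstd omitted from this sketch")
    else:
        # Unknown codec: fail loudly instead of leaving `uncompressed` unbound.
        raise NotImplementedError(f"Compression type {compression_type} is not supported")
    return uncompressed

print(uncompress(CODEC_GZIP, gzip.compress(b"hello")))  # b'hello'
```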

kafka/record/legacy_records.py: 2 additions & 0 deletions

@@ -461,6 +461,8 @@ def _maybe_compress(self) -> bool:
                     compressed = lz4_encode_old_kafka(data)
                 else:
                     compressed = lz4_encode(data)
+            else:
+                raise NotImplementedError(f"Compression type {self._compression_type} is not supported")
             size = self.size_in_bytes(
                 0, timestamp=0, key=None, value=compressed)
             # We will try to reuse the same buffer if we have enough space
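To reproduce the class of warning all four files silence, point pylint at a function that binds a name in some branches only. Recent pylint reports `possibly-used-before-assignment` (E0606, added in pylint 3.2); older releases emit related used-before-assignment diagnostics. A sketch:

```python
# demo.py: a name bound only inside a non-exhaustive conditional.
def maybe_compress(codec, data):
    if codec == "gzip":
        compressed = data  # stand-in for gzip_encode(data)
    elif codec == "lz4":
        compressed = data  # stand-in for lz4_encode(data)
    # No else: pylint cannot prove `compressed` is bound past this point.
    return len(compressed)  # E0606 (possibly-used-before-assignment) flagged here
```

Adding `else: raise NotImplementedError(...)`, as this commit does, clears the warning without `# pylint: disable` comments and fails with a descriptive exception instead of an `UnboundLocalError` if an unknown codec ever slips through.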
