Skip to content

Commit 0331ddc

Browse files
wbarnhaax-ale
authored and committed
Add zstd support to legacy records and ensure no variable is referenced before definition (dpkp#138)
* fix if statement logic and add zstd check
* fix if statement logic and add zstd uncompress
* fix imports
* avoid variables being used before definition
* Remove unused import from legacy_records.py
---------
Co-authored-by: Alexandre Souza <[email protected]>
1 parent 03898de commit 0331ddc

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

kafka/record/default_records.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -115,6 +115,8 @@ def _assert_has_codec(self, compression_type: int) -> None:
115115
checker, name = codecs.has_lz4, "lz4"
116116
elif compression_type == self.CODEC_ZSTD:
117117
checker, name = codecs.has_zstd, "zstd"
118+
else:
119+
checker, name = lambda: False, "Unknown"
118120
if not checker():
119121
raise UnsupportedCodecError(
120122
f"Libraries for {name} compression codec not found")
@@ -525,6 +527,8 @@ def _maybe_compress(self) -> bool:
525527
compressed = lz4_encode(data)
526528
elif self._compression_type == self.CODEC_ZSTD:
527529
compressed = zstd_encode(data)
530+
else:
531+
compressed = '' # unknown
528532
compressed_size = len(compressed)
529533
if len(data) <= compressed_size:
530534
# We did not get any benefit from compression, lets send

kafka/record/legacy_records.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -49,8 +49,8 @@
4949
from kafka.record.util import calc_crc32
5050

5151
from kafka.codec import (
52-
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
53-
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
52+
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka, zstd_encode,
53+
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka, zstd_decode
5454
)
5555
import kafka.codec as codecs
5656
from kafka.errors import CorruptRecordException, UnsupportedCodecError
@@ -110,6 +110,7 @@ class LegacyRecordBase:
110110
CODEC_GZIP = 0x01
111111
CODEC_SNAPPY = 0x02
112112
CODEC_LZ4 = 0x03
113+
CODEC_ZSTD = 0x04
113114
TIMESTAMP_TYPE_MASK = 0x08
114115

115116
LOG_APPEND_TIME = 1
@@ -124,6 +125,10 @@ def _assert_has_codec(self, compression_type: int) -> None:
124125
checker, name = codecs.has_snappy, "snappy"
125126
elif compression_type == self.CODEC_LZ4:
126127
checker, name = codecs.has_lz4, "lz4"
128+
elif compression_type == self.CODEC_ZSTD:
129+
checker, name = codecs.has_zstd, "zstd"
130+
else:
131+
checker, name = lambda: False, "Unknown"
127132
if not checker():
128133
raise UnsupportedCodecError(
129134
f"Libraries for {name} compression codec not found")
@@ -195,6 +200,10 @@ def _decompress(self, key_offset: int) -> bytes:
195200
uncompressed = lz4_decode_old_kafka(data.tobytes())
196201
else:
197202
uncompressed = lz4_decode(data.tobytes())
203+
elif compression_type == self.CODEC_ZSTD:
204+
uncompressed = zstd_decode(data)
205+
else:
206+
raise ValueError("Unknown Compression Type - %s" % compression_type)
198207
return uncompressed
199208

200209
def _read_header(self, pos: int) -> Union[Tuple[int, int, int, int, int, None], Tuple[int, int, int, int, int, int]]:

0 commit comments

Comments
 (0)