Commit 908ac8f

tvoinarovskyi authored and dpkp committed
Add codec validators to record parser and builder for all formats (#1447)
1 parent d9e41c8 commit 908ac8f

File tree: 9 files changed, +136 −15 lines

Makefile

Lines changed: 5 additions & 3 deletions
@@ -23,11 +23,13 @@ test27: build-integration
 # Test using py.test directly if you want to use local python. Useful for other
 # platforms that require manual installation for C libraries, ie. Windows.
 test-local: build-integration
-        py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF kafka test
+        KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
+                --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF $(FLAGS) kafka test
 
 cov-local: build-integration
-        py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
-                --cov-config=.covrc --cov-report html kafka test
+        KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
+                --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
+                --cov-config=.covrc --cov-report html $(FLAGS) kafka test
         @echo "open file://`pwd`/htmlcov/index.html"
 
 # Check the readme for syntax errors, which can lead to invalid formatting on
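The local targets now forward the broker version into the environment, where the test fixtures and version-gating helpers read it, and $(FLAGS) passes extra arguments through to py.test. For example (version value illustrative): make test-local KAFKA_VERSION=1.0.1 FLAGS="-x".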

kafka/record/default_records.py

Lines changed: 18 additions & 4 deletions
@@ -54,17 +54,18 @@
 # * Timestamp Type (3)
 # * Compression Type (0-2)
 
-import io
 import struct
 import time
 from kafka.record.abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
-from kafka.record.util import decode_varint, encode_varint, calc_crc32c, size_of_varint
-
-from kafka.errors import CorruptRecordException
+from kafka.record.util import (
+    decode_varint, encode_varint, calc_crc32c, size_of_varint
+)
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
     gzip_encode, snappy_encode, lz4_encode,
     gzip_decode, snappy_decode, lz4_decode
 )
+import kafka.codec as codecs
 
 
 class DefaultRecordBase(object):
@@ -101,6 +102,17 @@ class DefaultRecordBase(object):
     LOG_APPEND_TIME = 1
     CREATE_TIME = 0
 
+    def _assert_has_codec(self, compression_type):
+        if compression_type == self.CODEC_GZIP:
+            checker, name = codecs.has_gzip, "gzip"
+        elif compression_type == self.CODEC_SNAPPY:
+            checker, name = codecs.has_snappy, "snappy"
+        elif compression_type == self.CODEC_LZ4:
+            checker, name = codecs.has_lz4, "lz4"
+        if not checker():
+            raise UnsupportedCodecError(
+                "Libraries for {} compression codec not found".format(name))
+
 
 class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
 
@@ -156,6 +168,7 @@ def _maybe_uncompress(self):
         if not self._decompressed:
             compression_type = self.compression_type
             if compression_type != self.CODEC_NONE:
+                self._assert_has_codec(compression_type)
                 data = memoryview(self._buffer)[self._pos:]
                 if compression_type == self.CODEC_GZIP:
                     uncompressed = gzip_decode(data)
@@ -481,6 +494,7 @@ def write_header(self, use_compression_type=True):
 
     def _maybe_compress(self):
        if self._compression_type != self.CODEC_NONE:
+            self._assert_has_codec(self._compression_type)
            header_size = self.HEADER_STRUCT.size
            data = bytes(self._buffer[header_size:])
            if self._compression_type == self.CODEC_GZIP:
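The validator leans on availability helpers that kafka.codec already exposes, and the guard runs on both sides: _maybe_uncompress() covers the parse path, _maybe_compress() the build path. A quick sketch for probing what the guard will accept in a given environment, using only functions this diff itself imports:

import kafka.codec as codecs

# The same checkers _assert_has_codec dispatches to; gzip comes from the
# stdlib, while snappy and lz4 depend on optional third-party libraries.
for name, checker in [("gzip", codecs.has_gzip),
                      ("snappy", codecs.has_snappy),
                      ("lz4", codecs.has_lz4)]:
    print("{}: {}".format(name, "available" if checker() else "missing"))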

kafka/record/legacy_records.py

Lines changed: 16 additions & 2 deletions
@@ -49,9 +49,10 @@
 
 from kafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
-    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
 )
-from kafka.errors import CorruptRecordException
+import kafka.codec as codecs
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
 
 
 class LegacyRecordBase(object):
@@ -112,6 +113,17 @@ class LegacyRecordBase(object):
 
     NO_TIMESTAMP = -1
 
+    def _assert_has_codec(self, compression_type):
+        if compression_type == self.CODEC_GZIP:
+            checker, name = codecs.has_gzip, "gzip"
+        elif compression_type == self.CODEC_SNAPPY:
+            checker, name = codecs.has_snappy, "snappy"
+        elif compression_type == self.CODEC_LZ4:
+            checker, name = codecs.has_lz4, "lz4"
+        if not checker():
+            raise UnsupportedCodecError(
+                "Libraries for {} compression codec not found".format(name))
+
 
 class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
 
@@ -166,6 +178,7 @@ def _decompress(self, key_offset):
         data = self._buffer[pos:pos + value_size]
 
         compression_type = self.compression_type
+        self._assert_has_codec(compression_type)
         if compression_type == self.CODEC_GZIP:
             uncompressed = gzip_decode(data)
         elif compression_type == self.CODEC_SNAPPY:
@@ -419,6 +432,7 @@ def _encode_msg(self, start_pos, offset, timestamp, key, value,
 
     def _maybe_compress(self):
         if self._compression_type:
+            self._assert_has_codec(self._compression_type)
             data = bytes(self._buffer)
             if self._compression_type == self.CODEC_GZIP:
                 compressed = gzip_encode(data)
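The legacy (magic 0/1) format mirrors the v2 change exactly: the same _assert_has_codec helper lands on LegacyRecordBase and is invoked on the read path in _decompress() and on the write path in _maybe_compress(), so old-format messages fail fast with the identical UnsupportedCodecError.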

requirements-dev.txt

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,6 @@
 flake8==3.4.1
 pytest==3.4.0
 pytest-cov==2.5.1
-pytest-catchlog==1.2.2
 docker-py==1.10.6
 coveralls==1.2.0
 Sphinx==1.6.4
@@ -13,3 +12,4 @@ pylint==1.8.2
 pytest-pylint==0.7.1
 pytest-mock==1.6.3
 sphinx-rtd-theme==0.2.4
+crc32c==1.2
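Two dependency changes ride along: pytest-catchlog is removed (its log-capture functionality has been part of pytest core since 3.3, and this file pins pytest 3.4.0), and crc32c==1.2 is added, presumably so CI exercises the C-accelerated CRC32C path behind calc_crc32c rather than only the pure-Python fallback.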

test/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,6 @@
 import inspect
 
 import pytest
-from decorator import decorate
 
 from test.fixtures import KafkaFixture, ZookeeperFixture
 from test.testutil import kafka_version, random_string
@@ -73,6 +72,7 @@ def kafka_consumer_factory(kafka_broker, topic, request):
     def factory(**kafka_consumer_params):
         params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
         params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+        params.setdefault('auto_offset_reset', 'earliest')
         _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
         return _consumer[0]
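Defaulting the consumer fixture to auto_offset_reset='earliest' means consumers created by tests see messages produced before their first poll, which the new integration test below depends on; the unused decorator import goes away alongside the functools.wraps switch in test/testutil.py.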

test/record/test_default_records.py

Lines changed: 36 additions & 1 deletion
@@ -1,9 +1,12 @@
 # -*- coding: utf-8 -*-
 from __future__ import unicode_literals
 import pytest
+from mock import patch
+import kafka.codec
 from kafka.record.default_records import (
     DefaultRecordBatch, DefaultRecordBatchBuilder
 )
+from kafka.errors import UnsupportedCodecError
 
 
 @pytest.mark.parametrize("compression_type", [
@@ -17,7 +20,7 @@ def test_read_write_serde_v2(compression_type):
         magic=2, compression_type=compression_type, is_transactional=1,
         producer_id=123456, producer_epoch=123, base_sequence=9999,
         batch_size=999999)
-    headers = [] # [("header1", b"aaa"), ("header2", b"bbb")]
+    headers = [("header1", b"aaa"), ("header2", b"bbb")]
     for offset in range(10):
         builder.append(
             offset, timestamp=9999999, key=b"test", value=b"Super",
@@ -167,3 +170,35 @@ def test_default_batch_size_limit():
         2, timestamp=None, key=None, value=b"M" * 700, headers=[])
     assert meta is None
     assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+    (DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+    (DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+    (DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=compression_type, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=1024)
+    builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
+    correct_buffer = builder.build()
+
+    with patch.object(kafka.codec, checker_name) as mocked:
+        mocked.return_value = False
+        # Check that builder raises error
+        builder = DefaultRecordBatchBuilder(
+            magic=2, compression_type=compression_type, is_transactional=0,
+            producer_id=-1, producer_epoch=-1, base_sequence=-1,
+            batch_size=1024)
+        error_msg = "Libraries for {} compression codec not found".format(name)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
+            builder.build()
+
+        # Check that reader raises same error
+        batch = DefaultRecordBatch(bytes(correct_buffer))
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            list(batch)
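A note on the pattern: the test patches the has_* checker on kafka.codec to simulate a missing library rather than uninstalling anything, so correct_buffer is still built with the real codec first and the error path is then verified for both the builder and the parser.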

test/record/test_legacy_records.py

Lines changed: 31 additions & 0 deletions
@@ -1,8 +1,11 @@
 from __future__ import unicode_literals
 import pytest
+from mock import patch
 from kafka.record.legacy_records import (
     LegacyRecordBatch, LegacyRecordBatchBuilder
 )
+import kafka.codec
+from kafka.errors import UnsupportedCodecError
 
 
 @pytest.mark.parametrize("magic", [0, 1])
@@ -164,3 +167,31 @@ def test_legacy_batch_size_limit(magic):
     meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
     assert meta is None
     assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+    (LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+    (LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+    (LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024)
+    builder.append(0, timestamp=None, key=None, value=b"M")
+    correct_buffer = builder.build()
+
+    with patch.object(kafka.codec, checker_name) as mocked:
+        mocked.return_value = False
+        # Check that builder raises error
+        builder = LegacyRecordBatchBuilder(
+            magic=magic, compression_type=compression_type, batch_size=1024)
+        error_msg = "Libraries for {} compression codec not found".format(name)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            builder.append(0, timestamp=None, key=None, value=b"M")
+            builder.build()
+
+        # Check that reader raises same error
+        batch = LegacyRecordBatch(bytes(correct_buffer), magic)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            list(batch)

test/test_consumer_integration.py

Lines changed: 23 additions & 1 deletion
@@ -1,6 +1,9 @@
 import logging
 import os
 import time
+from mock import patch
+import pytest
+import kafka.codec
 
 import pytest
 from six.moves import xrange
@@ -14,7 +17,7 @@
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 from kafka.errors import (
     ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
-    KafkaTimeoutError
+    KafkaTimeoutError, UnsupportedCodecError
 )
 from kafka.structs import (
     ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -27,6 +30,7 @@
     send_messages
 )
 
+
 @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
     """Test KafkaConsumer
@@ -50,6 +54,24 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
     kafka_consumer.close()
 
 
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_kafka_consumer_unsupported_encoding(
+        topic, kafka_producer_factory, kafka_consumer_factory):
+    # Send a compressed message
+    producer = kafka_producer_factory(compression_type="gzip")
+    fut = producer.send(topic, b"simple message" * 200)
+    fut.get(timeout=5)
+    producer.close()
+
+    # Consume, but with the related compression codec not available
+    with patch.object(kafka.codec, "has_gzip") as mocked:
+        mocked.return_value = False
+        consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+        error_msg = "Libraries for gzip compression codec not found"
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            consumer.poll(timeout_ms=2000)
+
+
 class TestConsumerIntegration(KafkaIntegrationTestCase):
     maxDiff = None
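For applications, the practical consequence is that consuming data compressed with an unavailable codec now surfaces as a typed exception rather than an obscure failure. A hedged sketch of handling it at the application level (topic name and bootstrap address are placeholders):

from kafka import KafkaConsumer
from kafka.errors import UnsupportedCodecError

consumer = KafkaConsumer("my-topic",  # placeholder topic
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
try:
    records = consumer.poll(timeout_ms=2000)
except UnsupportedCodecError:
    # A producer used a codec (e.g. snappy or lz4) whose support library
    # is not installed on this host; install python-snappy or lz4 and retry.
    raise
else:
    for tp, messages in records.items():
        print(tp, len(messages))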

test/testutil.py

Lines changed: 5 additions & 2 deletions
@@ -1,10 +1,12 @@
+from __future__ import absolute_import
+
+import functools
 import operator
 import os
 import socket
 import time
 import uuid
 
-import decorator
 import pytest
 from . import unittest
 
@@ -45,6 +47,7 @@ def construct_lambda(s):
     validators = map(construct_lambda, versions)
 
     def real_kafka_versions(func):
+        @functools.wraps(func)
         def wrapper(func, *args, **kwargs):
             version = kafka_version()
 
@@ -56,7 +59,7 @@ def wrapper(func, *args, **kwargs):
                 pytest.skip("unsupported kafka version")
 
             return func(*args, **kwargs)
-        return decorator.decorator(wrapper, func)
+        return wrapper
 
     return real_kafka_versions
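Replacing decorator.decorator with the stdlib functools.wraps drops an external dependency while preserving the wrapped test's __name__ and docstring, which pytest relies on for collection and reporting.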
