Add codec validators to record parser and builder for all formats. #1447

Closed · wants to merge 1 commit
8 changes: 5 additions & 3 deletions Makefile
@@ -23,11 +23,13 @@ test27: build-integration
# Test using py.test directly if you want to use local python. Useful for other
# platforms that require manual installation for C libraries, ie. Windows.
test-local: build-integration
py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF kafka test
KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF $(FLAGS) kafka test

cov-local: build-integration
py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
--cov-config=.covrc --cov-report html kafka test
KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
--cov-config=.covrc --cov-report html $(FLAGS) kafka test
@echo "open file://`pwd`/htmlcov/index.html"

# Check the readme for syntax errors, which can lead to invalid formatting on
22 changes: 18 additions & 4 deletions kafka/record/default_records.py
@@ -54,17 +54,18 @@
# * Timestamp Type (3)
# * Compression Type (0-2)

import io
import struct
import time
from kafka.record.abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
from kafka.record.util import decode_varint, encode_varint, calc_crc32c, size_of_varint

from kafka.errors import CorruptRecordException
from kafka.record.util import (
decode_varint, encode_varint, calc_crc32c, size_of_varint
)
from kafka.errors import CorruptRecordException, UnsupportedCodecError
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode
)
import kafka.codec as codecs


class DefaultRecordBase(object):
@@ -101,6 +102,17 @@ class DefaultRecordBase(object):
LOG_APPEND_TIME = 1
CREATE_TIME = 0

def _assert_has_codec(self, compression_type):
if compression_type == self.CODEC_GZIP:
checker, name = codecs.has_gzip, "gzip"
elif compression_type == self.CODEC_SNAPPY:
checker, name = codecs.has_snappy, "snappy"
elif compression_type == self.CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
if not checker():
raise UnsupportedCodecError(
"Libraries for {} compression codec not found".format(name))


class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):

@@ -156,6 +168,7 @@ def _maybe_uncompress(self):
if not self._decompressed:
compression_type = self.compression_type
if compression_type != self.CODEC_NONE:
self._assert_has_codec(compression_type)
data = memoryview(self._buffer)[self._pos:]
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
@@ -481,6 +494,7 @@ def write_header(self, use_compression_type=True):

def _maybe_compress(self):
if self._compression_type != self.CODEC_NONE:
self._assert_has_codec(self._compression_type)
header_size = self.HEADER_STRUCT.size
data = bytes(self._buffer[header_size:])
if self._compression_type == self.CODEC_GZIP:
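Not part of the diff — a minimal sketch of how the new `_assert_has_codec` check surfaces to callers, assuming python-snappy is not installed (the tests added below simulate this by patching `kafka.codec.has_snappy`). The builder arguments mirror the ones used in this PR's tests.

```python
from kafka.errors import UnsupportedCodecError
from kafka.record.default_records import (
    DefaultRecordBatch, DefaultRecordBatchBuilder
)

builder = DefaultRecordBatchBuilder(
    magic=2, compression_type=DefaultRecordBatch.CODEC_SNAPPY,
    is_transactional=0, producer_id=-1, producer_epoch=-1,
    base_sequence=-1, batch_size=1024)

builder.append(0, timestamp=None, key=None, value=b"msg", headers=[])
try:
    # _maybe_compress() calls _assert_has_codec() before touching snappy, so a
    # missing python-snappy shows up as an explicit UnsupportedCodecError rather
    # than a late failure inside the compression path.
    builder.build()
except UnsupportedCodecError as exc:
    print(exc)  # "Libraries for snappy compression codec not found"
```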
18 changes: 16 additions & 2 deletions kafka/record/legacy_records.py
@@ -49,9 +49,10 @@

from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
)
from kafka.errors import CorruptRecordException
import kafka.codec as codecs
from kafka.errors import CorruptRecordException, UnsupportedCodecError


class LegacyRecordBase(object):
@@ -112,6 +113,17 @@ class LegacyRecordBase(object):

NO_TIMESTAMP = -1

def _assert_has_codec(self, compression_type):
if compression_type == self.CODEC_GZIP:
checker, name = codecs.has_gzip, "gzip"
elif compression_type == self.CODEC_SNAPPY:
checker, name = codecs.has_snappy, "snappy"
elif compression_type == self.CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
if not checker():
raise UnsupportedCodecError(
"Libraries for {} compression codec not found".format(name))


class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):

@@ -166,6 +178,7 @@ def _decompress(self, key_offset):
data = self._buffer[pos:pos + value_size]

compression_type = self.compression_type
self._assert_has_codec(compression_type)
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
elif compression_type == self.CODEC_SNAPPY:
@@ -419,6 +432,7 @@ def _encode_msg(self, start_pos, offset, timestamp, key, value,

def _maybe_compress(self):
if self._compression_type:
self._assert_has_codec(self._compression_type)
data = bytes(self._buffer)
if self._compression_type == self.CODEC_GZIP:
compressed = gzip_encode(data)
2 changes: 1 addition & 1 deletion kafka/record/memory_records.py
@@ -132,7 +132,7 @@ def append(self, timestamp, key, value, headers=[]):
""" Append a message to the buffer.

Returns:
(int, int): checksum and bytes written
RecordMetadata: object containing checksum, bytes written, etc.
"""
if self._closed:
return None, 0
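Not part of the diff — a tiny usage sketch of the corrected docstring, using only the `append()` signature shown above. The builder construction is assumed, and only the successful path is shown.

```python
# Sketch only: 'builder' is assumed to be an already-configured
# MemoryRecordsBuilder instance; its construction is not shown in this diff.
meta = builder.append(timestamp=None, key=b"key", value=b"value", headers=[])

# append() is now documented to hand back a metadata object rather than a
# bare (checksum, bytes_written) tuple, so callers can keep the whole object
# instead of unpacking two ints.
print(meta)
```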
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -1,7 +1,6 @@
flake8==3.4.1
pytest==3.4.0
pytest-cov==2.5.1
pytest-catchlog==1.2.2
docker-py==1.10.6
coveralls==1.2.0
Sphinx==1.6.4
@@ -13,3 +12,4 @@ pylint==1.8.2
pytest-pylint==0.7.1
pytest-mock==1.6.3
sphinx-rtd-theme==0.2.4
crc32c==1.2
2 changes: 1 addition & 1 deletion test/conftest.py
@@ -3,7 +3,6 @@
import inspect

import pytest
from decorator import decorate

from test.fixtures import KafkaFixture, ZookeeperFixture
from test.testutil import kafka_version, random_string
@@ -73,6 +72,7 @@ def kafka_consumer_factory(kafka_broker, topic, request):
def factory(**kafka_consumer_params):
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
params.setdefault('auto_offset_reset', 'earliest')
Contributor:

Do we really want this? The default in the consumers is normally 'latest', and I'm a little concerned that this sets a default for test purposes that does not match normal behavior.

Collaborator Author:

It gave me problems, at least. In tests, you set up by sending messages before consuming, so a 'latest' setting requires you to connect before setting up, which is not what you want in several places.

Contributor:

Understood.

For my in-house tests of our internal wrapper, I used the connect-then-send-messages approach and, while slightly more verbose, it hasn't been too onerous.

That said, I'm fine with going this route instead.

_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
return _consumer[0]

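Not part of the PR — a sketch of the ordering issue discussed in the thread above, using the public KafkaProducer/KafkaConsumer API. The broker address and topic name are placeholders; the point is that with the library default `auto_offset_reset='latest'`, a consumer created after the send starts at the end of the log and sees nothing, which is why the test fixture defaults to 'earliest'.

```python
from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP = "localhost:9092"   # placeholder test broker
TOPIC = "test-topic"           # placeholder topic

# Typical test setup: produce first...
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send(TOPIC, b"payload").get(timeout=5)
producer.close()

# ...then consume. With the normal default ('latest') this consumer would
# begin at the log end and poll nothing; 'earliest' replays what the test
# just produced, matching the conftest fixture default added here.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=2000,
)
messages = [m.value for m in consumer]
consumer.close()
assert b"payload" in messages
```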
37 changes: 36 additions & 1 deletion test/record/test_default_records.py
@@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import pytest
from mock import patch
import kafka.codec
from kafka.record.default_records import (
DefaultRecordBatch, DefaultRecordBatchBuilder
)
from kafka.errors import UnsupportedCodecError


@pytest.mark.parametrize("compression_type", [
@@ -17,7 +20,7 @@ def test_read_write_serde_v2(compression_type):
magic=2, compression_type=compression_type, is_transactional=1,
producer_id=123456, producer_epoch=123, base_sequence=9999,
batch_size=999999)
headers = [] # [("header1", b"aaa"), ("header2", b"bbb")]
headers = [("header1", b"aaa"), ("header2", b"bbb")]
for offset in range(10):
builder.append(
offset, timestamp=9999999, key=b"test", value=b"Super",
@@ -167,3 +170,35 @@ def test_default_batch_size_limit():
2, timestamp=None, key=None, value=b"M" * 700, headers=[])
assert meta is None
assert len(builder.build()) < 1000


@pytest.mark.parametrize("compression_type,name,checker_name", [
(DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
(DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
(DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
])
@pytest.mark.parametrize("magic", [0, 1])
def test_unavailable_codec(magic, compression_type, name, checker_name):
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=compression_type, is_transactional=0,
producer_id=-1, producer_epoch=-1, base_sequence=-1,
batch_size=1024)
builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
correct_buffer = builder.build()

with patch.object(kafka.codec, checker_name) as mocked:
mocked.return_value = False
# Check that builder raises error
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=compression_type, is_transactional=0,
producer_id=-1, producer_epoch=-1, base_sequence=-1,
batch_size=1024)
error_msg = "Libraries for {} compression codec not found".format(name)
with pytest.raises(UnsupportedCodecError, match=error_msg):
builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
builder.build()

# Check that reader raises same error
batch = DefaultRecordBatch(bytes(correct_buffer))
with pytest.raises(UnsupportedCodecError, match=error_msg):
list(batch)
31 changes: 31 additions & 0 deletions test/record/test_legacy_records.py
@@ -1,8 +1,11 @@
from __future__ import unicode_literals
import pytest
from mock import patch
from kafka.record.legacy_records import (
LegacyRecordBatch, LegacyRecordBatchBuilder
)
import kafka.codec
from kafka.errors import UnsupportedCodecError


@pytest.mark.parametrize("magic", [0, 1])
@@ -164,3 +167,31 @@ def test_legacy_batch_size_limit(magic):
meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
assert meta is None
assert len(builder.build()) < 1000


@pytest.mark.parametrize("compression_type,name,checker_name", [
(LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
(LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
(LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
])
@pytest.mark.parametrize("magic", [0, 1])
def test_unavailable_codec(magic, compression_type, name, checker_name):
builder = LegacyRecordBatchBuilder(
magic=magic, compression_type=compression_type, batch_size=1024)
builder.append(0, timestamp=None, key=None, value=b"M")
correct_buffer = builder.build()

with patch.object(kafka.codec, checker_name) as mocked:
mocked.return_value = False
# Check that builder raises error
builder = LegacyRecordBatchBuilder(
magic=magic, compression_type=compression_type, batch_size=1024)
error_msg = "Libraries for {} compression codec not found".format(name)
with pytest.raises(UnsupportedCodecError, match=error_msg):
builder.append(0, timestamp=None, key=None, value=b"M")
builder.build()

# Check that reader raises same error
batch = LegacyRecordBatch(bytes(correct_buffer), magic)
with pytest.raises(UnsupportedCodecError, match=error_msg):
list(batch)
23 changes: 22 additions & 1 deletion test/test_consumer_integration.py
@@ -1,6 +1,9 @@
import logging
import os
import time
from mock import patch
import pytest
import kafka.codec

from six.moves import xrange
import six
@@ -13,7 +16,7 @@
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
KafkaTimeoutError
KafkaTimeoutError, UnsupportedCodecError
)
from kafka.structs import (
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -25,6 +28,7 @@
send_messages
)


def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
"""Test KafkaConsumer
"""
@@ -47,6 +51,23 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
kafka_consumer.close()


def test_kafka_consumer_unsupported_encoding(
topic, kafka_producer_factory, kafka_consumer_factory):

Owner:

Please also add a skip if no version is set -- see #1453
# Send a compressed message
producer = kafka_producer_factory(compression_type="gzip")
fut = producer.send(topic, b"simple message" * 200)
fut.get(timeout=5)
producer.close()

# Consume, but with the related compression codec not available
with patch.object(kafka.codec, "has_gzip") as mocked:
mocked.return_value = False
consumer = kafka_consumer_factory(auto_offset_reset='earliest')
error_msg = "Libraries for gzip compression codec not found"
with pytest.raises(UnsupportedCodecError, match=error_msg):
consumer.poll(timeout_ms=2000)


class TestConsumerIntegration(KafkaIntegrationTestCase):
maxDiff = None

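Not part of the diff — one possible way to address the reviewer's request on test_kafka_consumer_unsupported_encoding above. The environment-variable check is an assumption; the skip helper actually added in #1453 may look different.

```python
import os
import pytest

# Skip the integration test when no broker version is configured in the
# environment (KAFKA_VERSION is the variable the Makefile above passes through).
@pytest.mark.skipif(
    not os.environ.get("KAFKA_VERSION"),
    reason="integration test requires KAFKA_VERSION")
def test_kafka_consumer_unsupported_encoding(
        topic, kafka_producer_factory, kafka_consumer_factory):
    ...  # body as in the diff above
```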
7 changes: 5 additions & 2 deletions test/testutil.py
@@ -1,10 +1,12 @@
from __future__ import absolute_import

import functools
import operator
import os
import socket
import time
import uuid

import decorator
import pytest
from . import unittest

@@ -45,6 +47,7 @@ def construct_lambda(s):
validators = map(construct_lambda, versions)

def real_kafka_versions(func):
@functools.wraps(func)
def wrapper(func, *args, **kwargs):
version = kafka_version()

@@ -56,7 +59,7 @@ def wrapper(func, *args, **kwargs):
pytest.skip("unsupported kafka version")

return func(*args, **kwargs)
return decorator.decorator(wrapper, func)
return wrapper

return real_kafka_versions

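Not part of the diff — a generic illustration (not the project's actual decorator) of what the switch from `decorator.decorator` to `functools.wraps` in testutil.py preserves: the wrapped test keeps its own `__name__` and docstring, which pytest uses for collection and reporting.

```python
import functools

def noop_decorator(func):
    @functools.wraps(func)            # copies __name__, __doc__, etc. onto wrapper
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@noop_decorator
def test_example():
    """Docstring survives wrapping."""

print(test_example.__name__)  # -> 'test_example', not 'wrapper'
print(test_example.__doc__)   # -> 'Docstring survives wrapping.'
```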