Skip to content

Adding codec abstraction for compatibility #179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from kafka.client import KafkaClient
from kafka.conn import KafkaConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message
create_message, create_encoded_message
)
from kafka.producer import SimpleProducer, KeyedProducer
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
Expand All @@ -16,6 +16,5 @@
__all__ = [
'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
'MultiProcessConsumer', 'create_message', 'create_gzip_message',
'create_snappy_message'
'MultiProcessConsumer', 'create_message', 'create_encoded_message'
]
21 changes: 21 additions & 0 deletions kafka/codec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from cStringIO import StringIO
import gzip
import struct
from functools import partial

_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
Expand Down Expand Up @@ -138,3 +139,23 @@ def snappy_decode(payload):
return out.read()
else:
return snappy.decompress(payload)


class Codec(object):
    """
    Pairs a compression codec's attribute value with its encode/decode callables.

    Params
    ======
    mask: int, the codec value compared against a message's attribute bits
    encoder: callable taking a payload and returning the encoded payload
             (defaults to returning the payload unchanged)
    decoder: callable taking a payload and returning the decoded payload
             (defaults to returning the payload unchanged)
    """
    def __init__(self, mask, encoder=lambda m: m, decoder=lambda m: m):
        self.mask = mask
        self.encoder = encoder
        self.decoder = decoder

    def encode(self, payload):
        """Return payload passed through this codec's encoder."""
        return self.encoder(payload)

    def decode(self, payload):
        """Return payload passed through this codec's decoder."""
        return self.decoder(payload)

    def __index__(self):
        # Lets a Codec stand in where an integer is required, so messages
        # such as "Codec 0x%02x unsupported" % codec format the mask instead
        # of raising TypeError.
        return self.mask

    # Python 2 string formatting converts via __int__ rather than __index__.
    __int__ = __index__

    def __repr__(self):
        return "%s(mask=%r)" % (type(self).__name__, self.mask)

# No compression: relies on Codec's default encoder/decoder, which return the
# payload unchanged.
CODEC_NONE = Codec(0x00)
# Gzip compression, mask 0x01.
CODEC_GZIP = Codec(0x01, gzip_encode, gzip_decode)
# Snappy compression, mask 0x02.
CODEC_SNAPPY = Codec(0x02, snappy_encode, snappy_decode)
# Shares mask 0x02 and the decoder with CODEC_SNAPPY; only the encoder differs,
# calling snappy_encode with xerial_compatible=True.
CODEC_SNAPPY_XERIAL = Codec(0x02, partial(snappy_encode, xerial_compatible=True), snappy_decode)

# Every codec defined above. CODEC_SNAPPY precedes CODEC_SNAPPY_XERIAL, so a
# first-match lookup by mask 0x02 yields CODEC_SNAPPY.
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_SNAPPY_XERIAL)
81 changes: 20 additions & 61 deletions kafka/protocol.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
import struct
import zlib
from functools import partial

from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
ALL_CODECS, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_SNAPPY_XERIAL
)
from kafka.common import (
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
Expand All @@ -20,11 +21,6 @@
log = logging.getLogger("kafka")

ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)


class KafkaProtocol(object):
"""
Expand Down Expand Up @@ -154,18 +150,17 @@ def _decode_message(cls, data, offset):

codec = att & ATTRIBUTE_CODEC_MASK

if codec == CODEC_NONE:
if codec == CODEC_NONE.mask:
yield (offset, Message(magic, att, key, value))

elif codec == CODEC_GZIP:
gz = gzip_decode(value)
for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz):
yield (offset, msg)

elif codec == CODEC_SNAPPY:
snp = snappy_decode(value)
for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
yield (offset, msg)
else:
decoders = filter(lambda c: c.mask == codec, ALL_CODECS)
if decoders:
message_set = decoders[0].decode(value)
for (offset, msg) in KafkaProtocol._decode_message_set_iter(message_set):
yield (offset, msg)
else:
raise UnsupportedCodecError('FILL ME IN')


##################
# Public API #
Expand Down Expand Up @@ -528,47 +523,13 @@ def create_message(payload, key=None):
"""
return Message(0, 0, key, payload)


def create_gzip_message(payloads, key=None):
"""
Construct a Gzipped Message containing multiple Messages

The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.

Params
======
payloads: list(bytes), a list of payloads to be sent to Kafka
key: bytes, a key used for partition routing (optional)

Returns a single Message whose value is the gzipped message set and whose
attributes carry the gzip codec bits.
"""
# Wrap each payload in a plain Message and encode them as one message set
message_set = KafkaProtocol._encode_message_set(
[create_message(payload) for payload in payloads])

gzipped = gzip_encode(message_set)
# Keep only the codec bits of the attributes value
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP

return Message(0, 0x00 | codec, key, gzipped)


def create_snappy_message(payloads, key=None):
"""
Construct a Snappy Message containing multiple Messages

The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.

Params
======
payloads: list(bytes), a list of payloads to be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
def create_encoded_message(messages, codec=CODEC_NONE):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lost the ability to specify a key here?

message_set = KafkaProtocol._encode_message_set(
[create_message(payload) for payload in payloads])

snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY

return Message(0, 0x00 | codec, key, snapped)
[create_message(m) for m in messages])
encoded = codec.encode(message_set)
codec_flag = ATTRIBUTE_CODEC_MASK & codec.mask

return Message(0, 0x00 | codec_flag, None, encoded)


def create_message_set(messages, codec=CODEC_NONE):
Expand All @@ -579,9 +540,7 @@ def create_message_set(messages, codec=CODEC_NONE):
"""
if codec == CODEC_NONE:
return [create_message(m) for m in messages]
elif codec == CODEC_GZIP:
return [create_gzip_message(messages)]
elif codec == CODEC_SNAPPY:
return [create_snappy_message(messages)]
elif codec in ALL_CODECS:
return [create_encoded_message(messages, codec)]
else:
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
2 changes: 1 addition & 1 deletion test/test_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
snappy_encode, snappy_decode
)
from kafka.protocol import (
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
create_message, create_encoded_message, KafkaProtocol
)
from testutil import *

Expand Down
1 change: 1 addition & 0 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime

from kafka import * # noqa
from kafka.protocol import CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY
from kafka.common import * # noqa
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from fixtures import ZookeeperFixture, KafkaFixture
Expand Down
41 changes: 28 additions & 13 deletions test/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import kafka.protocol
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
create_message, create_gzip_message, create_snappy_message,
create_message_set
create_message, create_encoded_message, create_message_set
)

class TestProtocol(unittest2.TestCase):
Expand All @@ -39,9 +38,9 @@ def test_create_message(self):

def test_create_gzip(self):
payloads = ["v1", "v2"]
msg = create_gzip_message(payloads)
msg = create_encoded_message(payloads, CODEC_GZIP)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP.mask)
self.assertEqual(msg.key, None)
# Need to decode to check since gzipped payload is non-deterministic
decoded = gzip_decode(msg.value)
Expand All @@ -68,9 +67,9 @@ def test_create_gzip(self):
@unittest2.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
payloads = ["v1", "v2"]
msg = create_snappy_message(payloads)
msg = create_encoded_message(payloads, CODEC_SNAPPY)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY.mask)
self.assertEqual(msg.key, None)
decoded = snappy_decode(msg.value)
expect = "".join([
Expand Down Expand Up @@ -703,11 +702,27 @@ def test_decode_offset_fetch_response(self):
def mock_create_message_fns(self):
patches = contextlib.nested(
mock.patch.object(kafka.protocol, "create_message",
return_value=sentinel.message),
mock.patch.object(kafka.protocol, "create_gzip_message",
return_value=sentinel.gzip_message),
mock.patch.object(kafka.protocol, "create_snappy_message",
return_value=sentinel.snappy_message),
return_value=sentinel.message)
)

with patches:
yield

@contextmanager
def mock_create_gzip_message_fns(self):
    """
    Patch kafka.protocol.create_encoded_message so that, for the duration of
    the with-block, it returns sentinel.gzip_message.
    """
    # contextlib.nested is deprecated and does not exist on Python 3; with a
    # single patch it adds nothing, so enter the patch directly.
    with mock.patch('kafka.protocol.create_encoded_message',
                    return_value=sentinel.gzip_message):
        yield

@contextmanager
def mock_create_snappy_message_fns(self):
patches = contextlib.nested(
mock.patch('kafka.protocol.create_encoded_message',
return_value=sentinel.snappy_message)
)

with patches:
Expand All @@ -730,13 +745,13 @@ def test_create_message_set(self):

# CODEC_GZIP: Expect list of one gzip-encoded message.
expect = [sentinel.gzip_message]
with self.mock_create_message_fns():
with self.mock_create_gzip_message_fns():
message_set = create_message_set(messages, CODEC_GZIP)
self.assertEqual(message_set, expect)

# CODEC_SNAPPY: Expect list of one snappy-encoded message.
expect = [sentinel.snappy_message]
with self.mock_create_message_fns():
with self.mock_create_snappy_message_fns():
message_set = create_message_set(messages, CODEC_SNAPPY)
self.assertEqual(message_set, expect)

Expand Down