
Commit 7a7b4b9

tvoinarovskyi authored and 88manpreet committed

Fix MemoryRecord bugs re error handling and add test coverage (dpkp#1448)
1 parent ff21b03 · commit 7a7b4b9

File tree

4 files changed: +75 -8 lines changed

kafka/record/__init__.py
Lines changed: 2 additions & 2 deletions

@@ -1,3 +1,3 @@
-from kafka.record.memory_records import MemoryRecords
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
 
-__all__ = ["MemoryRecords"]
+__all__ = ["MemoryRecords", "MemoryRecordsBuilder"]
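
With MemoryRecordsBuilder now exported alongside MemoryRecords, the two can round-trip a batch. A minimal sketch, not code from this commit: the reader side (has_next, iterable batches whose records carry .offset/.value) is assumed from the batch classes exercised in the tests below.

    from kafka.record import MemoryRecords, MemoryRecordsBuilder

    # Build and close a small uncompressed v1 batch
    builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=1024)
    builder.append(timestamp=10000, key=b"key", value=b"value")
    builder.close()

    # Parse the serialized buffer back with MemoryRecords
    records = MemoryRecords(bytes(builder.buffer()))
    while records.has_next():
        for record in records.next_batch():
            print(record.offset, record.value)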

kafka/record/default_records.py
Lines changed: 1 addition & 1 deletion

@@ -237,7 +237,7 @@ def _read_msg(
 
         # validate whether we have read all header bytes in the current record
         if pos - start_pos != length:
-            CorruptRecordException(
+            raise CorruptRecordException(
                 "Invalid record size: expected to read {} bytes in record "
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos
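
This one-line fix addresses a classic Python pitfall: instantiating an exception without raise constructs the object and discards it, so the size check above never actually surfaced an error. A standalone illustration of the bug class (plain Python, not kafka-python code):

    def check_size_buggy(read, expected):
        if read != expected:
            ValueError("size mismatch")  # object created, then silently discarded
        return read

    def check_size_fixed(read, expected):
        if read != expected:
            raise ValueError("size mismatch")  # error actually propagates
        return read

    check_size_buggy(3, 5)    # returns 3; the mismatch goes unnoticed
    # check_size_fixed(3, 5)  # would raise ValueError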

kafka/record/memory_records.py
Lines changed: 4 additions & 4 deletions

@@ -18,6 +18,7 @@
 #
 # So we can iterate over batches just by knowing offsets of Length. Magic is
 # used to construct the correct class for Batch itself.
+from __future__ import division
 
 import struct
 

@@ -131,15 +132,14 @@ def __init__(self, magic, compression_type, batch_size):
     def append(self, timestamp, key, value, headers=[]):
         """ Append a message to the buffer.
 
-        Returns:
-            (int, int): checksum and bytes written
+        Returns: RecordMetadata or None if unable to append
         """
         if self._closed:
-            return None, 0
+            return None
 
         offset = self._next_offset
         metadata = self._builder.append(offset, timestamp, key, value, headers)
-        # Return of 0 size means there's no space to add a new message
+        # Return of None means there's no space to add a new message
         if metadata is None:
             return None
 
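
Under the new contract, callers test append()'s return value for None rather than unpacking a (checksum, size) tuple. A hedged sketch of the calling pattern; the flush comment is illustrative, not kafka-python's producer logic:

    from kafka.record import MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=1024)
    meta = builder.append(timestamp=10000, key=b"k", value=b"v")
    if meta is None:
        pass  # builder closed or batch full: flush it and start a new builder
    else:
        print(meta.offset, meta.size, meta.timestamp)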

test/record/test_records.py
Lines changed: 68 additions & 1 deletion

@@ -1,5 +1,7 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
 import pytest
-from kafka.record import MemoryRecords
+from kafka.record import MemoryRecords, MemoryRecordsBuilder
 from kafka.errors import CorruptRecordException
 
 # This is real live data from Kafka 11 broker

@@ -152,3 +154,68 @@ def test_memory_records_corrupt():
     )
     with pytest.raises(CorruptRecordException):
         records.next_batch()
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder(magic, compression_type):
+    builder = MemoryRecordsBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+    base_size = builder.size_in_bytes()  # V2 has a header before
+
+    msg_sizes = []
+    for offset in range(10):
+        metadata = builder.append(
+            timestamp=10000 + offset, key=b"test", value=b"Super")
+        msg_sizes.append(metadata.size)
+        assert metadata.offset == offset
+        if magic > 0:
+            assert metadata.timestamp == 10000 + offset
+        else:
+            assert metadata.timestamp == -1
+        assert builder.next_offset() == offset + 1
+
+    # Errored appends should not leave junk behind, like null bytes
+    with pytest.raises(TypeError):
+        builder.append(
+            timestamp=None, key="test", value="Super")  # Not bytes, but str
+
+    assert not builder.is_full()
+    size_before_close = builder.size_in_bytes()
+    assert size_before_close == sum(msg_sizes) + base_size
+
+    # Size should remain the same after closing. No trailing bytes
+    builder.close()
+    assert builder.compression_rate() > 0
+    expected_size = size_before_close * builder.compression_rate()
+    assert builder.is_full()
+    assert builder.size_in_bytes() == expected_size
+    buffer = builder.buffer()
+    assert len(buffer) == expected_size
+
+    # We can close a second time, as in a retry
+    builder.close()
+    assert builder.size_in_bytes() == expected_size
+    assert builder.buffer() == buffer
+
+    # Can't append after close
+    meta = builder.append(timestamp=None, key=b"test", value=b"Super")
+    assert meta is None
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder_full(magic, compression_type):
+    builder = MemoryRecordsBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+
+    # 1 message should always be appended
+    metadata = builder.append(
+        key=None, timestamp=None, value=b"M" * 10240)
+    assert metadata is not None
+    assert builder.is_full()
+
+    metadata = builder.append(
+        key=None, timestamp=None, value=b"M")
+    assert metadata is None
+    assert builder.next_offset() == 1
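
To run just the new builder tests locally, something like the following should work (pytest.main with the test path taken from the diff; the -k expression matches both new tests):

    import pytest

    pytest.main(["test/record/test_records.py", "-k", "memory_records_builder"])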
