|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +from __future__ import unicode_literals |
1 | 3 | import pytest
|
2 |
| -from kafka.record import MemoryRecords |
| 4 | +from kafka.record import MemoryRecords, MemoryRecordsBuilder |
3 | 5 | from kafka.errors import CorruptRecordException
|
4 | 6 |
|
5 | 7 | # This is real live data from Kafka 11 broker
|
@@ -152,3 +154,68 @@ def test_memory_records_corrupt():
|
152 | 154 | )
|
153 | 155 | with pytest.raises(CorruptRecordException):
|
154 | 156 | records.next_batch()
|
| 157 | + |
| 158 | + |
| 159 | +@pytest.mark.parametrize("compression_type", [0, 1, 2, 3]) |
| 160 | +@pytest.mark.parametrize("magic", [0, 1, 2]) |
| 161 | +def test_memory_records_builder(magic, compression_type): |
| 162 | + builder = MemoryRecordsBuilder( |
| 163 | + magic=magic, compression_type=compression_type, batch_size=1024 * 10) |
| 164 | + base_size = builder.size_in_bytes() # V2 has a header before |
| 165 | + |
| 166 | + msg_sizes = [] |
| 167 | + for offset in range(10): |
| 168 | + metadata = builder.append( |
| 169 | + timestamp=10000 + offset, key=b"test", value=b"Super") |
| 170 | + msg_sizes.append(metadata.size) |
| 171 | + assert metadata.offset == offset |
| 172 | + if magic > 0: |
| 173 | + assert metadata.timestamp == 10000 + offset |
| 174 | + else: |
| 175 | + assert metadata.timestamp == -1 |
| 176 | + assert builder.next_offset() == offset + 1 |
| 177 | + |
| 178 | + # Error appends should not leave junk behind, like null bytes or something |
| 179 | + with pytest.raises(TypeError): |
| 180 | + builder.append( |
| 181 | + timestamp=None, key="test", value="Super") # Not bytes, but str |
| 182 | + |
| 183 | + assert not builder.is_full() |
| 184 | + size_before_close = builder.size_in_bytes() |
| 185 | + assert size_before_close == sum(msg_sizes) + base_size |
| 186 | + |
| 187 | + # Size should remain the same after closing. No traling bytes |
| 188 | + builder.close() |
| 189 | + assert builder.compression_rate() > 0 |
| 190 | + expected_size = size_before_close * builder.compression_rate() |
| 191 | + assert builder.is_full() |
| 192 | + assert builder.size_in_bytes() == expected_size |
| 193 | + buffer = builder.buffer() |
| 194 | + assert len(buffer) == expected_size |
| 195 | + |
| 196 | + # We can close second time, as in retry |
| 197 | + builder.close() |
| 198 | + assert builder.size_in_bytes() == expected_size |
| 199 | + assert builder.buffer() == buffer |
| 200 | + |
| 201 | + # Can't append after close |
| 202 | + meta = builder.append(timestamp=None, key=b"test", value=b"Super") |
| 203 | + assert meta is None |
| 204 | + |
| 205 | + |
| 206 | +@pytest.mark.parametrize("compression_type", [0, 1, 2, 3]) |
| 207 | +@pytest.mark.parametrize("magic", [0, 1, 2]) |
| 208 | +def test_memory_records_builder_full(magic, compression_type): |
| 209 | + builder = MemoryRecordsBuilder( |
| 210 | + magic=magic, compression_type=compression_type, batch_size=1024 * 10) |
| 211 | + |
| 212 | + # 1 message should always be appended |
| 213 | + metadata = builder.append( |
| 214 | + key=None, timestamp=None, value=b"M" * 10240) |
| 215 | + assert metadata is not None |
| 216 | + assert builder.is_full() |
| 217 | + |
| 218 | + metadata = builder.append( |
| 219 | + key=None, timestamp=None, value=b"M") |
| 220 | + assert metadata is None |
| 221 | + assert builder.next_offset() == 1 |
0 commit comments