
Commit 59675e0

Decode and skip transactional control records in consumer (#2499)

1 parent a731b18 commit 59675e0

File tree

3 files changed: +81 −4 lines changed

kafka/consumer/fetcher.py

Lines changed: 12 additions & 2 deletions

@@ -456,10 +456,20 @@ def _unpack_message_set(self, tp, records):
             batch = records.next_batch()
             while batch is not None:

-                # LegacyRecordBatch cannot access either base_offset or last_offset_delta
+                # Try DefaultRecordBatch / message log format v2:
+                # base_offset, last_offset_delta, and control batches
                 try:
                     self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
                                                                                         batch.last_offset_delta
+                    # Control batches have a single record indicating whether a transaction
+                    # was aborted or committed.
+                    # When isolation_level is READ_COMMITTED (currently unsupported)
+                    # we should also skip all messages from aborted transactions.
+                    # For now we only support READ_UNCOMMITTED and so we ignore the
+                    # abort/commit signal.
+                    if batch.is_control_batch:
+                        batch = records.next_batch()
+                        continue
                 except AttributeError:
                     pass

@@ -674,7 +684,7 @@ def _create_fetch_requests(self):
                 if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
                     log.debug(
                         "Advance position for partition %s from %s to %s (last message batch location plus one)"
-                        " to correct for deleted compacted messages",
+                        " to correct for deleted compacted messages and/or transactional control records",
                         partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
                     self._subscriptions.assignment[partition].position = next_offset_from_batch_header

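The skip is deliberately nested inside the try/except that probes for v2 batch attributes: legacy (v0/v1) batches raise AttributeError and fall through unchanged. A minimal standalone sketch of the same loop, assuming only a MemoryRecords-style object exposing next_batch() and batches exposing is_control_batch as above; the process_batch callback is hypothetical:

def drain_batches(records, process_batch):
    # Iterate fetched batches, dropping transactional control batches.
    # Under READ_UNCOMMITTED semantics the commit/abort marker itself
    # is never surfaced to the application.
    batch = records.next_batch()
    while batch is not None:
        try:
            if batch.is_control_batch:
                batch = records.next_batch()
                continue
        except AttributeError:
            # Legacy (v0/v1) batches do not expose is_control_batch.
            pass
        process_batch(batch)
        batch = records.next_batch()
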
kafka/record/default_records.py

Lines changed: 45 additions & 2 deletions

@@ -269,8 +269,12 @@ def _read_msg(
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos

-        return DefaultRecord(
-            offset, timestamp, self.timestamp_type, key, value, headers)
+        if self.is_control_batch:
+            return ControlRecord(
+                offset, timestamp, self.timestamp_type, key, value, headers)
+        else:
+            return DefaultRecord(
+                offset, timestamp, self.timestamp_type, key, value, headers)

     def __iter__(self):
         self._maybe_uncompress()

@@ -362,6 +366,45 @@ def __repr__(self):
         )


+class ControlRecord(DefaultRecord):
+    __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
+                 "_headers", "_version", "_type")
+
+    KEY_STRUCT = struct.Struct(
+        ">h"  # Current Version => Int16
+        "h"   # Type => Int16 (0 indicates an abort marker, 1 indicates a commit)
+    )
+
+    def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
+        super(ControlRecord, self).__init__(offset, timestamp, timestamp_type, key, value, headers)
+        (self._version, self._type) = self.KEY_STRUCT.unpack(self._key)
+
+    # see https://kafka.apache.org/documentation/#controlbatch
+    @property
+    def version(self):
+        return self._version
+
+    @property
+    def type(self):
+        return self._type
+
+    @property
+    def abort(self):
+        return self._type == 0
+
+    @property
+    def commit(self):
+        return self._type == 1
+
+    def __repr__(self):
+        return (
+            "ControlRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
+            " version={!r}, type={!r} <{!s}>)".format(
+                self._offset, self._timestamp, self._timestamp_type,
+                self._version, self._type, "abort" if self.abort else "commit")
+        )
+
+
 class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):

     # excluding key, value and headers:

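ControlRecord parses only the 4-byte key: a big-endian Int16 version followed by an Int16 type, per the control-batch section of the Kafka documentation referenced above; the record value stays opaque. A self-contained sketch of that key decoding (describe_control_key is a hypothetical helper, not part of the library):

import struct

# Control record key (message format v2): Version => Int16, Type => Int16.
KEY_STRUCT = struct.Struct(">hh")

def describe_control_key(key_bytes):
    version, marker_type = KEY_STRUCT.unpack(key_bytes[:KEY_STRUCT.size])
    return {"version": version,
            "marker": "abort" if marker_type == 0 else "commit"}

print(describe_control_key(b"\x00\x00\x00\x00"))  # {'version': 0, 'marker': 'abort'}
print(describe_control_key(b"\x00\x00\x00\x01"))  # {'version': 0, 'marker': 'commit'}
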
test/record/test_records.py

Lines changed: 24 additions & 0 deletions

@@ -60,6 +60,15 @@
     b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
 ]

+# Single record control batch (abort)
+control_batch_data_v2 = [
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00R\x00\x00\x00\x00'
+    b'\x02e\x97\xff\xd0\x00\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+    b'\x98\x96\x7f\x00\x00\x00\x00\x00\x98\x96'
+    b'\x7f\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
+    b'\x00\x00\x00\x01@\x00\x00\x00\x08\x00\x00\x00\x00,opaque-control-message\x00'
+]
+

 def test_memory_records_v2():
     data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4

@@ -230,3 +239,18 @@ def test_memory_records_builder_full(magic, compression_type):
         key=None, timestamp=None, value=b"M")
     assert metadata is None
     assert builder.next_offset() == 1
+
+
+def test_control_record_v2():
+    data_bytes = b"".join(control_batch_data_v2)
+    records = MemoryRecords(data_bytes)
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    assert batch.is_control_batch is True
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].version == 0
+    assert recs[0].type == 0
+    assert recs[0].abort is True
+    assert recs[0].commit is False

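The same fixture can be decoded outside the test runner; a short usage sketch, assuming the control_batch_data_v2 bytes defined above (the printed repr is illustrative):

from kafka.record import MemoryRecords

records = MemoryRecords(b"".join(control_batch_data_v2))
batch = records.next_batch()  # the single control batch in the fixture
for record in batch:
    print(record)  # ControlRecord(offset=0, ..., version=0, type=0 <abort>)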