Commit 61d7be5

OffsetAndTimestamp; FetchRequest v10
1 parent feecb27 commit 61d7be5

File tree

7 files changed: +108 −65 lines changed

kafka/consumer/fetcher.py

Lines changed: 60 additions & 45 deletions
@@ -18,7 +18,7 @@
 )
 from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition, OffsetAndTimestamp
+from kafka.structs import TopicPartition, OffsetAndMetadata, OffsetAndTimestamp
 
 log = logging.getLogger(__name__)
 
@@ -28,7 +28,7 @@
 READ_COMMITTED = 1
 
 ConsumerRecord = collections.namedtuple("ConsumerRecord",
-    ["topic", "partition", "offset", "timestamp", "timestamp_type",
+    ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type",
      "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
 
 
@@ -198,9 +198,6 @@ def get_offsets_by_times(self, timestamps, timeout_ms):
         for tp in timestamps:
             if tp not in offsets:
                 offsets[tp] = None
-            else:
-                offset, timestamp = offsets[tp]
-                offsets[tp] = OffsetAndTimestamp(offset, timestamp)
         return offsets
 
     def beginning_offsets(self, partitions, timeout_ms):
@@ -215,7 +212,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
         timestamps = dict([(tp, timestamp) for tp in partitions])
         offsets = self._retrieve_offsets(timestamps, timeout_ms)
         for tp in timestamps:
-            offsets[tp] = offsets[tp][0]
+            offsets[tp] = offsets[tp].offset
         return offsets
 
     def _reset_offset(self, partition):
@@ -240,7 +237,7 @@ def _reset_offset(self, partition):
         offsets = self._retrieve_offsets({partition: timestamp})
 
         if partition in offsets:
-            offset = offsets[partition][0]
+            offset = offsets[partition].offset
 
             # we might lose the assignment while fetching the offset,
             # so check it is still active
@@ -261,8 +258,8 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
             available. Otherwise timestamp is treated as epoch milliseconds.
 
         Returns:
-            {TopicPartition: (int, int)}: Mapping of partition to
-                retrieved offset and timestamp. If offset does not exist for
+            {TopicPartition: OffsetAndTimestamp}: Mapping of partition to
+                retrieved offset, timestamp, and leader_epoch. If offset does not exist for
                 the provided timestamp, that partition will be missing from
                 this mapping.
         """
@@ -373,28 +370,29 @@ def _append(self, drained, part, max_records, update_offsets):
                 log.debug("Not returning fetched records for assigned partition"
                           " %s since it is no longer fetchable", tp)
 
-            elif fetch_offset == position:
+            elif fetch_offset == position.offset:
                 # we are ensured to have at least one record since we already checked for emptiness
                 part_records = part.take(max_records)
                 next_offset = part_records[-1].offset + 1
+                leader_epoch = part_records[-1].leader_epoch
 
                 log.log(0, "Returning fetched records at offset %d for assigned"
-                           " partition %s and update position to %s", position,
-                           tp, next_offset)
+                           " partition %s and update position to %s (leader epoch %s)", position.offset,
+                           tp, next_offset, leader_epoch)
 
                 for record in part_records:
                     drained[tp].append(record)
 
                 if update_offsets:
-                    self._subscriptions.assignment[tp].position = next_offset
+                    self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, b'', leader_epoch)
                 return len(part_records)
 
             else:
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
                 log.debug("Ignoring fetched records for %s at offset %s since"
                           " the current position is %d", tp, part.fetch_offset,
-                          position)
+                          position.offset)
 
         part.discard()
         return 0
@@ -444,13 +442,13 @@ def _message_generator(self):
                     break
 
                 # Compressed messagesets may include earlier messages
-                elif msg.offset < self._subscriptions.assignment[tp].position:
+                elif msg.offset < self._subscriptions.assignment[tp].position.offset:
                     log.debug("Skipping message offset: %s (expecting %s)",
                               msg.offset,
-                              self._subscriptions.assignment[tp].position)
+                              self._subscriptions.assignment[tp].position.offset)
                     continue
 
-                self._subscriptions.assignment[tp].position = msg.offset + 1
+                self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, b'', -1)
                 yield msg
 
             self._next_partition_records = None
@@ -463,8 +461,9 @@ def _unpack_records(self, tp, records):
             # Try DefaultsRecordBatch / message log format v2
             # base_offset, last_offset_delta, and control batches
             try:
-                self._subscriptions.assignment[tp].last_offset_from_record_batch = batch.base_offset + \
-                    batch.last_offset_delta
+                batch_offset = batch.base_offset + batch.last_offset_delta
+                leader_epoch = batch.leader_epoch
+                self._subscriptions.assignment[tp].last_offset_from_record_batch = batch_offset
                 # Control batches have a single record indicating whether a transaction
                 # was aborted or committed.
                 # When isolation_level is READ_COMMITTED (currently unsupported)
@@ -475,6 +474,7 @@ def _unpack_records(self, tp, records):
                     batch = records.next_batch()
                     continue
             except AttributeError:
+                leader_epoch = -1
                 pass
 
             for record in batch:
@@ -491,7 +491,7 @@ def _unpack_records(self, tp, records):
                     len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in
                     headers) if headers else -1
                 yield ConsumerRecord(
-                    tp.topic, tp.partition, record.offset, record.timestamp,
+                    tp.topic, tp.partition, leader_epoch, record.offset, record.timestamp,
                     record.timestamp_type, key, value, headers, record.checksum,
                     key_size, value_size, header_size)
 
@@ -577,7 +577,9 @@ def _send_list_offsets_request(self, node_id, timestamps):
         version = self._client.api_version(ListOffsetsRequest, max_version=3)
         by_topic = collections.defaultdict(list)
         for tp, timestamp in six.iteritems(timestamps):
-            if version >= 1:
+            if version >= 4:
+                data = (tp.partition, leader_epoch, timestamp)
+            elif version >= 1:
                 data = (tp.partition, timestamp)
             else:
                 data = (tp.partition, timestamp, 1)
@@ -630,17 +632,18 @@ def _handle_list_offsets_response(self, future, response):
                             offset = UNKNOWN_OFFSET
                         else:
                             offset = offsets[0]
-                        log.debug("Handling v0 ListOffsetsResponse response for %s. "
-                                  "Fetched offset %s", partition, offset)
-                        if offset != UNKNOWN_OFFSET:
-                            timestamp_offset_map[partition] = (offset, None)
-                    else:
+                        timestamp = None
+                        leader_epoch = -1
+                    elif response.API_VERSION <= 3:
                         timestamp, offset = partition_info[2:]
-                        log.debug("Handling ListOffsetsResponse response for %s. "
-                                  "Fetched offset %s, timestamp %s",
-                                  partition, offset, timestamp)
-                        if offset != UNKNOWN_OFFSET:
-                            timestamp_offset_map[partition] = (offset, timestamp)
+                        leader_epoch = -1
+                    else:
+                        timestamp, offset, leader_epoch = partition_info[2:]
+                    log.debug("Handling ListOffsetsResponse response for %s. "
+                              "Fetched offset %s, timestamp %s, leader_epoch %s",
+                              partition, offset, timestamp, leader_epoch)
+                    if offset != UNKNOWN_OFFSET:
+                        timestamp_offset_map[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch)
                 elif error_type is Errors.UnsupportedForMessageFormatError:
                     # The message format on the broker side is before 0.10.0,
                     # we simply put None in the response.
@@ -688,7 +691,7 @@ def _create_fetch_requests(self):
         """
         # create the fetch info as a dict of lists of partition info tuples
         # which can be passed to FetchRequest() via .items()
-        version = self._client.api_version(FetchRequest, max_version=7)
+        version = self._client.api_version(FetchRequest, max_version=10)
         fetchable = collections.defaultdict(dict)
 
         for partition in self._fetchable_partitions():
@@ -697,12 +700,12 @@ def _create_fetch_requests(self):
             # advance position for any deleted compacted messages if required
             if self._subscriptions.assignment[partition].last_offset_from_record_batch:
                 next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1
-                if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
+                if next_offset_from_batch_header > self._subscriptions.assignment[partition].position.offset:
                     log.debug(
                         "Advance position for partition %s from %s to %s (last record batch location plus one)"
                         " to correct for deleted compacted messages and/or transactional control records",
-                        partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
-                    self._subscriptions.assignment[partition].position = next_offset_from_batch_header
+                        partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header)
+                    self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, b'', -1)
 
             position = self._subscriptions.assignment[partition].position
 
@@ -720,19 +723,28 @@ def _create_fetch_requests(self):
             if version < 5:
                 partition_info = (
                     partition.partition,
-                    position,
+                    position.offset,
                     self.config['max_partition_fetch_bytes']
                 )
+            elif version <= 8:
+                partition_info = (
+                    partition.partition,
+                    position.offset,
+                    -1, # log_start_offset is used internally by brokers / replicas only
+                    self.config['max_partition_fetch_bytes'],
+                )
             else:
                 partition_info = (
                     partition.partition,
-                    position,
+                    position.leader_epoch,
+                    position.offset,
                     -1, # log_start_offset is used internally by brokers / replicas only
                     self.config['max_partition_fetch_bytes'],
                 )
+
             fetchable[node_id][partition] = partition_info
             log.debug("Adding fetch request for partition %s at offset %d",
-                      partition, position)
+                      partition, position.offset)
 
         requests = {}
         for node_id, next_partitions in six.iteritems(fetchable):
@@ -780,7 +792,10 @@ def _create_fetch_requests(self):
 
             fetch_offsets = {}
             for tp, partition_data in six.iteritems(next_partitions):
-                offset = partition_data[1]
+                if version <= 8:
+                    offset = partition_data[1]
+                else:
+                    offset = partition_data[2]
                 fetch_offsets[tp] = offset
 
             requests[node_id] = (request, fetch_offsets)
@@ -809,7 +824,7 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
                 tp = TopicPartition(topic, partition_data[0])
                 fetch_offset = fetch_offsets[tp]
                 completed_fetch = CompletedFetch(
-                    tp, fetch_offsets[tp],
+                    tp, fetch_offset,
                     response.API_VERSION,
                     partition_data[1:],
                     metric_aggregator
@@ -851,18 +866,18 @@ def _parse_fetched_data(self, completed_fetch):
                 # Note that the *response* may return a messageset that starts
                 # earlier (e.g., compressed messages) or later (e.g., compacted topic)
                 position = self._subscriptions.assignment[tp].position
-                if position is None or position != fetch_offset:
+                if position is None or position.offset != fetch_offset:
                     log.debug("Discarding fetch response for partition %s"
                               " since its offset %d does not match the"
                               " expected offset %d", tp, fetch_offset,
-                              position)
+                              position.offset)
                     return None
 
                 records = MemoryRecords(completed_fetch.partition_data[-1])
                 if records.has_next():
                     log.debug("Adding fetched record for partition %s with"
                               " offset %d to buffered record list", tp,
-                              position)
+                              position.offset)
                     unpacked = list(self._unpack_records(tp, records))
                     parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
                     if unpacked:
@@ -893,10 +908,10 @@ def _parse_fetched_data(self, completed_fetch):
                 self._client.cluster.request_update()
             elif error_type is Errors.OffsetOutOfRangeError:
                 position = self._subscriptions.assignment[tp].position
-                if position is None or position != fetch_offset:
+                if position is None or position.offset != fetch_offset:
                     log.debug("Discarding stale fetch response for partition %s"
                               " since the fetched offset %d does not match the"
-                              " current offset %d", tp, fetch_offset, position)
+                              " current offset %d", tp, fetch_offset, position.offset)
                 elif self._subscriptions.has_default_offset_reset_policy():
                     log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp)
                     self._subscriptions.need_offset_reset(tp)
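
The version gating in _create_fetch_requests is the core of the FetchRequest v10 support: v5 introduced log_start_offset, and v9 prepended current_leader_epoch to each partition entry. A minimal standalone sketch of the three tuple layouts, with the Fetcher's position and config stubbed as plain values (partition_info_for and its arguments are illustrative, not kafka-python API):

from collections import namedtuple

OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata", "leader_epoch"])

def partition_info_for(version, partition_id, position, max_bytes):
    """Build the per-partition fetch tuple for a given FetchRequest version."""
    if version < 5:
        # (partition, fetch_offset, max_bytes)
        return (partition_id, position.offset, max_bytes)
    elif version <= 8:
        # v5+ adds log_start_offset; only brokers/replicas use it, so clients send -1
        return (partition_id, position.offset, -1, max_bytes)
    else:
        # v9+ also sends current_leader_epoch, ahead of the fetch offset,
        # so the broker can fence requests built from stale metadata
        return (partition_id, position.leader_epoch, position.offset, -1, max_bytes)

position = OffsetAndMetadata(offset=100, metadata=b'', leader_epoch=5)
print(partition_info_for(7, 0, position, 1048576))   # (0, 100, -1, 1048576)
print(partition_info_for(10, 0, position, 1048576))  # (0, 5, 100, -1, 1048576)

This is also why the fetch_offsets bookkeeping later in the diff switches from partition_data[1] to partition_data[2] once version > 8: the epoch shifts the offset one slot to the right.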

kafka/consumer/group.py

Lines changed: 5 additions & 5 deletions
@@ -17,7 +17,7 @@
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.list_offsets import OffsetResetStrategy
-from kafka.structs import TopicPartition
+from kafka.structs import OffsetAndMetadata, TopicPartition
 from kafka.version import __version__
 
 log = logging.getLogger(__name__)
@@ -737,11 +737,11 @@ def position(self, partition):
         if not isinstance(partition, TopicPartition):
             raise TypeError('partition must be a TopicPartition namedtuple')
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
-        offset = self._subscription.assignment[partition].position
+        position = self._subscription.assignment[partition].position
         if offset is None:
             self._update_fetch_positions([partition])
-            offset = self._subscription.assignment[partition].position
-        return offset
+            position = self._subscription.assignment[partition].position
+        return position.offset
 
     def highwater(self, partition):
         """Last known highwater offset for a partition.
@@ -1144,7 +1144,7 @@ def _message_generator_v2(self):
                 log.debug("Not returning fetched records for partition %s"
                           " since it is no longer fetchable", tp)
                 break
-            self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, b'', -1)
+            self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, b'', -1)
             yield record
 
     def _message_generator(self):
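
Note that the public KafkaConsumer.position() contract is unchanged: callers still get a plain integer, and the OffsetAndMetadata wrapper stays internal. A tiny sketch of the unwrapping, using the namedtuple shape from kafka/structs.py (the stub value is illustrative):

from collections import namedtuple

OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata", "leader_epoch"])

# what the subscription state now stores per assigned partition
internal_position = OffsetAndMetadata(offset=17, metadata=b'', leader_epoch=3)

# what position() hands back to the caller
print(internal_position.offset)  # 17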

kafka/consumer/subscription_state.py

Lines changed: 4 additions & 3 deletions
@@ -319,7 +319,7 @@ def all_consumed_offsets(self):
         all_consumed = {}
         for partition, state in six.iteritems(self.assignment):
             if state.has_valid_position:
-                all_consumed[partition] = OffsetAndMetadata(state.position, b'', -1)
+                all_consumed[partition] = state.position
         return all_consumed
 
     def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -379,7 +379,7 @@ def __init__(self):
         self.paused = False  # whether this partition has been paused by the user
         self.awaiting_reset = False  # whether we are awaiting reset
         self.reset_strategy = None  # the reset strategy if awaitingReset is set
-        self._position = None  # offset exposed to the user
+        self._position = None  # OffsetAndMetadata exposed to the user
         self.highwater = None
         self.drop_pending_record_batch = False
         # The last message offset hint available from a record batch with
@@ -388,6 +388,7 @@ def __init__(self):
 
     def _set_position(self, offset):
         assert self.has_valid_position, 'Valid position required'
+        assert isinstance(offset, OffsetAndMetadata)
         self._position = offset
 
     def _get_position(self):
@@ -403,7 +404,7 @@ def await_reset(self, strategy):
         self.has_valid_position = False
 
     def seek(self, offset):
-        self._position = offset
+        self._position = OffsetAndMetadata(offset, b'', -1)
         self.awaiting_reset = False
         self.reset_strategy = None
         self.has_valid_position = True
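
With seek() normalizing to OffsetAndMetadata and _set_position() asserting the type, every position write now goes through one shape. A self-contained sketch of that invariant (a trimmed TopicPartitionState, for illustration only):

from collections import namedtuple

OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata", "leader_epoch"])

class TopicPartitionState(object):
    """Trimmed-down sketch of the position invariant."""
    def __init__(self):
        self._position = None  # OffsetAndMetadata exposed to the user
        self.has_valid_position = False

    def _set_position(self, offset):
        assert self.has_valid_position, 'Valid position required'
        assert isinstance(offset, OffsetAndMetadata)
        self._position = offset

    def seek(self, offset):
        # callers pass a plain int; it is wrapped with empty metadata
        # and an unknown (-1) leader epoch
        self._position = OffsetAndMetadata(offset, b'', -1)
        self.has_valid_position = True

state = TopicPartitionState()
state.seek(123)
state._set_position(OffsetAndMetadata(124, b'', -1))  # a bare int would now fail the assert
print(state._position)  # OffsetAndMetadata(offset=124, metadata=b'', leader_epoch=-1)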

kafka/record/default_records.py

Lines changed: 4 additions & 0 deletions
@@ -136,6 +136,10 @@ def __init__(self, buffer):
     def base_offset(self):
         return self._header_data[0]
 
+    @property
+    def leader_epoch(self):
+        return self._header_data[2]
+
     @property
     def magic(self):
         return self._header_data[3]
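
The new property simply reads the already-parsed v2 batch header, where partition_leader_epoch sits at index 2, between the batch length and the magic byte (per the Kafka v2 message format). Illustrative header values:

# v2 DefaultRecordBatch header prefix:
#   [0] base_offset, [1] batch_length, [2] partition_leader_epoch, [3] magic, ...
header_data = (1000, 61, 5, 2)  # made-up values for illustration
print(header_data[0])  # base_offset  -> 1000
print(header_data[2])  # leader_epoch -> 5
print(header_data[3])  # magic        -> 2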

kafka/structs.py

Lines changed: 2 additions & 1 deletion
@@ -66,9 +66,10 @@
 Keyword Arguments:
     offset (int): An offset
     timestamp (int): The timestamp associated to the offset
+    leader_epoch (int): The last known epoch from the leader / broker
 """
 OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
-    ["offset", "timestamp"])
+    ["offset", "timestamp", "leader_epoch"])
 
 MemberInformation = namedtuple("MemberInformation",
     ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])

test/test_consumer_integration.py

Lines changed: 2 additions & 2 deletions
@@ -259,8 +259,8 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro
     })
 
     assert offsets == {
-        tp0: OffsetAndTimestamp(p0msg.offset, send_time),
-        tp1: OffsetAndTimestamp(p1msg.offset, send_time)
+        tp0: OffsetAndTimestamp(p0msg.offset, send_time, -1),
+        tp1: OffsetAndTimestamp(p1msg.offset, send_time, -1)
     }
 
     offsets = consumer.beginning_offsets([tp0, tp1])
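
The test update follows directly from the widened tuple: namedtuple equality compares values positionally, so a two-field expectation can never equal a three-field result. A quick demonstration (class names are local to this snippet):

from collections import namedtuple

OldOT = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"])
NewOT = namedtuple("OffsetAndTimestamp", ["offset", "timestamp", "leader_epoch"])

print(OldOT(10, 1000) == NewOT(10, 1000, -1))      # False: different arity
print(NewOT(10, 1000, -1) == NewOT(10, 1000, -1))  # True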
