Skip to content

Commit 98f5f09

Browse files
committed
more fixes for FetchRequestData
1 parent ba5154f commit 98f5f09

File tree

1 file changed

+11
-12
lines changed

1 file changed

+11
-12
lines changed

kafka/consumer/fetcher.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -734,16 +734,16 @@ def _create_fetch_requests(self):
734734

735735
requests = {}
736736
for node_id, partition_data in six.iteritems(fetchable):
737+
next_partitions = {TopicPartition(topic, partition_info[0]): partition_info
738+
for topic, partitions in six.iteritems(partition_data)
739+
for partition_info in partitions}
737740
if version >= 7 and self.config['enable_incremental_fetch_sessions']:
738741
if node_id not in self._session_handlers:
739742
self._session_handlers[node_id] = FetchSessionHandler(node_id)
740-
next_partitions = {TopicPartition(topic, partition_info[0]): partition_info
741-
for topic, partitions in six.iteritems(partition_data)
742-
for partition_info in partitions}
743743
session = self._session_handlers[node_id].build_next(next_partitions)
744744
else:
745745
# No incremental fetch support
746-
session = FetchRequestData(partition_data, [], FetchMetadata.LEGACY)
746+
session = FetchRequestData(next_partitions, None, FetchMetadata.LEGACY)
747747
# As of version == 3 partitions will be returned in order as
748748
# they are requested, so to avoid starvation with
749749
# `fetch_max_bytes` option we need this shuffle
@@ -789,10 +789,9 @@ def _create_fetch_requests(self):
789789
session.to_forget)
790790

791791
fetch_offsets = {}
792-
for topic, partitions in six.iteritems(partition_data):
793-
for partition_data in partitions:
794-
partition, offset = partition_data[:2]
795-
fetch_offsets[TopicPartition(topic, partition)] = offset
792+
for tp, partition_data in six.iteritems(next_partitions):
793+
offset = partition_data[1]
794+
fetch_offsets[tp] = offset
796795

797796
requests[node_id] = (request, fetch_offsets)
798797

@@ -997,7 +996,7 @@ def build_next(self, next_partitions):
997996
log.debug("Built full fetch %s for node %s with %s partition(s).",
998997
self.next_metadata, self.node_id, len(next_partitions))
999998
self.session_partitions = next_partitions
1000-
return FetchRequestData(next_partitions, [], self.next_metadata);
999+
return FetchRequestData(next_partitions, None, self.next_metadata);
10011000

10021001
prev_tps = set(self.session_partitions.keys())
10031002
next_tps = set(next_partitions.keys())
@@ -1126,8 +1125,8 @@ class FetchRequestData(object):
11261125
__slots__ = ('_to_send', '_to_forget', '_metadata')
11271126

11281127
def __init__(self, to_send, to_forget, metadata):
1129-
self._to_send = to_send # {TopicPartition: (partition, ...)}
1130-
self._to_forget = to_forget # {TopicPartition}
1128+
self._to_send = to_send or dict() # {TopicPartition: (partition, ...)}
1129+
self._to_forget = to_forget or set() # {TopicPartition}
11311130
self._metadata = metadata
11321131

11331132
@property
@@ -1156,7 +1155,7 @@ def to_forget(self):
11561155
# Return as list of [(topic, (partiiton, ...)), ...]
11571156
# so it an be passed directly to encoder
11581157
partition_data = collections.defaultdict(list)
1159-
for tp in six.iteritems(self._to_forget):
1158+
for tp in self._to_forget:
11601159
partition_data[tp.topic].append(tp.partition)
11611160
return list(partition_data.items())
11621161

0 commit comments

Comments
 (0)