Skip to content

Commit 73faefb

Browse files
committed
simplify fetchable; push partition randomization to FetchRequestData
1 parent 0296843 commit 73faefb

File tree

1 file changed

+14
-19
lines changed

1 file changed

+14
-19
lines changed

kafka/consumer/fetcher.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ def _create_fetch_requests(self):
687687
# create the fetch info as a dict of lists of partition info tuples
688688
# which can be passed to FetchRequest() via .items()
689689
version = self._client.api_version(FetchRequest, max_version=7)
690-
fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
690+
fetchable = collections.defaultdict(dict)
691691

692692
for partition in self._fetchable_partitions():
693693
node_id = self._client.cluster.leader_for_partition(partition)
@@ -728,53 +728,41 @@ def _create_fetch_requests(self):
728728
-1, # log_start_offset is used internally by brokers / replicas only
729729
self.config['max_partition_fetch_bytes'],
730730
)
731-
fetchable[node_id][partition.topic].append(partition_info)
731+
fetchable[node_id][partition] = partition_info
732732
log.debug("Adding fetch request for partition %s at offset %d",
733733
partition, position)
734734

735735
requests = {}
736-
for node_id, partition_data in six.iteritems(fetchable):
737-
next_partitions = {TopicPartition(topic, partition_info[0]): partition_info
738-
for topic, partitions in six.iteritems(partition_data)
739-
for partition_info in partitions}
736+
for node_id, next_partitions in six.iteritems(fetchable):
740737
if version >= 7 and self.config['enable_incremental_fetch_sessions']:
741738
if node_id not in self._session_handlers:
742739
self._session_handlers[node_id] = FetchSessionHandler(node_id)
743740
session = self._session_handlers[node_id].build_next(next_partitions)
744741
else:
745742
# No incremental fetch support
746743
session = FetchRequestData(next_partitions, None, FetchMetadata.LEGACY)
747-
# As of version == 3 partitions will be returned in order as
748-
# they are requested, so to avoid starvation with
749-
# `fetch_max_bytes` option we need this shuffle
750-
# NOTE: we do have partition_data in random order due to usage
751-
# of unordered structures like dicts, but that does not
752-
# guarantee equal distribution, and starting in Python3.6
753-
# dicts retain insert order.
754-
partition_data = list(partition_data.items())
755-
random.shuffle(partition_data)
756744

757745
if version <= 2:
758746
request = FetchRequest[version](
759747
-1, # replica_id
760748
self.config['fetch_max_wait_ms'],
761749
self.config['fetch_min_bytes'],
762-
partition_data)
750+
session.to_send)
763751
elif version == 3:
764752
request = FetchRequest[version](
765753
-1, # replica_id
766754
self.config['fetch_max_wait_ms'],
767755
self.config['fetch_min_bytes'],
768756
self.config['fetch_max_bytes'],
769-
partition_data)
757+
session.to_send)
770758
elif version <= 6:
771759
request = FetchRequest[version](
772760
-1, # replica_id
773761
self.config['fetch_max_wait_ms'],
774762
self.config['fetch_min_bytes'],
775763
self.config['fetch_max_bytes'],
776764
self._isolation_level,
777-
partition_data)
765+
session.to_send)
778766
else:
779767
# Through v8
780768
request = FetchRequest[version](
@@ -1148,7 +1136,14 @@ def to_send(self):
11481136
partition_data = collections.defaultdict(list)
11491137
for tp, partition_info in six.iteritems(self._to_send):
11501138
partition_data[tp.topic].append(partition_info)
1151-
return list(partition_data.items())
1139+
# As of version == 3 partitions will be returned in order as
1140+
# they are requested, so to avoid starvation with
1141+
# `fetch_max_bytes` option we need this shuffle
1142+
# NOTE: we do have partition_data in random order due to usage
1143+
# of unordered structures like dicts, but that does not
1144+
# guarantee equal distribution, and starting in Python3.6
1145+
# dicts retain insert order.
1146+
return random.sample(list(partition_data.items()), k=len(partition_data))
11521147

11531148
@property
11541149
def to_forget(self):

0 commit comments

Comments
 (0)