Remove legacy/v1 consumer message iterator #2543

Merged · 1 commit · Mar 15, 2025
85 changes: 4 additions & 81 deletions kafka/consumer/group.py
@@ -327,7 +327,6 @@ class KafkaConsumer(six.Iterator):
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
'kafka_client': KafkaClient,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
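For context, the user-facing consumption pattern is unchanged by removing the legacy_iterator option; a minimal sketch (broker address, topic, and group id are placeholders):

```python
from kafka import KafkaConsumer

# Placeholder broker, topic, and group id -- adjust for your environment.
consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    group_id='example-group',
    auto_offset_reset='earliest',
)

# Iteration is unchanged for callers; internally it now always uses the v2 path.
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)
```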
@@ -845,8 +844,7 @@ def seek(self, partition, offset):
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
log.debug("Seeking to offset %s for partition %s", offset, partition)
self._subscription.assignment[partition].seek(offset)
if not self.config['legacy_iterator']:
self._iterator = None
self._iterator = None
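With the legacy_iterator check gone, seek() now unconditionally drops the in-flight iterator, so the next record yielded comes from the new position. A rough usage sketch, assuming a manually assigned partition (names and offsets are placeholders):

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # placeholder broker
tp = TopicPartition('example-topic', 0)                       # placeholder topic/partition
consumer.assign([tp])

consumer.seek(tp, 100)    # jump to offset 100
record = next(consumer)   # iterator was just reset, so this record starts from the new offset
```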

def seek_to_beginning(self, *partitions):
"""Seek to the oldest available offset for partitions.
@@ -871,8 +869,7 @@ def seek_to_beginning(self, *partitions):
for tp in partitions:
log.debug("Seeking to beginning of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
if not self.config['legacy_iterator']:
self._iterator = None
self._iterator = None

def seek_to_end(self, *partitions):
"""Seek to the most recent available offset for partitions.
@@ -897,8 +894,7 @@ def seek_to_end(self, *partitions):
for tp in partitions:
log.debug("Seeking to end of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
if not self.config['legacy_iterator']:
self._iterator = None
self._iterator = None
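seek_to_beginning() and seek_to_end() get the same simplification; a short sketch of replaying a partition from its oldest offset (placeholder broker/topic):

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # placeholder broker
tp = TopicPartition('example-topic', 0)                       # placeholder topic/partition
consumer.assign([tp])

consumer.seek_to_beginning(tp)    # replay from the oldest available offset
for message in consumer:
    print(message.offset, message.value)
```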

def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern.
@@ -974,8 +970,7 @@ def unsubscribe(self):
self._client.cluster.need_all_topic_metadata = False
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
if not self.config['legacy_iterator']:
self._iterator = None
self._iterator = None

def metrics(self, raw=False):
"""Get metrics on consumer performance.
@@ -1157,73 +1152,12 @@ def _message_generator_v2(self):
self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1)
yield record

def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'

def inner_poll_ms():
return max(0, min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms']))

while time.time() < self._consumer_timeout:

if not self._coordinator.poll(timeout_ms=inner_poll_ms()):
continue

# Fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)

self._client.poll(timeout_ms=inner_poll_ms())

# after the long poll, we should check whether the group needs to rebalance
# prior to returning data so that the group can stabilize faster
if self._coordinator.need_rejoin():
continue

# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = self._next_timeout()

# Short-circuit the fetch iterator if we are already timed out
# to avoid any unintentional interaction with fetcher setup
if time.time() > timeout_at:
continue

for msg in self._fetcher:
yield msg
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
self._client.poll(timeout_ms=0)

# An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
# We assume that it is safe to init_fetches when fetcher is done
# i.e., there are no more records stored internally
else:
self._fetcher.send_fetches()

def _next_timeout(self):
timeout = min(self._consumer_timeout,
self._client.cluster.ttl() / 1000.0 + time.time(),
self._coordinator.time_to_next_poll() + time.time())
return timeout

def __iter__(self): # pylint: disable=non-iterator-returned
return self

def __next__(self):
if self._closed:
raise StopIteration('KafkaConsumer closed')
# Now that the heartbeat thread runs in the background
# there should be no reason to maintain a separate iterator
# but we'll keep it available for a few releases just in case
if self.config['legacy_iterator']:
return self.next_v1()
else:
return self.next_v2()

def next_v2(self):
self._set_consumer_timeout()
while time.time() < self._consumer_timeout:
if not self._iterator:
@@ -1234,17 +1168,6 @@ def next_v2(self):
self._iterator = None
raise StopIteration()

def next_v1(self):
if not self._iterator:
self._iterator = self._message_generator()

self._set_consumer_timeout()
try:
return next(self._iterator)
except StopIteration:
self._iterator = None
raise

def _set_consumer_timeout(self):
# consumer_timeout_ms can be used to stop iteration early
if self.config['consumer_timeout_ms'] >= 0:
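With next_v1 and the v1 message generator removed, consumer_timeout_ms remains the way to bound blocking iteration: next_v2 raises StopIteration once the timeout elapses with no record. A sketch, with placeholder values and a hypothetical process() handler:

```python
from kafka import KafkaConsumer

def process(message):
    # Hypothetical handler; replace with real processing.
    print(message.offset, message.value)

# consumer_timeout_ms bounds how long iteration blocks waiting for records;
# when it elapses the iterator raises StopIteration and the for-loop exits.
consumer = KafkaConsumer(
    'example-topic',                     # placeholder topic
    bootstrap_servers='localhost:9092',  # placeholder broker
    consumer_timeout_ms=1000,
)

for message in consumer:
    process(message)
# Reached once no record arrives within 1 second.
```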