Commit dbfe941

KAFKA-3949: Avoid race condition when subscription changes during rebalance
1 parent 08a7fb7 commit dbfe941

7 files changed (+129, -109 lines)

kafka/cluster.py

Lines changed: 7 additions & 0 deletions
@@ -291,6 +291,13 @@ def update_metadata(self, metadata):
         for listener in self._listeners:
             listener(self)

+        if self.need_all_topic_metadata:
+            # the listener may change the interested topics,
+            # which could cause another metadata refresh.
+            # If we have already fetched all topics, however,
+            # another fetch should be unnecessary.
+            self._need_update = False
+
     def add_listener(self, listener):
         """Add a callback function to be called on each metadata update"""
         self._listeners.add(listener)
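
To make the race concrete, here is a self-contained toy sketch of the refresh cycle this guard breaks. Only need_all_topic_metadata, _need_update, and the listener callback mirror names from the diff; the class itself is illustrative, not kafka-python's actual ClusterMetadata:

# Toy stand-in showing why _need_update is cleared after notifying
# listeners: with all-topic metadata already in hand, a listener's
# request for a refresh cannot learn anything new.

class ToyClusterMetadata(object):
    def __init__(self):
        self.need_all_topic_metadata = True  # e.g. pattern subscription
        self._need_update = False
        self._listeners = set()

    def add_listener(self, listener):
        self._listeners.add(listener)

    def request_update(self):
        # called by listeners that believe they need fresher metadata
        self._need_update = True

    def update_metadata(self, metadata):
        # ... apply broker/topic/partition state from 'metadata' ...
        for listener in self._listeners:
            listener(self)  # a listener may call request_update() here
        if self.need_all_topic_metadata:
            # we just fetched everything; cancel the redundant refresh
            self._need_update = False

cluster = ToyClusterMetadata()
cluster.add_listener(lambda c: c.request_update())  # over-eager listener
cluster.update_metadata(None)
assert cluster._need_update is False  # no repeated refresh cycle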

kafka/consumer/fetcher.py

Lines changed: 0 additions & 6 deletions
@@ -326,9 +326,6 @@ def fetched_records(self, max_records=None):
             max_records = self.config['max_poll_records']
         assert max_records > 0

-        if self._subscriptions.needs_partition_assignment:
-            return {}, False
-
         drained = collections.defaultdict(list)
         records_remaining = max_records

@@ -397,9 +394,6 @@ def _append(self, drained, part, max_records):

     def _message_generator(self):
         """Iterate over fetched_records"""
-        if self._subscriptions.needs_partition_assignment:
-            raise StopIteration('Subscription needs partition assignment')
-
         while self._next_partition_records or self._completed_fetches:

             if not self._next_partition_records:

kafka/consumer/group.py

Lines changed: 10 additions & 0 deletions
@@ -644,6 +644,11 @@ def _poll_once(self, timeout_ms, max_records):

         timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
         self._client.poll(timeout_ms=timeout_ms)
+        # after the long poll, we should check whether the group needs to rebalance
+        # prior to returning data so that the group can stabilize faster
+        if self._coordinator.need_rejoin():
+            return {}
+
         records, _ = self._fetcher.fetched_records(max_records)
         return records

@@ -1055,6 +1060,11 @@ def _message_generator(self):
                 poll_ms = 0
             self._client.poll(timeout_ms=poll_ms)

+            # after the long poll, we should check whether the group needs to rebalance
+            # prior to returning data so that the group can stabilize faster
+            if self._coordinator.need_rejoin():
+                continue
+
             # We need to make sure we at least keep up with scheduled tasks,
             # like heartbeats, auto-commits, and metadata refreshes
             timeout_at = self._next_timeout()
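
The guards deleted from fetcher.py above are safe to drop because of this change: the rebalance check now runs after the network poll and before any fetched data is handed back. A runnable toy sketch of the resulting ordering (the coordinator and fetcher classes are stand-ins; only the control flow mirrors the diff):

# Toy illustration of _poll_once ordering: rejoin check first,
# data return second, so the group can stabilize faster.

class ToyCoordinator(object):
    def __init__(self):
        self.rejoin_needed = False

    def need_rejoin(self):
        return self.rejoin_needed

class ToyFetcher(object):
    def fetched_records(self, max_records):
        # second value reports whether buffered data remains;
        # _poll_once ignores it ('records, _ = ...')
        return {'topic-0': ['msg1', 'msg2']}, False

def poll_once(coordinator, fetcher, max_records=500):
    # 1) network I/O happens here (self._client.poll in the real code)
    # 2) if the group must rebalance, return nothing so the caller
    #    re-enters poll instead of consuming stale assignments
    if coordinator.need_rejoin():
        return {}
    # 3) only a stable group member hands data to the application
    records, _ = fetcher.fetched_records(max_records)
    return records

coordinator, fetcher = ToyCoordinator(), ToyFetcher()
assert poll_once(coordinator, fetcher) == {'topic-0': ['msg1', 'msg2']}
coordinator.rejoin_needed = True
assert poll_once(coordinator, fetcher) == {}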

kafka/consumer/subscription_state.py

Lines changed: 14 additions & 16 deletions
@@ -68,7 +68,6 @@ def __init__(self, offset_reset_strategy='earliest'):
         self._group_subscription = set()
         self._user_assignment = set()
         self.assignment = dict()
-        self.needs_partition_assignment = False
         self.listener = None

         # initialize to true for the consumers to fetch offset upon starting up
@@ -172,7 +171,6 @@ def change_subscription(self, topics):
         log.info('Updating subscribed topics to: %s', topics)
         self.subscription = set(topics)
         self._group_subscription.update(topics)
-        self.needs_partition_assignment = True

         # Remove any assigned partitions which are no longer subscribed to
         for tp in set(self.assignment.keys()):
@@ -192,12 +190,12 @@ def group_subscribe(self, topics):
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
         self._group_subscription.update(topics)

-    def mark_for_reassignment(self):
+    def reset_group_subscription(self):
+        """Reset the group's subscription to only contain topics subscribed by this consumer."""
         if self._user_assignment:
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
         assert self.subscription is not None, 'Subscription required'
         self._group_subscription.intersection_update(self.subscription)
-        self.needs_partition_assignment = True

     def assign_from_user(self, partitions):
         """Manually assign a list of TopicPartitions to this consumer.
@@ -220,18 +218,17 @@ def assign_from_user(self, partitions):
         if self.subscription is not None:
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)

-        self._user_assignment.clear()
-        self._user_assignment.update(partitions)
+        if self._user_assignment != set(partitions):
+            self._user_assignment = set(partitions)

-        for partition in partitions:
-            if partition not in self.assignment:
-                self._add_assigned_partition(partition)
+            for partition in partitions:
+                if partition not in self.assignment:
+                    self._add_assigned_partition(partition)

-        for tp in set(self.assignment.keys()) - self._user_assignment:
-            del self.assignment[tp]
+            for tp in set(self.assignment.keys()) - self._user_assignment:
+                del self.assignment[tp]

-        self.needs_partition_assignment = False
-        self.needs_fetch_committed_offsets = True
+            self.needs_fetch_committed_offsets = True

     def assign_from_subscribed(self, assignments):
         """Update the assignment to the specified partitions
@@ -245,24 +242,25 @@ def assign_from_subscribed(self, assignments):
             assignments (list of TopicPartition): partitions to assign to this
                 consumer instance.
         """
-        if self.subscription is None:
+        if not self.partitions_auto_assigned():
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)

         for tp in assignments:
             if tp.topic not in self.subscription:
                 raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
+
+        # after rebalancing, we always reinitialize the assignment state
         self.assignment.clear()
         for tp in assignments:
             self._add_assigned_partition(tp)
-        self.needs_partition_assignment = False
+        self.needs_fetch_committed_offsets = True
         log.info("Updated partition assignment: %s", assignments)

     def unsubscribe(self):
         """Clear all topic subscriptions and partition assignments"""
         self.subscription = None
         self._user_assignment.clear()
         self.assignment.clear()
-        self.needs_partition_assignment = True
         self.subscribed_pattern = None

     def group_subscription(self):
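
Two behavioral points in this file: the shared needs_partition_assignment flag is gone entirely (the coordinator now derives rejoin conditions itself; see consumer.py below), and assign_from_user only rebuilds state when the requested partition set actually differs, so a repeated assign() with the same partitions no longer resets per-partition fetch state. A runnable toy model of that idempotence (plain tuples stand in for TopicPartition):

# Toy model: assignment state is rebuilt only when the requested
# partition set differs from the current one.

class ToySubscriptionState(object):
    def __init__(self):
        self._user_assignment = set()
        self.assignment = {}
        self.needs_fetch_committed_offsets = False

    def assign_from_user(self, partitions):
        if self._user_assignment != set(partitions):
            self._user_assignment = set(partitions)
            for partition in partitions:
                if partition not in self.assignment:
                    self.assignment[partition] = object()  # per-partition state
            for tp in set(self.assignment.keys()) - self._user_assignment:
                del self.assignment[tp]
            self.needs_fetch_committed_offsets = True

state = ToySubscriptionState()
state.assign_from_user([('t', 0), ('t', 1)])
saved = state.assignment[('t', 0)]
state.assign_from_user([('t', 0), ('t', 1)])  # unchanged set: a no-op
assert state.assignment[('t', 0)] is saved    # fetch state survives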

kafka/coordinator/base.py

Lines changed: 13 additions & 11 deletions
@@ -344,23 +344,25 @@ def _handle_join_failure(self, _):
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
         with self._lock:
-            if not self.need_rejoin():
-                return
-
-            # call on_join_prepare if needed. We set a flag to make sure that
-            # we do not call it a second time if the client is woken up before
-            # a pending rebalance completes.
-            if not self.rejoining:
-                self._on_join_prepare(self._generation.generation_id,
-                                      self._generation.member_id)
-                self.rejoining = True
-
             if self._heartbeat_thread is None:
                 self._start_heartbeat_thread()

             while self.need_rejoin():
                 self.ensure_coordinator_ready()

+                # call on_join_prepare if needed. We set a flag
+                # to make sure that we do not call it a second
+                # time if the client is woken up before a pending
+                # rebalance completes. This must be called on each
+                # iteration of the loop because an event requiring
+                # a rebalance (such as a metadata refresh which
+                # changes the matched subscription set) can occur
+                # while another rebalance is still in progress.
+                if not self.rejoining:
+                    self._on_join_prepare(self._generation.generation_id,
+                                          self._generation.member_id)
+                    self.rejoining = True
+
                 # ensure that there are no pending requests to the coordinator.
                 # This is important in particular to avoid resending a pending
                 # JoinGroup request.
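
The substance of this hunk is that _on_join_prepare moves inside the while loop: a new rebalance trigger that fires while a rebalance is already in flight (for example, a metadata refresh changing the matched subscription set) still gets its revocation callbacks. A runnable toy rendering of the loop shape (all names illustrative):

# Toy rendering of the new ensure_active_group() shape: the prepare
# step is re-evaluated on every loop iteration, while the 'rejoining'
# flag still prevents double-preparing a single rebalance.

class ToyCoordinator(object):
    def __init__(self, rebalances_needed):
        self._pending = rebalances_needed  # e.g. metadata changed mid-join
        self.rejoining = False
        self.prepare_calls = 0

    def need_rejoin(self):
        return self._pending > 0

    def ensure_coordinator_ready(self):
        pass  # locate/connect to the group coordinator in the real code

    def on_join_prepare(self):
        self.prepare_calls += 1  # revoke partitions, run listener callbacks

    def join_group(self):
        self._pending -= 1
        return True  # joined and synced

def ensure_active_group(coord):
    while coord.need_rejoin():
        coord.ensure_coordinator_ready()
        if not coord.rejoining:
            coord.on_join_prepare()
            coord.rejoining = True
        if coord.join_group():
            coord.rejoining = False

coord = ToyCoordinator(rebalances_needed=2)
ensure_active_group(coord)
assert coord.prepare_calls == 2  # prepared per rebalance, not once total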

kafka/coordinator/consumer.py

Lines changed: 64 additions & 39 deletions
@@ -84,6 +84,8 @@ def __init__(self, client, subscription, metrics, **configs):
                 self.config[key] = configs[key]

         self._subscription = subscription
+        self._is_leader = False
+        self._joined_subscription = set()
         self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
         self._assignment_snapshot = None
         self._cluster = client.cluster
@@ -119,6 +121,7 @@ def __init__(self, client, subscription, metrics, **configs):
         self.consumer_sensors = ConsumerCoordinatorMetrics(
             metrics, self.config['metric_group_prefix'], self._subscription)

+        #self._handle_metadata_update(self._cluster)
         self._cluster.request_update()
         self._cluster.add_listener(WeakMethod(self._handle_metadata_update))

@@ -132,11 +135,22 @@ def protocol_type(self):

     def group_protocols(self):
         """Returns list of preferred (protocols, metadata)"""
-        topics = self._subscription.subscription
-        assert topics is not None, 'Consumer has not subscribed to topics'
+        if self._subscription.subscription is None:
+            raise Errors.IllegalStateError('Consumer has not subscribed to topics')
+        # dpkp note: I really dislike this.
+        # why? because we are using this strange method group_protocols,
+        # which is seemingly innocuous, to set internal state (_joined_subscription)
+        # that is later used to check whether metadata has changed since we joined a group
+        # but there is no guarantee that this method, group_protocols, will get called
+        # in the correct sequence or that it will only be called when we want it to be.
+        # So this really should be moved elsewhere, but I don't have the energy to
+        # work that out right now. If you read this at some later date after the mutable
+        # state has bitten you... I'm sorry! It mimics the java client, and that's the
+        # best I've got for now.
+        self._joined_subscription = set(self._subscription.subscription)
         metadata_list = []
         for assignor in self.config['assignors']:
-            metadata = assignor.metadata(topics)
+            metadata = assignor.metadata(self._joined_subscription)
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)
         return metadata_list
@@ -158,21 +172,29 @@ def _handle_metadata_update(self, cluster):

         # check if there are any changes to the metadata which should trigger
         # a rebalance
-        if self._subscription_metadata_changed(cluster):
-
-            if (self.config['api_version'] >= (0, 9)
-                    and self.config['group_id'] is not None):
-
-                self._subscription.mark_for_reassignment()
-
-            # If we haven't got group coordinator support,
-            # just assign all partitions locally
-            else:
-                self._subscription.assign_from_subscribed([
-                    TopicPartition(topic, partition)
-                    for topic in self._subscription.subscription
-                    for partition in self._metadata_snapshot[topic]
-                ])
+        if self._subscription.partitions_auto_assigned():
+            metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
+            if self._metadata_snapshot != metadata_snapshot:
+                self._metadata_snapshot = metadata_snapshot
+
+        # If we haven't got group coordinator support,
+        # just assign all partitions locally
+        if self._auto_assign_all_partitions():
+            self._subscription.assign_from_subscribed([
+                TopicPartition(topic, partition)
+                for topic in self._subscription.subscription
+                for partition in self._metadata_snapshot[topic]
+            ])
+
+    def _auto_assign_all_partitions(self):
+        # For users that use "subscribe" without group support,
+        # we will simply assign all partitions to this consumer
+        if self.config['api_version'] < (0, 9):
+            return True
+        elif self.config['group_id'] is None:
+            return True
+        else:
+            return False

     def _build_metadata_snapshot(self, subscription, cluster):
         metadata_snapshot = {}
@@ -181,16 +203,6 @@ def _build_metadata_snapshot(self, subscription, cluster):
             metadata_snapshot[topic] = set(partitions)
         return metadata_snapshot

-    def _subscription_metadata_changed(self, cluster):
-        if not self._subscription.partitions_auto_assigned():
-            return False
-
-        metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
-        if self._metadata_snapshot != metadata_snapshot:
-            self._metadata_snapshot = metadata_snapshot
-            return True
-        return False
-
     def _lookup_assignor(self, name):
         for assignor in self.config['assignors']:
             if assignor.name == name:
@@ -199,12 +211,10 @@ def _lookup_assignor(self, name):

     def _on_join_complete(self, generation, member_id, protocol,
                           member_assignment_bytes):
-        # if we were the assignor, then we need to make sure that there have
-        # been no metadata updates since the rebalance begin. Otherwise, we
-        # won't rebalance again until the next metadata change
-        if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot:
-            self._subscription.mark_for_reassignment()
-            return
+        # only the leader is responsible for monitoring for metadata changes
+        # (i.e. partition changes)
+        if not self._is_leader:
+            self._assignment_snapshot = None

         assignor = self._lookup_assignor(protocol)
         assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
@@ -307,6 +317,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
         # keep track of the metadata used for assignment so that we can check
         # after rebalance completion whether anything has changed
         self._cluster.request_update()
+        self._is_leader = True
         self._assignment_snapshot = self._metadata_snapshot

         log.debug("Performing assignment for group %s using strategy %s"
@@ -338,18 +349,32 @@ def _on_join_prepare(self, generation, member_id):
                           " for group %s failed on_partitions_revoked",
                           self._subscription.listener, self.group_id)

-        self._assignment_snapshot = None
-        self._subscription.mark_for_reassignment()
+        self._is_leader = False
+        self._subscription.reset_group_subscription()

     def need_rejoin(self):
         """Check whether the group should be rejoined

         Returns:
             bool: True if consumer should rejoin group, False otherwise
         """
-        return (self._subscription.partitions_auto_assigned() and
-                (super(ConsumerCoordinator, self).need_rejoin() or
-                 self._subscription.needs_partition_assignment))
+        if not self._subscription.partitions_auto_assigned():
+            return False
+
+        if self._auto_assign_all_partitions():
+            return False
+
+        # we need to rejoin if we performed the assignment and metadata has changed
+        if (self._assignment_snapshot is not None
+            and self._assignment_snapshot != self._metadata_snapshot):
+            return True
+
+        # we need to join if our subscription has changed since the last join
+        if (self._joined_subscription is not None
+            and self._joined_subscription != self._subscription.subscription):
+            return True
+
+        return super(ConsumerCoordinator, self).need_rejoin()

     def refresh_committed_offsets_if_needed(self):
         """Fetch committed offsets for assigned partitions."""
