Skip to content

Commit e6abbbf

Browse files
authored
Add synchronized decorator; add lock to subscription state (#2636)
1 parent 5957c1b commit e6abbbf

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

kafka/consumer/subscription_state.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
import logging
1616
import random
1717
import re
18+
import threading
1819
import time
1920

2021
from kafka.vendor import six
2122

2223
import kafka.errors as Errors
2324
from kafka.protocol.list_offsets import OffsetResetStrategy
2425
from kafka.structs import OffsetAndMetadata
25-
from kafka.util import ensure_valid_topic_name
26+
from kafka.util import ensure_valid_topic_name, synchronized
2627

2728
log = logging.getLogger(__name__)
2829

@@ -84,6 +85,7 @@ def __init__(self, offset_reset_strategy='earliest'):
8485
self.assignment = OrderedDict()
8586
self.rebalance_listener = None
8687
self.listeners = []
88+
self._lock = threading.RLock()
8789

8890
def _set_subscription_type(self, subscription_type):
8991
if not isinstance(subscription_type, SubscriptionType):
@@ -93,6 +95,7 @@ def _set_subscription_type(self, subscription_type):
9395
elif self.subscription_type != subscription_type:
9496
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
9597

98+
@synchronized
9699
def subscribe(self, topics=(), pattern=None, listener=None):
97100
"""Subscribe to a list of topics, or a topic regex pattern.
98101
@@ -147,6 +150,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
147150
raise TypeError('listener must be a ConsumerRebalanceListener')
148151
self.rebalance_listener = listener
149152

153+
@synchronized
150154
def change_subscription(self, topics):
151155
"""Change the topic subscription.
152156
@@ -178,6 +182,7 @@ def change_subscription(self, topics):
178182
self.subscription = set(topics)
179183
self._group_subscription.update(topics)
180184

185+
@synchronized
181186
def group_subscribe(self, topics):
182187
"""Add topics to the current group subscription.
183188
@@ -191,13 +196,15 @@ def group_subscribe(self, topics):
191196
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
192197
self._group_subscription.update(topics)
193198

199+
@synchronized
194200
def reset_group_subscription(self):
195201
"""Reset the group's subscription to only contain topics subscribed by this consumer."""
196202
if not self.partitions_auto_assigned():
197203
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
198204
assert self.subscription is not None, 'Subscription required'
199205
self._group_subscription.intersection_update(self.subscription)
200206

207+
@synchronized
201208
def assign_from_user(self, partitions):
202209
"""Manually assign a list of TopicPartitions to this consumer.
203210
@@ -222,6 +229,7 @@ def assign_from_user(self, partitions):
222229
self._set_assignment({partition: self.assignment.get(partition, TopicPartitionState())
223230
for partition in partitions})
224231

232+
@synchronized
225233
def assign_from_subscribed(self, assignments):
226234
"""Update the assignment to the specified partitions
227235
@@ -258,6 +266,7 @@ def _set_assignment(self, partition_states, randomize=False):
258266
for tp in topic_partitions[topic]:
259267
self.assignment[tp] = partition_states[tp]
260268

269+
@synchronized
261270
def unsubscribe(self):
262271
"""Clear all topic subscriptions and partition assignments"""
263272
self.subscription = None
@@ -266,6 +275,7 @@ def unsubscribe(self):
266275
self.subscribed_pattern = None
267276
self.subscription_type = SubscriptionType.NONE
268277

278+
@synchronized
269279
def group_subscription(self):
270280
"""Get the topic subscription for the group.
271281
@@ -281,6 +291,7 @@ def group_subscription(self):
281291
"""
282292
return self._group_subscription
283293

294+
@synchronized
284295
def seek(self, partition, offset):
285296
"""Manually specify the fetch offset for a TopicPartition.
286297
@@ -298,15 +309,18 @@ def seek(self, partition, offset):
298309
raise TypeError("offset must be type in or OffsetAndMetadata")
299310
self.assignment[partition].seek(offset)
300311

312+
@synchronized
301313
def assigned_partitions(self):
302314
"""Return set of TopicPartitions in current assignment."""
303315
return set(self.assignment.keys())
304316

317+
@synchronized
305318
def paused_partitions(self):
306319
"""Return current set of paused TopicPartitions."""
307320
return set(partition for partition in self.assignment
308321
if self.is_paused(partition))
309322

323+
@synchronized
310324
def fetchable_partitions(self):
311325
"""Return ordered list of TopicPartitions that should be Fetched."""
312326
fetchable = list()
@@ -315,10 +329,12 @@ def fetchable_partitions(self):
315329
fetchable.append(partition)
316330
return fetchable
317331

332+
@synchronized
318333
def partitions_auto_assigned(self):
319334
"""Return True unless user supplied partitions manually."""
320335
return self.subscription_type in (SubscriptionType.AUTO_TOPICS, SubscriptionType.AUTO_PATTERN)
321336

337+
@synchronized
322338
def all_consumed_offsets(self):
323339
"""Returns consumed offsets as {TopicPartition: OffsetAndMetadata}"""
324340
all_consumed = {}
@@ -327,6 +343,7 @@ def all_consumed_offsets(self):
327343
all_consumed[partition] = state.position
328344
return all_consumed
329345

346+
@synchronized
330347
def request_offset_reset(self, partition, offset_reset_strategy=None):
331348
"""Mark partition for offset reset using specified or default strategy.
332349
@@ -338,33 +355,40 @@ def request_offset_reset(self, partition, offset_reset_strategy=None):
338355
offset_reset_strategy = self._default_offset_reset_strategy
339356
self.assignment[partition].reset(offset_reset_strategy)
340357

358+
@synchronized
341359
def set_reset_pending(self, partitions, next_allowed_reset_time):
342360
for partition in partitions:
343361
self.assignment[partition].set_reset_pending(next_allowed_reset_time)
344362

363+
@synchronized
345364
def has_default_offset_reset_policy(self):
346365
"""Return True if default offset reset policy is Earliest or Latest"""
347366
return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
348367

368+
@synchronized
349369
def is_offset_reset_needed(self, partition):
350370
return self.assignment[partition].awaiting_reset
351371

372+
@synchronized
352373
def has_all_fetch_positions(self):
353374
for state in six.itervalues(self.assignment):
354375
if not state.has_valid_position:
355376
return False
356377
return True
357378

379+
@synchronized
358380
def missing_fetch_positions(self):
359381
missing = set()
360382
for partition, state in six.iteritems(self.assignment):
361383
if state.is_missing_position():
362384
missing.add(partition)
363385
return missing
364386

387+
@synchronized
365388
def has_valid_position(self, partition):
366389
return partition in self.assignment and self.assignment[partition].has_valid_position
367390

391+
@synchronized
368392
def reset_missing_positions(self):
369393
partitions_with_no_offsets = set()
370394
for tp, state in six.iteritems(self.assignment):
@@ -377,32 +401,40 @@ def reset_missing_positions(self):
377401
if partitions_with_no_offsets:
378402
raise Errors.NoOffsetForPartitionError(partitions_with_no_offsets)
379403

404+
@synchronized
380405
def partitions_needing_reset(self):
381406
partitions = set()
382407
for tp, state in six.iteritems(self.assignment):
383408
if state.awaiting_reset and state.is_reset_allowed():
384409
partitions.add(tp)
385410
return partitions
386411

412+
@synchronized
387413
def is_assigned(self, partition):
388414
return partition in self.assignment
389415

416+
@synchronized
390417
def is_paused(self, partition):
391418
return partition in self.assignment and self.assignment[partition].paused
392419

420+
@synchronized
393421
def is_fetchable(self, partition):
394422
return partition in self.assignment and self.assignment[partition].is_fetchable()
395423

424+
@synchronized
396425
def pause(self, partition):
397426
self.assignment[partition].pause()
398427

428+
@synchronized
399429
def resume(self, partition):
400430
self.assignment[partition].resume()
401431

432+
@synchronized
402433
def reset_failed(self, partitions, next_retry_time):
403434
for partition in partitions:
404435
self.assignment[partition].reset_failed(next_retry_time)
405436

437+
@synchronized
406438
def move_partition_to_end(self, partition):
407439
if partition in self.assignment:
408440
try:
@@ -411,6 +443,7 @@ def move_partition_to_end(self, partition):
411443
state = self.assignment.pop(partition)
412444
self.assignment[partition] = state
413445

446+
@synchronized
414447
def position(self, partition):
415448
return self.assignment[partition].position
416449

kafka/util.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import, division
22

33
import binascii
4+
import functools
45
import re
56
import time
67
import weakref
@@ -129,3 +130,11 @@ class Dict(dict):
129130
See: https://docs.python.org/2/library/weakref.html
130131
"""
131132
pass
133+
134+
135+
def synchronized(func):
136+
def wrapper(self, *args, **kwargs):
137+
with self._lock:
138+
return func(self, *args, **kwargs)
139+
functools.update_wrapper(wrapper, func)
140+
return wrapper

0 commit comments

Comments
 (0)