Skip to content

Commit 2199bf8

Browse files
wbarnha, Denis Kazakov, KazakovDenis
authored and committed
KIP-345 Static membership implementation (#137)
* KIP-345 Add static consumer membership support * KIP-345 Add examples to docs * KIP-345 Add leave_group_on_close flag https://issues.apache.org/jira/browse/KAFKA-6995 * KIP-345 Add tests for static membership * KIP-345 Update docs for leave_group_on_close option * Update changelog.rst * remove six from base.py * Update base.py * Update base.py * Update base.py * Update changelog.rst * Update README.rst --------- Co-authored-by: Denis Kazakov <[email protected]> Co-authored-by: Denis Kazakov <[email protected]>
1 parent 3a3b0e3 commit 2199bf8

File tree

10 files changed

+302
-41
lines changed

10 files changed

+302
-41
lines changed

CHANGES.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# 2.0.3 (under development)
2+
3+
Consumer
4+
* KIP-345: Implement static membership support
5+
16
# 2.0.2 (Sep 29, 2020)
27

38
Consumer

README.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:
6464

6565
.. code-block:: python
6666
67+
# join a consumer group for dynamic partition assignment and offset commits
6768
from kafka import KafkaConsumer
68-
consumer = KafkaConsumer('my_favorite_topic')
69+
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
70+
# or as a static member with a fixed group member name
71+
# consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group',
72+
# group_instance_id='consumer-1', leave_group_on_close=False)
6973
for msg in consumer:
7074
print (msg)
7175

docs/changelog.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
Changelog
22
=========
33

4+
2.2.0
5+
####################
6+
7+
Consumer
8+
--------
9+
* KIP-345: Implement static membership support
10+
411

512
2.0.2 (Sep 29, 2020)
613
####################

docs/usage.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@ KafkaConsumer
4747
group_id='my-group',
4848
bootstrap_servers='my.server.com')
4949
50+
# Use multiple static consumers w/ 2.3.0 kafka brokers
51+
consumer1 = KafkaConsumer('my-topic',
52+
group_id='my-group',
53+
group_instance_id='process-1',
54+
leave_group_on_close=False,
55+
bootstrap_servers='my.server.com')
56+
consumer2 = KafkaConsumer('my-topic',
57+
group_id='my-group',
58+
group_instance_id='process-2',
59+
leave_group_on_close=False,
60+
bootstrap_servers='my.server.com')
61+
5062
5163
There are many configuration options for the consumer class. See
5264
:class:`~kafka.KafkaConsumer` API documentation for more details.

kafka/consumer/group.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ class KafkaConsumer:
5252
committing offsets. If None, auto-partition assignment (via
5353
group coordinator) and offset commits are disabled.
5454
Default: None
55+
group_instance_id (str): the unique identifier to distinguish
56+
each client instance. If set and leave_group_on_close is
57+
False consumer group rebalancing won't be triggered until
58+
session_timeout_ms is met. Requires 2.3.0+.
59+
leave_group_on_close (bool or None): whether to leave a consumer
60+
group or not on consumer shutdown.
5561
key_deserializer (callable): Any callable that takes a
5662
raw message key and returns a deserialized key.
5763
value_deserializer (callable): Any callable that takes a
@@ -241,6 +247,7 @@ class KafkaConsumer:
241247
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
242248
instance. (See kafka.oauth.abstract). Default: None
243249
kafka_client (callable): Custom class / callable for creating KafkaClient instances
250+
coordinator (callable): Custom class / callable for creating ConsumerCoordinator instances
244251
245252
Note:
246253
Configuration parameters are described in more detail at
@@ -250,6 +257,8 @@ class KafkaConsumer:
250257
'bootstrap_servers': 'localhost',
251258
'client_id': 'kafka-python-' + __version__,
252259
'group_id': None,
260+
'group_instance_id': '',
261+
'leave_group_on_close': None,
253262
'key_deserializer': None,
254263
'value_deserializer': None,
255264
'fetch_max_wait_ms': 500,
@@ -304,6 +313,7 @@ class KafkaConsumer:
304313
'sasl_oauth_token_provider': None,
305314
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
306315
'kafka_client': KafkaClient,
316+
'coordinator': ConsumerCoordinator,
307317
}
308318
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
309319

@@ -379,7 +389,7 @@ def __init__(self, *topics, **configs):
379389
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
380390
self._fetcher = Fetcher(
381391
self._client, self._subscription, self._metrics, **self.config)
382-
self._coordinator = ConsumerCoordinator(
392+
self._coordinator = self.config['coordinator'](
383393
self._client, self._subscription, self._metrics,
384394
assignors=self.config['partition_assignment_strategy'],
385395
**self.config)

kafka/coordinator/base.py

Lines changed: 111 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class BaseCoordinator:
7878

7979
DEFAULT_CONFIG = {
8080
'group_id': 'kafka-python-default-group',
81+
'group_instance_id': '',
82+
'leave_group_on_close': None,
8183
'session_timeout_ms': 10000,
8284
'heartbeat_interval_ms': 3000,
8385
'max_poll_interval_ms': 300000,
@@ -92,6 +94,12 @@ def __init__(self, client, metrics, **configs):
9294
group_id (str): name of the consumer group to join for dynamic
9395
partition assignment (if enabled), and to use for fetching and
9496
committing offsets. Default: 'kafka-python-default-group'
97+
group_instance_id (str): the unique identifier to distinguish
98+
each client instance. If set and leave_group_on_close is
99+
False consumer group rebalancing won't be triggered until
100+
session_timeout_ms is met. Requires 2.3.0+.
101+
leave_group_on_close (bool or None): whether to leave a consumer
102+
group or not on consumer shutdown.
95103
session_timeout_ms (int): The timeout used to detect failures when
96104
using Kafka's group management facilities. Default: 30000
97105
heartbeat_interval_ms (int): The expected time in milliseconds
@@ -117,6 +125,11 @@ def __init__(self, client, metrics, **configs):
117125
"different values for max_poll_interval_ms "
118126
"and session_timeout_ms")
119127

128+
if self.config['group_instance_id'] and self.config['api_version'] < (2, 3, 0):
129+
raise Errors.KafkaConfigurationError(
130+
'Broker version %s does not support static membership' % (self.config['api_version'],),
131+
)
132+
120133
self._client = client
121134
self.group_id = self.config['group_id']
122135
self.heartbeat = Heartbeat(**self.config)
@@ -451,30 +464,48 @@ def _send_join_group_request(self):
451464
if self.config['api_version'] < (0, 9):
452465
raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers')
453466
elif (0, 9) <= self.config['api_version'] < (0, 10, 1):
454-
request = JoinGroupRequest[0](
467+
version = 0
468+
args = (
455469
self.group_id,
456470
self.config['session_timeout_ms'],
457471
self._generation.member_id,
458472
self.protocol_type(),
459-
member_metadata)
473+
member_metadata,
474+
)
460475
elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
461-
request = JoinGroupRequest[1](
476+
version = 1
477+
args = (
462478
self.group_id,
463479
self.config['session_timeout_ms'],
464480
self.config['max_poll_interval_ms'],
465481
self._generation.member_id,
466482
self.protocol_type(),
467-
member_metadata)
483+
member_metadata,
484+
)
485+
elif self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
486+
version = 5
487+
args = (
488+
self.group_id,
489+
self.config['session_timeout_ms'],
490+
self.config['max_poll_interval_ms'],
491+
self._generation.member_id,
492+
self.config['group_instance_id'],
493+
self.protocol_type(),
494+
member_metadata,
495+
)
468496
else:
469-
request = JoinGroupRequest[2](
497+
version = 2
498+
args = (
470499
self.group_id,
471500
self.config['session_timeout_ms'],
472501
self.config['max_poll_interval_ms'],
473502
self._generation.member_id,
474503
self.protocol_type(),
475-
member_metadata)
504+
member_metadata,
505+
)
476506

477507
# create the request for the coordinator
508+
request = JoinGroupRequest[version](*args)
478509
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
479510
future = Future()
480511
_f = self._client.send(self.coordinator_id, request)
@@ -558,12 +589,25 @@ def _handle_join_group_response(self, future, send_time, response):
558589

559590
def _on_join_follower(self):
560591
# send follower's sync group with an empty assignment
561-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
562-
request = SyncGroupRequest[version](
563-
self.group_id,
564-
self._generation.generation_id,
565-
self._generation.member_id,
566-
{})
592+
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
593+
version = 3
594+
args = (
595+
self.group_id,
596+
self._generation.generation_id,
597+
self._generation.member_id,
598+
self.config['group_instance_id'],
599+
{},
600+
)
601+
else:
602+
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
603+
args = (
604+
self.group_id,
605+
self._generation.generation_id,
606+
self._generation.member_id,
607+
{},
608+
)
609+
610+
request = SyncGroupRequest[version](*args)
567611
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
568612
self.group_id, self.coordinator_id, request)
569613
return self._send_sync_group_request(request)
@@ -586,15 +630,30 @@ def _on_join_leader(self, response):
586630
except Exception as e:
587631
return Future().failure(e)
588632

589-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
590-
request = SyncGroupRequest[version](
591-
self.group_id,
592-
self._generation.generation_id,
593-
self._generation.member_id,
594-
[(member_id,
595-
assignment if isinstance(assignment, bytes) else assignment.encode())
596-
for member_id, assignment in group_assignment.items()])
633+
group_assignment = [
634+
(member_id, assignment if isinstance(assignment, bytes) else assignment.encode())
635+
for member_id, assignment in group_assignment.items()
636+
]
637+
638+
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
639+
version = 3
640+
args = (
641+
self.group_id,
642+
self._generation.generation_id,
643+
self._generation.member_id,
644+
self.config['group_instance_id'],
645+
group_assignment,
646+
)
647+
else:
648+
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
649+
args = (
650+
self.group_id,
651+
self._generation.generation_id,
652+
self._generation.member_id,
653+
group_assignment,
654+
)
597655

656+
request = SyncGroupRequest[version](*args)
598657
log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
599658
self.group_id, self.coordinator_id, request)
600659
return self._send_sync_group_request(request)
@@ -760,15 +819,22 @@ def close(self):
760819
def maybe_leave_group(self):
761820
"""Leave the current group and reset local generation/memberId."""
762821
with self._client._lock, self._lock:
763-
if (not self.coordinator_unknown()
822+
if (
823+
not self.coordinator_unknown()
764824
and self.state is not MemberState.UNJOINED
765-
and self._generation is not Generation.NO_GENERATION):
766-
825+
and self._generation is not Generation.NO_GENERATION
826+
and self._leave_group_on_close()
827+
):
767828
# this is a minimal effort attempt to leave the group. we do not
768829
# attempt any resending if the request fails or times out.
769830
log.info('Leaving consumer group (%s).', self.group_id)
770-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
771-
request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
831+
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
832+
version = 3
833+
args = (self.group_id, [(self._generation.member_id, self.config['group_instance_id'])])
834+
else:
835+
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
836+
args = self.group_id, self._generation.member_id
837+
request = LeaveGroupRequest[version](*args)
772838
future = self._client.send(self.coordinator_id, request)
773839
future.add_callback(self._handle_leave_group_response)
774840
future.add_errback(log.error, "LeaveGroup request failed: %s")
@@ -795,10 +861,23 @@ def _send_heartbeat_request(self):
795861
e = Errors.NodeNotReadyError(self.coordinator_id)
796862
return Future().failure(e)
797863

798-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
799-
request = HeartbeatRequest[version](self.group_id,
800-
self._generation.generation_id,
801-
self._generation.member_id)
864+
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
865+
version = 2
866+
args = (
867+
self.group_id,
868+
self._generation.generation_id,
869+
self._generation.member_id,
870+
self.config['group_instance_id'],
871+
)
872+
else:
873+
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
874+
args = (
875+
self.group_id,
876+
self._generation.generation_id,
877+
self._generation.member_id,
878+
)
879+
880+
request = HeartbeatRequest[version](*args)
802881
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
803882
future = Future()
804883
_f = self._client.send(self.coordinator_id, request)
@@ -845,6 +924,9 @@ def _handle_heartbeat_response(self, future, send_time, response):
845924
log.error("Heartbeat failed: Unhandled error: %s", error)
846925
future.failure(error)
847926

927+
def _leave_group_on_close(self):
928+
return self.config['leave_group_on_close'] is None or self.config['leave_group_on_close']
929+
848930

849931
class GroupCoordinatorMetrics:
850932
def __init__(self, heartbeat, metrics, prefix, tags=None):

kafka/coordinator/consumer.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class ConsumerCoordinator(BaseCoordinator):
2525
"""This class manages the coordination process with the consumer coordinator."""
2626
DEFAULT_CONFIG = {
2727
'group_id': 'kafka-python-default-group',
28+
'group_instance_id': '',
29+
'leave_group_on_close': None,
2830
'enable_auto_commit': True,
2931
'auto_commit_interval_ms': 5000,
3032
'default_offset_commit_callback': None,
@@ -45,6 +47,12 @@ def __init__(self, client, subscription, metrics, **configs):
4547
group_id (str): name of the consumer group to join for dynamic
4648
partition assignment (if enabled), and to use for fetching and
4749
committing offsets. Default: 'kafka-python-default-group'
50+
group_instance_id (str): the unique identifier to distinguish
51+
each client instance. If set and leave_group_on_close is
52+
False consumer group rebalancing won't be triggered until
53+
session_timeout_ms is met. Requires 2.3.0+.
54+
leave_group_on_close (bool or None): whether to leave a consumer
55+
group or not on consumer shutdown.
4856
enable_auto_commit (bool): If true the consumer's offset will be
4957
periodically committed in the background. Default: True.
5058
auto_commit_interval_ms (int): milliseconds between automatic
@@ -304,10 +312,15 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
304312
assert assignor, f'Invalid assignment protocol: {assignment_strategy}'
305313
member_metadata = {}
306314
all_subscribed_topics = set()
307-
for member_id, metadata_bytes in members:
315+
316+
for member in members:
317+
if len(member) == 3:
318+
member_id, group_instance_id, metadata_bytes = member
319+
else:
320+
member_id, metadata_bytes = member
308321
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
309322
member_metadata[member_id] = metadata
310-
all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
323+
all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
311324

312325
# the leader will begin watching for changes to any of the topics
313326
# the group is interested in, which ensures that all metadata changes

0 commit comments

Comments
 (0)