Skip to content

Commit 46dfe3f

Browse files
Apurva007Apurva Telangjeffwidman
authored andcommitted
Enhancement for Kafka Admin Client's "Describe Consumer Group" (dpkp#2035)
Adding namedtuples for DescribeConsumerGroup response; Adding Serialization of MemberData and MemberAssignment for the response Co-authored-by: Apurva Telang <[email protected]> Co-authored-by: Jeff Widman <[email protected]>
1 parent 8420d76 commit 46dfe3f

File tree

3 files changed

+146
-16
lines changed

3 files changed

+146
-16
lines changed

kafka/admin/client.py

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
from __future__ import absolute_import
22

3-
from collections import defaultdict
3+
from collections import defaultdict, namedtuple
44
import copy
55
import logging
66
import socket
77

88
from . import ConfigResourceType
99
from kafka.vendor import six
1010

11+
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
12+
ACLResourcePatternType
1113
from kafka.client_async import KafkaClient, selectors
14+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
1215
import kafka.errors as Errors
1316
from kafka.errors import (
1417
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
@@ -19,9 +22,8 @@
1922
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
2023
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2124
from kafka.protocol.metadata import MetadataRequest
22-
from kafka.structs import TopicPartition, OffsetAndMetadata
23-
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
24-
ACLResourcePatternType
25+
from kafka.protocol.types import Array
26+
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
2527
from kafka.version import __version__
2628

2729

@@ -1000,22 +1002,47 @@ def _describe_consumer_groups_process_response(self, response):
10001002
"""Process a DescribeGroupsResponse into a group description."""
10011003
if response.API_VERSION <= 3:
10021004
assert len(response.groups) == 1
1003-
# TODO need to implement converting the response tuple into
1004-
# a more accessible interface like a namedtuple and then stop
1005-
# hardcoding tuple indices here. Several Java examples,
1006-
# including KafkaAdminClient.java
1007-
group_description = response.groups[0]
1008-
error_code = group_description[0]
1005+
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
1006+
if isinstance(response_field, Array):
1007+
described_groups = response.__dict__[response_name]
1008+
described_groups_field_schema = response_field.array_of
1009+
described_group = response.__dict__[response_name][0]
1010+
described_group_information_list = []
1011+
protocol_type_is_consumer = False
1012+
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
1013+
if group_information_name == 'protocol_type':
1014+
protocol_type = described_group_information
1015+
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
1016+
if isinstance(group_information_field, Array):
1017+
member_information_list = []
1018+
member_schema = group_information_field.array_of
1019+
for members in described_group_information:
1020+
member_information = []
1021+
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
1022+
if protocol_type_is_consumer:
1023+
if member_name == 'member_metadata' and member:
1024+
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
1025+
elif member_name == 'member_assignment' and member:
1026+
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
1027+
else:
1028+
member_information.append(member)
1029+
member_info_tuple = MemberInformation._make(member_information)
1030+
member_information_list.append(member_info_tuple)
1031+
described_group_information_list.append(member_information_list)
1032+
else:
1033+
described_group_information_list.append(described_group_information)
1034+
# Version 3 of the DescribeGroups API introduced the "authorized_operations" field. This will cause the namedtuple to fail
1035+
# Therefore, appending a placeholder of None in it.
1036+
if response.API_VERSION <=2:
1037+
described_group_information_list.append(None)
1038+
group_description = GroupInformation._make(described_group_information_list)
1039+
error_code = group_description.error_code
10091040
error_type = Errors.for_code(error_code)
10101041
# Java has the note: KAFKA-6789, we can retry based on the error code
10111042
if error_type is not Errors.NoError:
10121043
raise error_type(
10131044
"DescribeGroupsResponse failed with response '{}'."
10141045
.format(response))
1015-
# TODO Java checks the group protocol type, and if consumer
1016-
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
1017-
# the members' partition assignments... that hasn't yet been
1018-
# implemented here so just return the raw struct results
10191046
else:
10201047
raise NotImplementedError(
10211048
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."

kafka/structs.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@
7070
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
7171
["offset", "timestamp"])
7272

73+
MemberInformation = namedtuple("MemberInformation",
74+
["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])
75+
76+
GroupInformation = namedtuple("GroupInformation",
77+
["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"])
7378

7479
"""Define retry policy for async producer
7580

test/test_admin_integration.py

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import pytest
22

3-
from test.testutil import env_kafka_version
3+
from logging import info
4+
from test.testutil import env_kafka_version, random_string
5+
from threading import Event, Thread
6+
from time import time, sleep
47

5-
from kafka.errors import NoError
68
from kafka.admin import (
79
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
10+
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
811

912

1013
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
138141

139142
with pytest.raises(ValueError):
140143
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
144+
145+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
146+
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
147+
"""Tests that the describe consumer group call fails if the group coordinator is not available
148+
"""
149+
with pytest.raises(GroupCoordinatorNotAvailableError):
150+
group_description = kafka_admin_client.describe_consumer_groups(['test'])
151+
152+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
153+
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
154+
"""Tests that the describe consumer group call returns valid consumer group information
155+
This test takes inspiration from the test 'test_group' in test_consumer_group.py.
156+
"""
157+
consumers = {}
158+
stop = {}
159+
threads = {}
160+
random_group_id = 'test-group-' + random_string(6)
161+
group_id_list = [random_group_id, random_group_id + '_2']
162+
generations = {group_id_list[0]: set(), group_id_list[1]: set()}
163+
def consumer_thread(i, group_id):
164+
assert i not in consumers
165+
assert i not in stop
166+
stop[i] = Event()
167+
consumers[i] = kafka_consumer_factory(group_id=group_id)
168+
while not stop[i].is_set():
169+
consumers[i].poll(20)
170+
consumers[i].close()
171+
consumers[i] = None
172+
stop[i] = None
173+
174+
num_consumers = 3
175+
for i in range(num_consumers):
176+
group_id = group_id_list[i % 2]
177+
t = Thread(target=consumer_thread, args=(i, group_id,))
178+
t.start()
179+
threads[i] = t
180+
181+
try:
182+
timeout = time() + 35
183+
while True:
184+
for c in range(num_consumers):
185+
186+
# Verify all consumers have been created
187+
if c not in consumers:
188+
break
189+
190+
# Verify all consumers have an assignment
191+
elif not consumers[c].assignment():
192+
break
193+
194+
# If all consumers exist and have an assignment
195+
else:
196+
197+
info('All consumers have assignment... checking for stable group')
198+
# Verify all consumers are in the same generation
199+
# then log state and break while loop
200+
201+
for consumer in consumers.values():
202+
generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id)
203+
204+
is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()])
205+
206+
# New generation assignment is not complete until
207+
# coordinator.rejoining = False
208+
rejoining = any([consumer._coordinator.rejoining
209+
for consumer in list(consumers.values())])
210+
211+
if not rejoining and is_same_generation:
212+
break
213+
else:
214+
sleep(1)
215+
assert time() < timeout, "timeout waiting for assignments"
216+
217+
info('Group stabilized; verifying assignment')
218+
output = kafka_admin_client.describe_consumer_groups(group_id_list)
219+
assert len(output) == 2
220+
consumer_groups = set()
221+
for consumer_group in output:
222+
assert(consumer_group.group in group_id_list)
223+
if consumer_group.group == group_id_list[0]:
224+
assert(len(consumer_group.members) == 2)
225+
else:
226+
assert(len(consumer_group.members) == 1)
227+
for member in consumer_group.members:
228+
assert(member.member_metadata.subscription[0] == topic)
229+
assert(member.member_assignment.assignment[0][0] == topic)
230+
consumer_groups.add(consumer_group.group)
231+
assert(sorted(list(consumer_groups)) == group_id_list)
232+
finally:
233+
info('Shutting down %s consumers', num_consumers)
234+
for c in range(num_consumers):
235+
info('Stopping consumer %s', c)
236+
stop[c].set()
237+
threads[c].join()
238+
threads[c] = None

0 commit comments

Comments
 (0)