Skip to content

Commit 27fad5f

Browse files
author
Apurva Telang
committed
Adding namedtuples for DescribeConsumerGroup response; Adding Serialization of MemberData and MemberAssignment for the response
1 parent f9e0264 commit 27fad5f

File tree

3 files changed

+62
-13
lines changed

3 files changed

+62
-13
lines changed

kafka/admin/client.py

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import absolute_import
22

3-
from collections import defaultdict
3+
from collections import defaultdict, namedtuple
44
import copy
55
import logging
66
import socket
@@ -17,9 +17,11 @@
1717
from kafka.protocol.admin import (
1818
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
1919
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
20+
from kafka.protocol.types import Array
21+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
2022
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2123
from kafka.protocol.metadata import MetadataRequest
22-
from kafka.structs import TopicPartition, OffsetAndMetadata
24+
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
2325
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
2426
ACLResourcePatternType
2527
from kafka.version import __version__
@@ -1000,22 +1002,47 @@ def _describe_consumer_groups_process_response(self, response):
10001002
"""Process a DescribeGroupsResponse into a group description."""
10011003
if response.API_VERSION <= 3:
10021004
assert len(response.groups) == 1
1003-
# TODO need to implement converting the response tuple into
1004-
# a more accessible interface like a namedtuple and then stop
1005-
# hardcoding tuple indices here. Several Java examples,
1006-
# including KafkaAdminClient.java
1007-
group_description = response.groups[0]
1008-
error_code = group_description[0]
1005+
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
1006+
if type(response_field) == Array:
1007+
described_groups = response.__dict__[response_name]
1008+
described_groups_field_schema = response_field.array_of
1009+
for described_group in described_groups:
1010+
described_group_information_list = []
1011+
is_consumer_protocol_type = False
1012+
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
1013+
if group_information_name == 'protocol_type':
1014+
protocol_type = described_group_information
1015+
is_consumer_protocol_type = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
1016+
if type(group_information_field) == Array:
1017+
member_information_list = []
1018+
member_schema = group_information_field.array_of
1019+
for members in described_group_information:
1020+
member_information = []
1021+
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
1022+
if member_name == 'member_metadata' and is_consumer_protocol_type:
1023+
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
1024+
elif member_name == 'member_assignment' and is_consumer_protocol_type:
1025+
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
1026+
else:
1027+
member_information.append(member)
1028+
else:
1029+
member_info_tuple = MemberInformation._make(member_information)
1030+
member_information_list.append(member_info_tuple)
1031+
else:
1032+
described_group_information_list.append(member_information_list)
1033+
else:
1034+
described_group_information_list.append(described_group_information)
1035+
else:
1036+
if response.API_VERSION <=2:
1037+
described_group_information_list.append([])
1038+
group_description = GroupInformation._make(described_group_information_list)
1039+
error_code = group_description.error_code
10091040
error_type = Errors.for_code(error_code)
10101041
# Java has the note: KAFKA-6789, we can retry based on the error code
10111042
if error_type is not Errors.NoError:
10121043
raise error_type(
10131044
"DescribeGroupsResponse failed with response '{}'."
10141045
.format(response))
1015-
# TODO Java checks the group protocol type, and if consumer
1016-
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
1017-
# the members' partition assignments... that hasn't yet been
1018-
# implemented here so just return the raw struct results
10191046
else:
10201047
raise NotImplementedError(
10211048
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."

kafka/structs.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
2121
["offset", "timestamp"])
2222

23+
MemberInformation = namedtuple("MemberInformation",
24+
["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])
25+
26+
GroupInformation = namedtuple("GroupInformation",
27+
["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"])
2328

2429
# Define retry policy for async producer
2530
# Limit value: int >= 0, 0 means no retries

test/test_admin_integration.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from test.testutil import env_kafka_version
44

5-
from kafka.errors import NoError
5+
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
66
from kafka.admin import (
77
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
88

@@ -138,3 +138,20 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
138138

139139
with pytest.raises(ValueError):
140140
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
141+
142+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
143+
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
144+
"""Tests that the describe consumer group call fails if the group coordinator is not available
145+
"""
146+
with pytest.raises(GroupCoordinatorNotAvailableError):
147+
group_description = kafka_admin_client.describe_consumer_groups(['test'])
148+
149+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
150+
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
151+
"""Tests that the describe consumer group call returns valid consumer group information
152+
"""
153+
consumer = kafka_consumer_factory(group_id='testgrp', auto_offset_reset='earliest')
154+
consumer.poll(timeout_ms=20)
155+
output = kafka_admin_client.describe_consumer_groups(['testgrp'])
156+
assert output[0].group == 'testgrp'
157+
assert output[0].members[0].member_metadata.subscription[0] == topic

0 commit comments

Comments
 (0)