Skip to content

Commit 968059a

Browse files
author
Apurva Telang
committed
PR review changes: Sorted Imports in alpha order, made group description response processing more readable, enhanced tests
1 parent 27fad5f commit 968059a

File tree

2 files changed

+120
-38
lines changed

2 files changed

+120
-38
lines changed

kafka/admin/client.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import socket
77

88
from . import ConfigResourceType
9-
from kafka.vendor import six
109

10+
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
11+
ACLResourcePatternType
1112
from kafka.client_async import KafkaClient, selectors
13+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
1214
import kafka.errors as Errors
1315
from kafka.errors import (
1416
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
@@ -17,13 +19,11 @@
1719
from kafka.protocol.admin import (
1820
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
1921
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
20-
from kafka.protocol.types import Array
21-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
2222
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2323
from kafka.protocol.metadata import MetadataRequest
24+
from kafka.protocol.types import Array
2425
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
25-
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
26-
ACLResourcePatternType
26+
from kafka.vendor import six
2727
from kafka.version import __version__
2828

2929

@@ -1003,39 +1003,39 @@ def _describe_consumer_groups_process_response(self, response):
10031003
if response.API_VERSION <= 3:
10041004
assert len(response.groups) == 1
10051005
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
1006-
if type(response_field) == Array:
1006+
if isinstance(response_field, Array):
10071007
described_groups = response.__dict__[response_name]
10081008
described_groups_field_schema = response_field.array_of
1009-
for described_group in described_groups:
1010-
described_group_information_list = []
1011-
is_consumer_protocol_type = False
1012-
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
1013-
if group_information_name == 'protocol_type':
1014-
protocol_type = described_group_information
1015-
is_consumer_protocol_type = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
1016-
if type(group_information_field) == Array:
1017-
member_information_list = []
1018-
member_schema = group_information_field.array_of
1019-
for members in described_group_information:
1020-
member_information = []
1021-
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
1022-
if member_name == 'member_metadata' and is_consumer_protocol_type:
1009+
described_group = response.__dict__[response_name][0]
1010+
described_group_information_list = []
1011+
protocol_type_is_consumer = False
1012+
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
1013+
if group_information_name == 'protocol_type':
1014+
protocol_type = described_group_information
1015+
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
1016+
if type(group_information_field) == Array:
1017+
member_information_list = []
1018+
member_schema = group_information_field.array_of
1019+
for members in described_group_information:
1020+
member_information = []
1021+
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
1022+
if protocol_type_is_consumer:
1023+
if member_name == 'member_metadata' and member:
10231024
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
1024-
elif member_name == 'member_assignment' and is_consumer_protocol_type:
1025+
elif member_name == 'member_assignment' and member:
10251026
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
10261027
else:
10271028
member_information.append(member)
1028-
else:
1029-
member_info_tuple = MemberInformation._make(member_information)
1030-
member_information_list.append(member_info_tuple)
1031-
else:
1032-
described_group_information_list.append(member_information_list)
1033-
else:
1034-
described_group_information_list.append(described_group_information)
1029+
member_info_tuple = MemberInformation._make(member_information)
1030+
member_information_list.append(member_info_tuple)
1031+
described_group_information_list.append(member_information_list)
10351032
else:
1036-
if response.API_VERSION <=2:
1037-
described_group_information_list.append([])
1038-
group_description = GroupInformation._make(described_group_information_list)
1033+
described_group_information_list.append(described_group_information)
1034+
# Version 3 of the DescribeGroups API introduced the "authorized_operations" field. This will cause the namedtuple to fail
1035+
# Therefore, appending a placeholder of None in it.
1036+
if response.API_VERSION <=2:
1037+
described_group_information_list.append(None)
1038+
group_description = GroupInformation._make(described_group_information_list)
10391039
error_code = group_description.error_code
10401040
error_type = Errors.for_code(error_code)
10411041
# Java has the note: KAFKA-6789, we can retry based on the error code

test/test_admin_integration.py

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import pytest
22

3-
from test.testutil import env_kafka_version
3+
from logging import info
4+
from test.testutil import env_kafka_version, random_string
5+
from threading import Event, Thread
6+
from time import time, sleep
47

5-
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
68
from kafka.admin import (
79
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
10+
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
811

912

1013
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -149,9 +152,88 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client):
149152
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
150153
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
151154
"""Tests that the describe consumer group call returns valid consumer group information
155+
This test takes inspiration from the test 'test_group' in test_consumer_group.py.
152156
"""
153-
consumer = kafka_consumer_factory(group_id='testgrp', auto_offset_reset='earliest')
154-
consumer.poll(timeout_ms=20)
155-
output = kafka_admin_client.describe_consumer_groups(['testgrp'])
156-
assert output[0].group == 'testgrp'
157-
assert output[0].members[0].member_metadata.subscription[0] == topic
157+
consumers = {}
158+
stop = {}
159+
threads = {}
160+
random_group_id = 'test-group-' + random_string(6)
161+
group_id_list = [random_group_id, random_group_id + '_2']
162+
generations = {group_id_list[0]: set(), group_id_list[1]: set()}
163+
def consumer_thread(i, group_id):
164+
assert i not in consumers
165+
assert i not in stop
166+
stop[i] = Event()
167+
consumers[i] = kafka_consumer_factory(group_id=group_id)
168+
while not stop[i].is_set():
169+
consumers[i].poll(20)
170+
consumers[i].close()
171+
consumers[i] = None
172+
stop[i] = None
173+
174+
num_consumers = 3
175+
for i in range(num_consumers):
176+
group_id = group_id_list[i % 2]
177+
t = Thread(target=consumer_thread, args=(i, group_id,))
178+
t.start()
179+
threads[i] = t
180+
181+
try:
182+
timeout = time() + 35
183+
while True:
184+
for c in range(num_consumers):
185+
186+
# Verify all consumers have been created
187+
if c not in consumers:
188+
break
189+
190+
# Verify all consumers have an assignment
191+
elif not consumers[c].assignment():
192+
break
193+
194+
# If all consumers exist and have an assignment
195+
else:
196+
197+
info('All consumers have assignment... checking for stable group')
198+
# Verify all consumers are in the same generation
199+
# then log state and break while loop
200+
201+
for consumer in consumers.values():
202+
generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id)
203+
204+
is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()])
205+
206+
# New generation assignment is not complete until
207+
# coordinator.rejoining = False
208+
rejoining = any([consumer._coordinator.rejoining
209+
for consumer in list(consumers.values())])
210+
211+
if not rejoining and is_same_generation:
212+
break
213+
else:
214+
sleep(1)
215+
assert time() < timeout, "timeout waiting for assignments"
216+
217+
info('Group stabilized; verifying assignment')
218+
output = kafka_admin_client.describe_consumer_groups(group_id_list)
219+
assert len(output) == 2
220+
consumer_groups = set()
221+
for consumer_group in output:
222+
assert(consumer_group.group in group_id_list)
223+
if consumer_group.group == group_id_list[0]:
224+
assert(len(consumer_group.members) == 2)
225+
else:
226+
assert(len(consumer_group.members) == 1)
227+
for member in consumer_group.members:
228+
assert(member.member_metadata.subscription[0] == topic)
229+
assert(member.member_assignment.assignment[0][0] == topic)
230+
assert consumer_group.members
231+
consumer_groups.add(consumer_group.group)
232+
assert(sorted(list(consumer_groups)) == group_id_list)
233+
finally:
234+
info('Shutting down %s consumers', num_consumers)
235+
for c in range(num_consumers):
236+
info('Stopping consumer %s', c)
237+
stop[c].set()
238+
threads[c].join()
239+
threads[c] = None

0 commit comments

Comments
 (0)