Skip to content

Commit 16cd36a

Browse files
author
Swen Wenzel
committed
implemente delete_groups feature on KafkaAdminClient
1 parent 804faf4 commit 16cd36a

File tree

1 file changed

+90
-3
lines changed

1 file changed

+90
-3
lines changed

kafka/admin/client.py

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
from kafka.metrics import MetricConfig, Metrics
2020
from kafka.protocol.admin import (
2121
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
22-
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
22+
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
23+
DeleteGroupsRequest
24+
)
2325
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2426
from kafka.protocol.metadata import MetadataRequest
2527
from kafka.protocol.types import Array
@@ -343,6 +345,30 @@ def _find_coordinator_id(self, group_id):
343345
response = future.value
344346
return self._find_coordinator_id_process_response(response)
345347

348+
def _find_many_coordinator_ids(self, group_ids):
349+
"""Find the broker node_id of the coordinator for each of the given groups.
350+
351+
Sends a FindCoordinatorRequest message to the cluster for each group_id.
352+
Will block until the FindCoordinatorResponse is received for all groups.
353+
Any errors are immediately raised.
354+
355+
:param group_ids: A list of consumer group IDs. This is typically the group
356+
name as a string.
357+
:return: A list of tuples (group_id, node_id) where node_id is the id
358+
of the broker that is the coordinator for the corresponding group.
359+
"""
360+
# Note: Java may change how this is implemented in KAFKA-6791.
361+
futures = {
362+
group_id: self._find_coordinator_id_send_request(group_id)
363+
for group_id in group_ids
364+
}
365+
self._wait_for_futures(list(futures.values()))
366+
groups_coordinators = [
367+
(group_id, self._find_coordinator_id_process_response(f.value))
368+
for group_id, f in futures.items()
369+
]
370+
return groups_coordinators
371+
346372
def _send_request_to_node(self, node_id, request):
347373
"""Send a Kafka protocol message to a specific broker.
348374
@@ -1261,8 +1287,69 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
12611287
response = future.value
12621288
return self._list_consumer_group_offsets_process_response(response)
12631289

1264-
# delete groups protocol not yet implemented
1265-
# Note: send the request to the group's coordinator.
1290+
def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
1291+
"""Delete Consumer Group Offsets for given consumer groups.
1292+
1293+
Note:
1294+
This does not verify that the group ids actually exist and
1295+
group_coordinator_id is the correct coordinator for all these groups.
1296+
1297+
The result needs checking for potential errors.
1298+
1299+
:param group_ids: The consumer group ids of the groups which are to be deleted.
1300+
:param group_coordinator_id: The node_id of the broker which is the coordinator for
1301+
all the groups. Use only if all groups are coordinated by the same broker.
1302+
If set to None, will query the cluster to find the coordinator for every single group.
1303+
Explicitly specifying this can be useful to prevent
1304+
that extra network round trips if you already know the group
1305+
coordinator. Default: None.
1306+
:return dictionary: A list of tuples (group_id, KafkaError)
1307+
"""
1308+
if group_coordinator_id is not None:
1309+
futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
1310+
else:
1311+
groups_coordinators = defaultdict(list)
1312+
for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
1313+
groups_coordinators[group_coordinator_id].append(group_id)
1314+
futures = [
1315+
self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
1316+
for group_coordinator_id, group_ids in groups_coordinators.items()
1317+
]
1318+
1319+
self._wait_for_futures(futures)
1320+
1321+
results = []
1322+
for f in futures:
1323+
results.extend(self._convert_delete_groups_response(f.value))
1324+
return results
1325+
1326+
def _convert_delete_groups_response(self, response):
1327+
if response.API_VERSION <= 1:
1328+
results = []
1329+
for group_id, error_code in response.results:
1330+
results.append((group_id, Errors.for_code(error_code)))
1331+
return results
1332+
else:
1333+
raise NotImplementedError(
1334+
"Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient."
1335+
.format(response.API_VERSION))
1336+
1337+
def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
1338+
"""Send a DeleteGroups request to a broker.
1339+
1340+
:param group_ids: The consumer group ids of the groups which are to be deleted.
1341+
:param group_coordinator_id: The node_id of the broker which is the coordinator for
1342+
all the groups.
1343+
:return: A message future
1344+
"""
1345+
version = self._matching_api_version(DeleteGroupsRequest)
1346+
if version <= 1:
1347+
request = DeleteGroupsRequest[version](group_ids)
1348+
else:
1349+
raise NotImplementedError(
1350+
"Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient."
1351+
.format(version))
1352+
return self._send_request_to_node(group_coordinator_id, request)
12661353

12671354
def _wait_for_futures(self, futures):
12681355
while not all(future.succeeded() for future in futures):

0 commit comments

Comments
 (0)