7
7
8
8
from kafka .admin import (
9
9
ACLFilter , ACLOperation , ACLPermissionType , ResourcePattern , ResourceType , ACL , ConfigResource , ConfigResourceType )
10
- from kafka .errors import (NoError , GroupCoordinatorNotAvailableError )
10
+ from kafka .errors import (NoError , GroupCoordinatorNotAvailableError , NonEmptyGroupError , GroupIdNotFoundError )
11
11
12
12
13
13
@pytest .mark .skipif (env_kafka_version () < (0 , 11 ), reason = "ACL features require broker >=0.11" )
@@ -241,9 +241,8 @@ def consumer_thread(i, group_id):
241
241
242
242
243
243
@pytest .mark .skipif (env_kafka_version () < (1 , 1 ), reason = "Delete consumer groups requires broker >=1.1" )
244
- def test_delete_consumergroups_inactive_group (kafka_admin_client , kafka_consumer_factory , send_messages ):
244
+ def test_delete_consumergroups (kafka_admin_client , kafka_consumer_factory , send_messages ):
245
245
send_messages (range (0 , 100 ), partition = 0 )
246
- send_messages (range (0 , 100 ), partition = 1 )
247
246
consumer1 = kafka_consumer_factory (group_id = "group1" )
248
247
next (consumer1 )
249
248
consumer1 .close ()
@@ -261,7 +260,13 @@ def test_delete_consumergroups_inactive_group(kafka_admin_client, kafka_consumer
261
260
assert "group2" in consumergroups
262
261
assert "group3" in consumergroups
263
262
264
- kafka_admin_client .delete_consumer_groups (["group1" , "group2" ])
263
+ delete_results = {
264
+ group_id : error
265
+ for group_id , error in kafka_admin_client .delete_consumer_groups (["group1" , "group2" ])
266
+ }
267
+ assert delete_results ["group1" ] == NoError
268
+ assert delete_results ["group2" ] == NoError
269
+ assert "group3" not in delete_results
265
270
266
271
consumergroups = {group_id for group_id , _ in kafka_admin_client .list_consumer_groups ()}
267
272
assert "group1" not in consumergroups
@@ -270,30 +275,30 @@ def test_delete_consumergroups_inactive_group(kafka_admin_client, kafka_consumer
270
275
271
276
272
277
@pytest .mark .skipif (env_kafka_version () < (1 , 1 ), reason = "Delete consumer groups requires broker >=1.1" )
273
- def test_delete_consumergroups_active_group (kafka_admin_client , kafka_consumer_factory , send_messages ):
278
+ def test_delete_consumergroups_with_errors (kafka_admin_client , kafka_consumer_factory , send_messages ):
274
279
send_messages (range (0 , 100 ), partition = 0 )
275
- send_messages (range (0 , 100 ), partition = 1 )
276
280
consumer1 = kafka_consumer_factory (group_id = "group1" )
277
281
next (consumer1 )
278
282
consumer1 .close ()
279
283
280
284
consumer2 = kafka_consumer_factory (group_id = "group2" )
281
285
next (consumer2 )
282
286
283
- consumer3 = kafka_consumer_factory (group_id = "group3" )
284
- next (consumer3 )
285
- consumer3 .close ()
286
-
287
287
consumergroups = {group_id for group_id , _ in kafka_admin_client .list_consumer_groups ()}
288
288
assert "group1" in consumergroups
289
289
assert "group2" in consumergroups
290
- assert "group3" in consumergroups
290
+ assert "group3" not in consumergroups
291
+
292
+ delete_results = {
293
+ group_id : error
294
+ for group_id , error in kafka_admin_client .delete_consumer_groups (["group1" , "group2" , "group3" ])
295
+ }
291
296
292
- # TODO use more specific exception
293
- with pytest . raises ( Exception ):
294
- kafka_admin_client . delete_consumer_groups ([ "group1" , "group2" ])
297
+ assert delete_results [ "group1" ] == NoError
298
+ assert delete_results [ "group2" ] == NonEmptyGroupError
299
+ assert delete_results [ "group3" ] == GroupIdNotFoundError
295
300
296
301
consumergroups = {group_id for group_id , _ in kafka_admin_client .list_consumer_groups ()}
297
302
assert "group1" not in consumergroups
298
303
assert "group2" in consumergroups
299
- assert "group3" in consumergroups
304
+ assert "group3" not in consumergroups
0 commit comments