@@ -142,13 +142,15 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
142
142
with pytest .raises (ValueError ):
143
143
configs = kafka_admin_client .describe_configs ([ConfigResource (ConfigResourceType .BROKER , broker_id )])
144
144
145
+
145
146
@pytest .mark .skipif (env_kafka_version () < (0 , 11 ), reason = 'Describe consumer group requires broker >=0.11' )
146
147
def test_describe_consumer_group_does_not_exist (kafka_admin_client ):
147
148
"""Tests that the describe consumer group call fails if the group coordinator is not available
148
149
"""
149
150
with pytest .raises (GroupCoordinatorNotAvailableError ):
150
151
group_description = kafka_admin_client .describe_consumer_groups (['test' ])
151
152
153
+
152
154
@pytest .mark .skipif (env_kafka_version () < (0 , 11 ), reason = 'Describe consumer group requires broker >=0.11' )
153
155
def test_describe_consumer_group_exists (kafka_admin_client , kafka_consumer_factory , topic ):
154
156
"""Tests that the describe consumer group call returns valid consumer group information
@@ -236,3 +238,62 @@ def consumer_thread(i, group_id):
236
238
stop [c ].set ()
237
239
threads [c ].join ()
238
240
threads [c ] = None
241
+
242
+
243
+ @pytest .mark .skipif (env_kafka_version () < (1 , 1 ), reason = "Delete consumer groups requires broker >=1.1" )
244
+ def test_delete_consumergroups_inactive_group (kafka_admin_client , kafka_consumer_factory , send_messages ):
245
+ send_messages (range (0 , 100 ), partition = 0 )
246
+ send_messages (range (0 , 100 ), partition = 1 )
247
+ consumer1 = kafka_consumer_factory (group_id = "group1" )
248
+ next (consumer1 )
249
+ consumer1 .close ()
250
+
251
+ consumer2 = kafka_consumer_factory (group_id = "group2" )
252
+ next (consumer2 )
253
+ consumer2 .close ()
254
+
255
+ consumer3 = kafka_consumer_factory (group_id = "group3" )
256
+ next (consumer3 )
257
+ consumer3 .close ()
258
+
259
+ consumergroups = {group_id for group_id , _ in kafka_admin_client .list_consumer_groups ()}
260
+ assert "group1" in consumergroups
261
+ assert "group2" in consumergroups
262
+ assert "group3" in consumergroups
263
+
264
+ kafka_admin_client .delete_consumer_groups (["group1" , "group2" ])
265
+
266
+ consumergroups = {group_id for group_id , _ in kafka_admin_client .list_consumer_groups ()}
267
+ assert "group1" not in consumergroups
268
+ assert "group2" not in consumergroups
269
+ assert "group3" in consumergroups
270
+
271
+
272
+ @pytest .mark .skipif (env_kafka_version () < (1 , 1 ), reason = "Delete consumer groups requires broker >=1.1" )
273
+ def test_delete_consumergroups_active_group (kafka_admin_client , kafka_consumer_factory , send_messages ):
274
+ send_messages (range (0 , 100 ), partition = 0 )
275
+ send_messages (range (0 , 100 ), partition = 1 )
276
+ consumer1 = kafka_consumer_factory (group_id = "group1" )
277
+ next (consumer1 )
278
+ consumer1 .close ()
279
+
280
+ consumer2 = kafka_consumer_factory (group_id = "group2" )
281
+ next (consumer2 )
282
+
283
+ consumer3 = kafka_consumer_factory (group_id = "group3" )
284
+ next (consumer3 )
285
+ consumer3 .close ()
286
+
287
+ consumergroups = {group_id for group_id , _ in kafka_admin_client .list_consumer_groups ()}
288
+ assert "group1" in consumergroups
289
+ assert "group2" in consumergroups
290
+ assert "group3" in consumergroups
291
+
292
+ # TODO use more specific exception
293
+ with pytest .raises (Exception ):
294
+ kafka_admin_client .delete_consumer_groups (["group1" , "group2" ])
295
+
296
+ consumergroups = {group_id for group_id , _ in kafka_admin_client .list_consumer_groups ()}
297
+ assert "group1" not in consumergroups
298
+ assert "group2" in consumergroups
299
+ assert "group3" in consumergroups
0 commit comments