@@ -418,6 +418,15 @@ def _send_request_to_controller(self, request):
418
418
419
419
@staticmethod
420
420
def _convert_new_topic_request (new_topic ):
421
+ """
422
+ Build the tuple required by CreateTopicsRequest from a NewTopic object.
423
+
424
+ :param new_topic: A NewTopic instance containing name, partition count, replication factor,
425
+ replica assignments, and config entries.
426
+ :return: A tuple in the form:
427
+ (topic_name, num_partitions, replication_factor, [(partition_id, [replicas])...],
428
+ [(config_key, config_value)...])
429
+ """
421
430
return (
422
431
new_topic .name ,
423
432
new_topic .num_partitions ,
@@ -515,23 +524,48 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
515
524
return future .value
516
525
517
526
def list_topics (self ):
527
+ """
528
+ Retrieve a list of all topic names in the cluster.
529
+
530
+ :return: A list of topic name strings.
531
+ """
518
532
metadata = self ._get_cluster_metadata (topics = None )
519
533
obj = metadata .to_object ()
520
534
return [t ['topic' ] for t in obj ['topics' ]]
521
535
522
536
def describe_topics (self , topics = None ):
537
+ """
538
+ Fetch metadata for the specified topics or all topics if None.
539
+
540
+ :param topics: (Optional) A list of topic names. If None, metadata for all
541
+ topics is retrieved.
542
+ :return: A list of dicts describing each topic (including partition info).
543
+ """
523
544
metadata = self ._get_cluster_metadata (topics = topics )
524
545
obj = metadata .to_object ()
525
546
return obj ['topics' ]
526
547
527
548
def describe_cluster (self ):
549
+ """
550
+ Fetch cluster-wide metadata such as the list of brokers, the controller ID,
551
+ and the cluster ID.
552
+
553
+ :return: A dict with cluster-wide metadata, excluding topic details.
554
+ """
528
555
metadata = self ._get_cluster_metadata ()
529
556
obj = metadata .to_object ()
530
557
obj .pop ('topics' ) # We have 'describe_topics' for this
531
558
return obj
532
559
533
560
@staticmethod
534
561
def _convert_describe_acls_response_to_acls (describe_response ):
562
+ """
563
+ Convert a DescribeAclsResponse into a list of ACL objects and a KafkaError.
564
+
565
+ :param describe_response: The response object from the DescribeAclsRequest.
566
+ :return: A tuple of (list_of_acl_objects, error) where error is an instance
567
+ of KafkaError (NoError if successful).
568
+ """
535
569
version = describe_response .API_VERSION
536
570
537
571
error = Errors .for_code (describe_response .error_code )
@@ -617,6 +651,12 @@ def describe_acls(self, acl_filter):
617
651
618
652
@staticmethod
619
653
def _convert_create_acls_resource_request_v0 (acl ):
654
+ """
655
+ Convert an ACL object into the CreateAclsRequest v0 format.
656
+
657
+ :param acl: An ACL object with resource pattern and permissions.
658
+ :return: A tuple: (resource_type, resource_name, principal, host, operation, permission_type).
659
+ """
620
660
621
661
return (
622
662
acl .resource_pattern .resource_type ,
@@ -629,7 +669,12 @@ def _convert_create_acls_resource_request_v0(acl):
629
669
630
670
@staticmethod
631
671
def _convert_create_acls_resource_request_v1 (acl ):
632
-
672
+ """
673
+ Convert an ACL object into the CreateAclsRequest v1 format.
674
+
675
+ :param acl: An ACL object with resource pattern and permissions.
676
+ :return: A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type).
677
+ """
633
678
return (
634
679
acl .resource_pattern .resource_type ,
635
680
acl .resource_pattern .resource_name ,
@@ -642,6 +687,17 @@ def _convert_create_acls_resource_request_v1(acl):
642
687
643
688
@staticmethod
644
689
def _convert_create_acls_response_to_acls (acls , create_response ):
690
+ """
691
+ Parse CreateAclsResponse and correlate success/failure with original ACL objects.
692
+
693
+ :param acls: A list of ACL objects that were requested for creation.
694
+ :param create_response: The broker's CreateAclsResponse object.
695
+ :return: A dict with:
696
+ {
697
+ 'succeeded': [list of ACL objects successfully created],
698
+ 'failed': [(acl_object, KafkaError), ...]
699
+ }
700
+ """
645
701
version = create_response .API_VERSION
646
702
647
703
creations_error = []
@@ -701,6 +757,12 @@ def create_acls(self, acls):
701
757
702
758
@staticmethod
703
759
def _convert_delete_acls_resource_request_v0 (acl ):
760
+ """
761
+ Convert an ACLFilter object into the DeleteAclsRequest v0 format.
762
+
763
+ :param acl: An ACLFilter object identifying the ACLs to be deleted.
764
+ :return: A tuple: (resource_type, resource_name, principal, host, operation, permission_type).
765
+ """
704
766
return (
705
767
acl .resource_pattern .resource_type ,
706
768
acl .resource_pattern .resource_name ,
@@ -712,6 +774,12 @@ def _convert_delete_acls_resource_request_v0(acl):
712
774
713
775
@staticmethod
714
776
def _convert_delete_acls_resource_request_v1 (acl ):
777
+ """
778
+ Convert an ACLFilter object into the DeleteAclsRequest v1 format.
779
+
780
+ :param acl: An ACLFilter object identifying the ACLs to be deleted.
781
+ :return: A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type).
782
+ """
715
783
return (
716
784
acl .resource_pattern .resource_type ,
717
785
acl .resource_pattern .resource_name ,
@@ -724,6 +792,14 @@ def _convert_delete_acls_resource_request_v1(acl):
724
792
725
793
@staticmethod
726
794
def _convert_delete_acls_response_to_matching_acls (acl_filters , delete_response ):
795
+ """
796
+ Parse the DeleteAclsResponse and map the results back to each input ACLFilter.
797
+
798
+ :param acl_filters: A list of ACLFilter objects that were provided in the request.
799
+ :param delete_response: The response from the DeleteAclsRequest.
800
+ :return: A list of tuples of the form:
801
+ (acl_filter, [(matching_acl, KafkaError), ...], filter_level_error).
802
+ """
727
803
version = delete_response .API_VERSION
728
804
filter_result_list = []
729
805
for i , filter_responses in enumerate (delete_response .filter_responses ):
@@ -795,6 +871,12 @@ def delete_acls(self, acl_filters):
795
871
796
872
@staticmethod
797
873
def _convert_describe_config_resource_request (config_resource ):
874
+ """
875
+ Convert a ConfigResource into the format required by DescribeConfigsRequest.
876
+
877
+ :param config_resource: A ConfigResource with resource_type, name, and optional config keys.
878
+ :return: A tuple: (resource_type, resource_name, [list_of_config_keys] or None).
879
+ """
798
880
return (
799
881
config_resource .resource_type ,
800
882
config_resource .name ,
@@ -881,6 +963,12 @@ def describe_configs(self, config_resources, include_synonyms=False):
881
963
882
964
@staticmethod
883
965
def _convert_alter_config_resource_request (config_resource ):
966
+ """
967
+ Convert a ConfigResource into the format required by AlterConfigsRequest.
968
+
969
+ :param config_resource: A ConfigResource with resource_type, name, and config (key, value) pairs.
970
+ :return: A tuple: (resource_type, resource_name, [(config_key, config_value), ...]).
971
+ """
884
972
return (
885
973
config_resource .resource_type ,
886
974
config_resource .name ,
@@ -930,6 +1018,13 @@ def alter_configs(self, config_resources):
930
1018
931
1019
@staticmethod
932
1020
def _convert_create_partitions_request (topic_name , new_partitions ):
1021
+ """
1022
+ Convert a NewPartitions object into the tuple format for CreatePartitionsRequest.
1023
+
1024
+ :param topic_name: The name of the existing topic.
1025
+ :param new_partitions: A NewPartitions instance with total_count and new_assignments.
1026
+ :return: A tuple: (topic_name, (total_count, [list_of_assignments])).
1027
+ """
933
1028
return (
934
1029
topic_name ,
935
1030
(
@@ -1311,6 +1406,12 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
1311
1406
return results
1312
1407
1313
1408
def _convert_delete_groups_response (self , response ):
1409
+ """
1410
+ Parse the DeleteGroupsResponse, mapping group IDs to their respective errors.
1411
+
1412
+ :param response: A DeleteGroupsResponse object from the broker.
1413
+ :return: A list of (group_id, KafkaError) for each deleted group.
1414
+ """
1314
1415
if response .API_VERSION <= 1 :
1315
1416
results = []
1316
1417
for group_id , error_code in response .results :
@@ -1322,12 +1423,12 @@ def _convert_delete_groups_response(self, response):
1322
1423
.format (response .API_VERSION ))
1323
1424
1324
1425
def _delete_consumer_groups_send_request (self , group_ids , group_coordinator_id ):
1325
- """Send a DeleteGroups request to a broker.
1326
-
1327
- :param group_ids: The consumer group ids of the groups which are to be deleted.
1328
- :param group_coordinator_id: The node_id of the broker which is the coordinator for
1329
- all the groups.
1330
- :return: A message future
1426
+ """
1427
+ Send a DeleteGroupsRequest to the specified broker (the group coordinator).
1428
+
1429
+ :param group_ids: A list of consumer group IDs to be deleted.
1430
+ :param group_coordinator_id: The node_id of the broker coordinating these groups.
1431
+ :return: A future representing the in-flight DeleteGroupsRequest.
1331
1432
"""
1332
1433
version = self ._matching_api_version (DeleteGroupsRequest )
1333
1434
if version <= 1 :
@@ -1339,6 +1440,12 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
1339
1440
return self ._send_request_to_node (group_coordinator_id , request )
1340
1441
1341
1442
def _wait_for_futures (self , futures ):
1443
+ """
1444
+ Block until all futures complete. If any fail, raise the encountered exception.
1445
+
1446
+ :param futures: A list of Future objects awaiting results.
1447
+ :raises: The first encountered exception if a future fails.
1448
+ """
1342
1449
while not all (future .succeeded () for future in futures ):
1343
1450
for future in futures :
1344
1451
self ._client .poll (future = future )
0 commit comments