@@ -112,3 +112,52 @@ KafkaProducer
112
112
113
113
# configure multiple retries
114
114
producer = KafkaProducer(retries = 5 )
115
+
116
+
117
+ ClusterMetadata
118
+ =============
119
+ .. code :: python
120
+
121
+ from kafka.cluster import ClusterMetadata
122
+
123
+ clusterMetadata = ClusterMetadata(bootstrap_servers = [' broker1:1234' ])
124
+
125
+ # get all brokers metadata
126
+ print (clusterMetadata.brokers())
127
+
128
+ # get specific broker metadata
129
+ print (clusterMetadata.broker_metadata(' bootstrap-0' ))
130
+
131
+ # get all partitions of a topic
132
+ print (clusterMetadata.partitions_for_topic(" topic" ))
133
+
134
+ # list topics
135
+ print (clusterMetadata.topics())
136
+
137
+
138
+ KafkaAdminClient
139
+ =============
140
+ .. code :: python
141
+ from kafka import KafkaAdminClient
142
+ from kafka.admin import NewTopic
143
+
144
+ admin = KafkaAdminClient(bootstrap_servers = [' broker1:1234' ])
145
+
146
+ # create a new topic
147
+ topics_list = []
148
+ topics_list.append(NewTopic(name = " testtopic" , num_partitions = 1 , replication_factor = 1 ))
149
+ admin.create_topics(topics_list,timeout_ms = None , validate_only = False )
150
+
151
+ # delete a topic
152
+ admin.delete_topics([' testtopic' ])
153
+
154
+ # list consumer groups
155
+ print (admin.list_consumer_groups())
156
+
157
+ # get consumer group details
158
+ print (admin.describe_consumer_groups(' cft-plt-qa.connect' ))
159
+
160
+ # get consumer group offset
161
+ print (admin.list_consumer_group_offsets(' cft-plt-qa.connect' ))
162
+
163
+
0 commit comments