File tree Expand file tree Collapse file tree 1 file changed +6
-3
lines changed Expand file tree Collapse file tree 1 file changed +6
-3
lines changed Original file line number Diff line number Diff line change @@ -168,7 +168,7 @@ def consumer_thread(i, group_id):
168
168
stop [i ] = Event ()
169
169
consumers [i ] = kafka_consumer_factory (group_id = group_id )
170
170
while not stop [i ].is_set ():
171
- consumers [i ].poll (20 )
171
+ consumers [i ].poll (timeout_ms = 200 )
172
172
consumers [i ].close ()
173
173
consumers [i ] = None
174
174
stop [i ] = None
@@ -183,6 +183,7 @@ def consumer_thread(i, group_id):
183
183
try :
184
184
timeout = time () + 35
185
185
while True :
186
+ info ('Checking consumers...' )
186
187
for c in range (num_consumers ):
187
188
188
189
# Verify all consumers have been created
@@ -212,9 +213,9 @@ def consumer_thread(i, group_id):
212
213
213
214
if not rejoining and is_same_generation :
214
215
break
215
- else :
216
- sleep (1 )
217
216
assert time () < timeout , "timeout waiting for assignments"
217
+ info ('sleeping...' )
218
+ sleep (1 )
218
219
219
220
info ('Group stabilized; verifying assignment' )
220
221
output = kafka_admin_client .describe_consumer_groups (group_id_list )
@@ -236,6 +237,8 @@ def consumer_thread(i, group_id):
236
237
for c in range (num_consumers ):
237
238
info ('Stopping consumer %s' , c )
238
239
stop [c ].set ()
240
+ for c in range (num_consumers ):
241
+ info ('Waiting for consumer thread %s' , c )
239
242
threads [c ].join ()
240
243
threads [c ] = None
241
244
You can’t perform that action at this time.
0 commit comments