@@ -23,7 +23,7 @@ def test_consumer(kafka_broker, topic):
23
23
# The `topic` fixture is included because
24
24
# 0.8.2 brokers need a topic to function well
25
25
consumer = KafkaConsumer (bootstrap_servers = get_connect_str (kafka_broker ))
26
- consumer .poll (500 )
26
+ consumer .poll (timeout_ms = 500 )
27
27
assert len (consumer ._client ._conns ) > 0
28
28
node_id = list (consumer ._client ._conns .keys ())[0 ]
29
29
assert consumer ._client ._conns [node_id ].state is ConnectionStates .CONNECTED
@@ -34,7 +34,7 @@ def test_consumer(kafka_broker, topic):
34
34
def test_consumer_topics (kafka_broker , topic ):
35
35
consumer = KafkaConsumer (bootstrap_servers = get_connect_str (kafka_broker ))
36
36
# Necessary to drive the IO
37
- consumer .poll (500 )
37
+ consumer .poll (timeout_ms = 500 )
38
38
assert topic in consumer .topics ()
39
39
assert len (consumer .partitions_for_topic (topic )) > 0
40
40
consumer .close ()
@@ -58,7 +58,7 @@ def consumer_thread(i):
58
58
group_id = group_id ,
59
59
heartbeat_interval_ms = 500 )
60
60
while not stop [i ].is_set ():
61
- for tp , records in six .itervalues (consumers [i ].poll (100 )):
61
+ for tp , records in six .itervalues (consumers [i ].poll (timeout_ms = 200 )):
62
62
messages [i ][tp ].extend (records )
63
63
consumers [i ].close ()
64
64
consumers [i ] = None
0 commit comments