|
12 | 12 | from .fixtures import ZookeeperFixture, KafkaFixture
|
13 | 13 |
|
14 | 14 |
|
| 15 | +def ensure_topic_creation(client, topic_name): |
| 16 | + times = 0 |
| 17 | + while True: |
| 18 | + times += 1 |
| 19 | + client.load_metadata_for_topics(topic_name) |
| 20 | + if client.has_metadata_for_topic(topic_name): |
| 21 | + break |
| 22 | + print "Waiting for %s topic to be created" % topic_name |
| 23 | + time.sleep(1) |
| 24 | + |
| 25 | + if times > 30: |
| 26 | + raise Exception("Unable to create topic %s" % topic_name) |
| 27 | + |
| 28 | + |
15 | 29 | class KafkaTestCase(unittest.TestCase):
|
16 | 30 | def setUp(self):
|
17 | 31 | topic_name = self.id()[self.id().rindex(".")+1:]
|
18 |
| - times = 0 |
19 |
| - while True: |
20 |
| - times += 1 |
21 |
| - self.client.load_metadata_for_topics(topic_name) |
22 |
| - if self.client.has_metadata_for_topic(topic_name): |
23 |
| - break |
24 |
| - print "Waiting for %s topic to be created" % topic_name |
25 |
| - time.sleep(1) |
26 |
| - |
27 |
| - if times > 30: |
28 |
| - raise Exception("Unable to create topic %s" % topic_name) |
| 32 | + ensure_topic_creation(self.client, topic_name) |
29 | 33 |
|
30 | 34 |
|
31 | 35 | class TestKafkaClient(KafkaTestCase):
|
@@ -719,7 +723,7 @@ def test_multi_process_consumer(self):
|
719 | 723 | start = datetime.now()
|
720 | 724 | messages = consumer.get_messages(block=True, timeout=5)
|
721 | 725 | diff = (datetime.now() - start).total_seconds()
|
722 |
| - self.assertGreaterEqual(diff, 4.9) |
| 726 | + self.assertGreaterEqual(diff, 4.999) |
723 | 727 | self.assertEqual(len(messages), 0)
|
724 | 728 |
|
725 | 729 | # Send 10 messages
|
@@ -830,7 +834,7 @@ def setUp(self):
|
830 | 834 | kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
|
831 | 835 | self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
|
832 | 836 | self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
|
833 |
| - KafkaTestCase.setUp(self) |
| 837 | + super(TestFailover, self).setUp() |
834 | 838 |
|
835 | 839 | def tearDown(self):
|
836 | 840 | self.client.close()
|
|
0 commit comments