Skip to content

Commit 5a09dab

Browse files
authored
Fix external kafka/zk fixtures for testing (#2533)
1 parent 747a1c1 commit 5a09dab

File tree

3 files changed

+62
-78
lines changed

3 files changed

+62
-78
lines changed

test/conftest.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,36 @@
11
from __future__ import absolute_import
22

3+
import os
34
import uuid
45

56
import pytest
67

8+
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
79
from test.testutil import env_kafka_version, random_string
810
from test.fixtures import KafkaFixture, ZookeeperFixture
911

1012
@pytest.fixture(scope="module")
1113
def zookeeper():
1214
"""Return a Zookeeper fixture"""
13-
zk_instance = ZookeeperFixture.instance()
14-
yield zk_instance
15-
zk_instance.close()
15+
if "ZOOKEEPER_URI" in os.environ:
16+
parse = urlparse(os.environ["ZOOKEEPER_URI"])
17+
(host, port) = (parse.hostname, parse.port)
18+
yield ZookeeperFixture.instance(host=host, port=port, external=True)
19+
else:
20+
zk_instance = ZookeeperFixture.instance()
21+
yield zk_instance
22+
zk_instance.close()
1623

1724

1825
@pytest.fixture(scope="module")
19-
def kafka_broker(kafka_broker_factory):
26+
def kafka_broker(kafka_broker_factory, zookeeper):
2027
"""Return a Kafka broker fixture"""
21-
return kafka_broker_factory()[0]
28+
if "KAFKA_URI" in os.environ:
29+
parse = urlparse(os.environ["KAFKA_URI"])
30+
(host, port) = (parse.hostname, parse.port)
31+
return KafkaFixture.instance(0, zookeeper, host=host, port=port, external=True)
32+
else:
33+
return kafka_broker_factory()[0]
2234

2335

2436
@pytest.fixture(scope="module")

test/fixtures.py

Lines changed: 37 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
import uuid
1111

1212
import py
13-
from kafka.vendor.six.moves import urllib, range
14-
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
13+
from kafka.vendor.six.moves import range
1514

1615
from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
1716
from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError
@@ -74,43 +73,6 @@ def __init__(self):
7473
if not os.path.isdir(self.kafka_root):
7574
raise FileNotFoundError(self.kafka_root)
7675

77-
@classmethod
78-
def download_official_distribution(cls,
79-
kafka_version=None,
80-
scala_version=None,
81-
output_dir=None):
82-
if not kafka_version:
83-
kafka_version = cls.kafka_version
84-
if not scala_version:
85-
scala_version = cls.scala_version
86-
if not output_dir:
87-
output_dir = os.path.join(cls.project_root, 'servers', 'dist')
88-
89-
distfile = 'kafka_%s-%s' % (scala_version, kafka_version,)
90-
url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,)
91-
output_file = os.path.join(output_dir, distfile + '.tgz')
92-
93-
if os.path.isfile(output_file):
94-
log.info("Found file already on disk: %s", output_file)
95-
return output_file
96-
97-
# New tarballs are .tgz, older ones are sometimes .tar.gz
98-
try:
99-
url = url_base + distfile + '.tgz'
100-
log.info("Attempting to download %s", url)
101-
response = urllib.request.urlopen(url)
102-
except urllib.error.HTTPError:
103-
log.exception("HTTP Error")
104-
url = url_base + distfile + '.tar.gz'
105-
log.info("Attempting to download %s", url)
106-
response = urllib.request.urlopen(url)
107-
108-
log.info("Saving distribution file to %s", output_file)
109-
with open(output_file, 'w') as output_file_fd:
110-
output_file_fd.write(response.read())
111-
112-
return output_file
113-
11476
@classmethod
11577
def test_resource(cls, filename):
11678
path = os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename)
@@ -169,23 +131,18 @@ def dump_logs(self):
169131

170132
class ZookeeperFixture(Fixture):
171133
@classmethod
172-
def instance(cls):
173-
if "ZOOKEEPER_URI" in os.environ:
174-
parse = urlparse(os.environ["ZOOKEEPER_URI"])
175-
(host, port) = (parse.hostname, parse.port)
176-
fixture = ExternalService(host, port)
177-
else:
178-
(host, port) = ("127.0.0.1", None)
179-
fixture = cls(host, port)
180-
134+
def instance(cls, host=None, port=None, external=False):
135+
if host is None:
136+
host = "127.0.0.1"
137+
fixture = cls(host, port, external=external)
181138
fixture.open()
182139
return fixture
183140

184-
def __init__(self, host, port, tmp_dir=None):
141+
def __init__(self, host, port, external=False, tmp_dir=None):
185142
super(ZookeeperFixture, self).__init__()
186143
self.host = host
187144
self.port = port
188-
145+
self.running = external
189146
self.tmp_dir = tmp_dir
190147

191148
def kafka_run_class_env(self):
@@ -198,6 +155,8 @@ def out(self, message):
198155
log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message)
199156

200157
def open(self):
158+
if self.running:
159+
return
201160
if self.tmp_dir is None:
202161
self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
203162
self.tmp_dir.ensure(dir=True)
@@ -262,34 +221,30 @@ class KafkaFixture(Fixture):
262221

263222
@classmethod
264223
def instance(cls, broker_id, zookeeper, zk_chroot=None,
265-
host=None, port=None,
266-
transport='PLAINTEXT', replicas=1, partitions=2,
224+
host=None, port=None, external=False,
225+
transport='PLAINTEXT', replicas=1, partitions=4,
267226
sasl_mechanism=None, auto_create_topic=True, tmp_dir=None):
268227

269228
if zk_chroot is None:
270229
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
271-
if "KAFKA_URI" in os.environ:
272-
parse = urlparse(os.environ["KAFKA_URI"])
273-
(host, port) = (parse.hostname, parse.port)
274-
fixture = ExternalService(host, port)
275-
else:
276-
if host is None:
277-
host = "localhost"
278-
fixture = KafkaFixture(host, port, broker_id,
279-
zookeeper, zk_chroot,
280-
transport=transport,
281-
replicas=replicas, partitions=partitions,
282-
sasl_mechanism=sasl_mechanism,
283-
auto_create_topic=auto_create_topic,
284-
tmp_dir=tmp_dir)
285-
286-
fixture.open()
230+
if host is None:
231+
host = "localhost"
232+
fixture = KafkaFixture(host, port, broker_id,
233+
zookeeper, zk_chroot,
234+
external=external,
235+
transport=transport,
236+
replicas=replicas, partitions=partitions,
237+
sasl_mechanism=sasl_mechanism,
238+
auto_create_topic=auto_create_topic,
239+
tmp_dir=tmp_dir)
240+
241+
fixture.open()
287242
return fixture
288243

289244
def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
290245
replicas=1, partitions=2, transport='PLAINTEXT',
291246
sasl_mechanism=None, auto_create_topic=True,
292-
tmp_dir=None):
247+
tmp_dir=None, external=False):
293248
super(KafkaFixture, self).__init__()
294249

295250
self.host = host
@@ -321,9 +276,16 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
321276
self.partitions = partitions
322277

323278
self.tmp_dir = tmp_dir
324-
self.running = False
279+
self.external = external
280+
281+
if self.external:
282+
self.child = ExternalService(self.host, self.port)
283+
(self._client,) = self.get_clients(1, client_id='_internal_client')
284+
self.running = True
285+
else:
286+
self._client = None
287+
self.running = False
325288

326-
self._client = None
327289
self.sasl_config = ''
328290
self.jaas_config = ''
329291

@@ -416,6 +378,8 @@ def _create_zk_chroot(self):
416378
self.out("Kafka chroot created in Zookeeper!")
417379

418380
def start(self):
381+
if self.running:
382+
return True
419383
# Configure Kafka child process
420384
properties = self.tmp_dir.join("kafka.properties")
421385
jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf")
@@ -515,6 +479,8 @@ def __del__(self):
515479
self.close()
516480

517481
def stop(self):
482+
if self.external:
483+
return
518484
if not self.running:
519485
self.out("Instance already stopped")
520486
return

test/service.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ def open(self):
2929
def close(self):
3030
pass
3131

32+
def dump_logs(self):
33+
pass
34+
35+
def wait_for(self, pattern, timeout=30):
36+
pass
3237

3338
class SpawnedService(threading.Thread):
3439
def __init__(self, args=None, env=None):
@@ -52,8 +57,8 @@ def __init__(self, args=None, env=None):
5257
log.debug(" {key}={value}".format(key=key, value=value))
5358

5459
def _spawn(self):
55-
if self.alive: return
56-
if self.child and self.child.poll() is None: return
60+
if self.alive or (self.child and self.child.poll() is None):
61+
return
5762

5863
self.child = subprocess.Popen(
5964
self.args,
@@ -76,6 +81,7 @@ def _despawn(self):
7681
else:
7782
self.child.kill()
7883

84+
# via threading.Thread
7985
def run(self):
8086
self._spawn()
8187
while True:

0 commit comments

Comments
 (0)