@@ -208,9 +208,12 @@ def __init__(self, host, port):
208
208
self .tmp_dir = None
209
209
self .child = None
210
210
211
+ def out (self , message ):
212
+ print ("*** Zookeeper[%s]: %s" % (id (self ), message ))
213
+
211
214
def open (self ):
212
215
self .tmp_dir = tempfile .mkdtemp ()
213
- print ("*** Running local Zookeeper instance..." )
216
+ print ("*** [%s] Running local Zookeeper instance..." % id ( self ) )
214
217
print (" host = %s" % self .host )
215
218
print (" port = %s" % self .port )
216
219
print (" tmp_dir = %s" % self .tmp_dir )
@@ -229,16 +232,16 @@ def open(self):
229
232
self .child .configure_stderr (os .path .join (self .tmp_dir , "stderr.txt" ))
230
233
231
234
# Party!
232
- print ( "*** Starting Zookeeper ..." )
235
+ self . out ( " Starting..." )
233
236
self .child .start ()
234
237
self .child .wait_for (r"Snapshotting" )
235
- print ( "*** Done!" )
238
+ self . out ( " Done!" )
236
239
237
240
def close (self ):
238
- print ( "*** Stopping Zookeeper ..." )
241
+ self . out ( " Stopping..." )
239
242
self .child .stop ()
240
243
self .child = None
241
- print ( "*** Done!" )
244
+ self . out ( " Done!" )
242
245
shutil .rmtree (self .tmp_dir )
243
246
244
247
@@ -274,13 +277,16 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=
274
277
self .child = None
275
278
self .running = False
276
279
280
+ def out (self , message ):
281
+ print ("*** Kafka[%s]: %s" % (id (self ), message ))
282
+
277
283
def open (self ):
278
284
if self .running :
279
- print ( "*** Kafka instance already running" )
285
+ self . out ( "Instance already running" )
280
286
return
281
287
282
288
self .tmp_dir = tempfile .mkdtemp ()
283
- print ( "*** Running local Kafka instance" )
289
+ self . out ( " Running local instance" )
284
290
print (" host = %s" % self .host )
285
291
print (" port = %s" % self .port )
286
292
print (" broker_id = %s" % self .broker_id )
@@ -308,31 +314,31 @@ def open(self):
308
314
self .child .configure_stderr (os .path .join (self .tmp_dir , "stderr.txt" ))
309
315
310
316
# Party!
311
- print ( "*** Creating Zookeeper chroot node..." )
317
+ self . out ( " Creating Zookeeper chroot node..." )
312
318
proc = subprocess .Popen (kafka_run_class_args (
313
319
"org.apache.zookeeper.ZooKeeperMain" ,
314
320
"-server" , "%s:%d" % (self .zk_host , self .zk_port ),
315
321
"create" , "/%s" % self .zk_chroot , "kafka-python"
316
322
))
317
323
if proc .wait () != 0 :
318
- print ( "*** Failed to create Zookeeper chroot node" )
324
+ self . out ( " Failed to create Zookeeper chroot node" )
319
325
raise RuntimeError ("Failed to create Zookeeper chroot node" )
320
- print ( "*** Done!" )
326
+ self . out ( " Done!" )
321
327
322
- print ( "*** Starting Kafka ..." )
328
+ self . out ( " Starting..." )
323
329
self .child .start ()
324
330
self .child .wait_for (r"\[Kafka Server %d\], Started" % self .broker_id )
325
- print ( "*** Done!" )
331
+ self . out ( " Done!" )
326
332
self .running = True
327
333
328
334
def close (self ):
329
335
if not self .running :
330
- print ( "*** Kafka instance already stopped" )
336
+ self . out ( "Instance already stopped" )
331
337
return
332
338
333
- print ( "*** Stopping Kafka ..." )
339
+ self . out ( " Stopping..." )
334
340
self .child .stop ()
335
341
self .child = None
336
- print ( "*** Done!" )
342
+ self . out ( " Done!" )
337
343
shutil .rmtree (self .tmp_dir )
338
344
self .running = False
0 commit comments