Skip to content

Commit d4272de

Browse files
author
Thomas Heslin
committed
Kafka and zookeeper instances are now managed in a list, represented in case class instances. The start methods, which used to return Unit, now return references to the base type: EmbeddedServer. These references can be used to stop the specific servers using a new overload stop function.
1 parent 71fba04 commit d4272de

File tree

3 files changed

+147
-37
lines changed

3 files changed

+147
-37
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1515
import org.scalatest.Suite
1616

1717
import scala.collection.JavaConverters._
18-
import scala.collection.mutable
1918
import scala.collection.mutable.ListBuffer
2019
import scala.concurrent.duration._
2120
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, TimeoutException}
@@ -29,81 +28,107 @@ trait EmbeddedKafka extends EmbeddedKafkaSupport {
2928

3029
object EmbeddedKafka extends EmbeddedKafkaSupport {
3130

32-
private[this] var factory: Option[ServerCnxnFactory] = None
33-
private[this] var broker: Option[KafkaServer] = None
34-
private[this] val logsDirs = mutable.Buffer.empty[Directory]
31+
private[this] var servers: Seq[EmbeddedServer] = Seq.empty
3532

3633
/**
3734
* Starts a ZooKeeper instance and a Kafka broker in memory, using temporary directories for storing logs.
3835
* The log directories will be cleaned after calling the [[stop()]] method or on JVM exit, whichever happens earlier.
3936
*
4037
* @param config an implicit [[EmbeddedKafkaConfig]]
4138
*/
42-
def start()(implicit config: EmbeddedKafkaConfig): Unit = {
39+
def start()(implicit config: EmbeddedKafkaConfig): EmbeddedK = {
4340
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
4441
val kafkaLogsDir = Directory.makeTemp("kafka-logs")
4542

46-
factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
47-
broker = Option(startKafka(config, kafkaLogsDir))
43+
val factory = EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
44+
val broker = EmbeddedK(Option(factory), startKafka(config, kafkaLogsDir), kafkaLogsDir)
4845

49-
logsDirs ++= Seq(zkLogsDir, kafkaLogsDir)
46+
servers :+= broker
47+
broker
5048
}
5149

5250
/**
5351
* Starts a Zookeeper instance in memory, storing logs in a specific location.
5452
*
5553
* @param zkLogsDir the path for the Zookeeper logs
5654
* @param config an implicit [[EmbeddedKafkaConfig]]
55+
* @return an [[EmbeddedZ]] server
5756
*/
5857
def startZooKeeper(zkLogsDir: Directory)(
59-
implicit config: EmbeddedKafkaConfig): Unit = {
60-
factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
58+
implicit config: EmbeddedKafkaConfig): EmbeddedZ = {
59+
val factory = EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
60+
servers :+= factory
61+
factory
6162
}
6263

6364
/**
6465
* Starts a Kafka broker in memory, storing logs in a specific location.
6566
*
66-
* @param kafkaLogDir the path for the Kafka logs
67-
* @param config an implicit [[EmbeddedKafkaConfig]]
67+
* @param kafkaLogsDir the path for the Kafka logs
68+
* @param config an implicit [[EmbeddedKafkaConfig]]
69+
* @return an [[EmbeddedK]] server
6870
*/
69-
def startKafka(kafkaLogDir: Directory)(
70-
implicit config: EmbeddedKafkaConfig): Unit = {
71-
broker = Option(startKafka(config, kafkaLogDir))
71+
def startKafka(kafkaLogsDir: Directory)(
72+
implicit config: EmbeddedKafkaConfig): EmbeddedK = {
73+
val broker = EmbeddedK(startKafka(config, kafkaLogsDir), kafkaLogsDir)
74+
servers :+= broker
75+
broker
7276
}
7377

7478
/**
75-
* Stops the in memory ZooKeeper instance and Kafka broker, and deletes the log directories.
79+
* Stops all in memory ZooKeeper instances and Kafka brokers, and deletes the log directories.
7680
*/
7781
def stop(): Unit = {
78-
stopKafka()
79-
stopZooKeeper()
80-
logsDirs.foreach(_.deleteRecursively())
81-
logsDirs.clear()
82+
servers.foreach(_.stop(true))
83+
servers = Seq.empty
84+
}
85+
86+
/**
87+
* Stops a specific [[EmbeddedServer]] instance, and deletes the log directory.
88+
*
89+
* @param server the [[EmbeddedServer]] to be stopped.
90+
*/
91+
def stop(server: EmbeddedServer): Unit = {
92+
server.stop(true)
93+
servers = servers.filter(x => x != server)
8294
}
8395

8496
/**
85-
* Stops the in memory Zookeeper instance, preserving the logs directory.
97+
* Stops all in memory Zookeeper instances, preserving the logs directories.
8698
*/
8799
def stopZooKeeper(): Unit = {
88-
factory.foreach(_.shutdown())
89-
factory = None
100+
val factories = servers
101+
.filter(_.isInstanceOf[EmbeddedZ])
102+
.asInstanceOf[Seq[EmbeddedZ]]
103+
104+
factories
105+
.foreach(_.stop(false))
106+
107+
servers = servers.filter(!factories.contains(_))
90108
}
91109

92110
/**
93-
* Stops the in memory Kafka instance, preserving the logs directory.
111+
* Stops all in memory Kafka instances, preserving the logs directories.
94112
*/
95113
def stopKafka(): Unit = {
96-
broker.foreach { b =>
97-
b.shutdown()
98-
b.awaitShutdown()
99-
}
100-
broker = None
114+
val brokers = servers
115+
.filter(_.isInstanceOf[EmbeddedK])
116+
.asInstanceOf[Seq[EmbeddedK]]
117+
118+
brokers
119+
.foreach(_.stop(false))
120+
121+
servers = servers.filter(!brokers.contains(_))
101122
}
102123

103124
/**
104125
* Returns whether the in memory Kafka and Zookeeper are running.
105126
*/
106-
def isRunning: Boolean = factory.nonEmpty && broker.nonEmpty
127+
def isRunning: Boolean =
128+
servers
129+
.filter(_.isInstanceOf[EmbeddedK])
130+
.asInstanceOf[Seq[EmbeddedK]]
131+
.exists(_.factory.isDefined)
107132
}
108133

109134
sealed trait EmbeddedKafkaSupport {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package net.manub.embeddedkafka
2+
3+
import kafka.server.KafkaServer
4+
import org.apache.zookeeper.server.ServerCnxnFactory
5+
6+
import scala.reflect.io.Directory
7+
8+
sealed trait EmbeddedServer {
9+
10+
def stop(clearLogs: Boolean): Unit
11+
}
12+
13+
case class EmbeddedZ(factory: ServerCnxnFactory,
14+
logsDirs: Directory)(
15+
implicit config: EmbeddedKafkaConfig) extends EmbeddedServer {
16+
17+
override def stop(clearLogs: Boolean) = {
18+
factory.shutdown()
19+
if (clearLogs) logsDirs.deleteRecursively()
20+
}
21+
}
22+
23+
case class EmbeddedK(factory: Option[EmbeddedZ],
24+
broker: KafkaServer,
25+
logsDirs: Directory)(
26+
implicit config: EmbeddedKafkaConfig) extends EmbeddedServer {
27+
28+
override def stop(clearLogs: Boolean) = {
29+
broker.shutdown()
30+
broker.awaitShutdown()
31+
32+
factory.foreach(_.stop(clearLogs))
33+
34+
if (clearLogs) logsDirs.deleteRecursively()
35+
}
36+
}
37+
38+
object EmbeddedK {
39+
def apply(broker: KafkaServer, logsDirs: Directory)(
40+
implicit config: EmbeddedKafkaConfig): EmbeddedK =
41+
EmbeddedK(None, broker, logsDirs)
42+
}

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaObjectSpec.scala

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
44
import net.manub.embeddedkafka.EmbeddedKafka._
55

66
import scala.collection.JavaConverters._
7+
import scala.reflect.io.Directory
78

89
class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
910

@@ -34,12 +35,33 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
3435
EmbeddedKafka.stop()
3536
}
3637

37-
"multiple EmbeddedKafka can run in parallel" in {
38+
"start and stop a specific Kafka" in {
39+
val firstBroker = EmbeddedKafka.start()(EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001))
40+
EmbeddedKafka.start()(EmbeddedKafkaConfig(kafkaPort = 8000, zooKeeperPort = 8001))
41+
42+
kafkaIsAvailable(7000)
43+
zookeeperIsAvailable(7001)
44+
45+
kafkaIsAvailable(8000)
46+
zookeeperIsAvailable(8001)
47+
48+
EmbeddedKafka.stop(firstBroker)
49+
50+
kafkaIsNotAvailable(7000)
51+
zookeeperIsNotAvailable(7001)
52+
53+
kafkaIsAvailable(8000)
54+
zookeeperIsAvailable(8001)
55+
56+
EmbeddedKafka.stop()
57+
}
58+
59+
"start and stop multiple Kafka instances on specified ports" in {
3860
val someConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 32111)
39-
EmbeddedKafka.start()(someConfig)
61+
val someBroker = EmbeddedKafka.start()(someConfig)
4062

4163
val someOtherConfig = EmbeddedKafkaConfig(kafkaPort = 23456, zooKeeperPort = 43211)
42-
EmbeddedKafka.start()(someOtherConfig)
64+
val someOtherBroker = EmbeddedKafka.start()(someOtherConfig)
4365

4466
val topic = "publish_test_topic_1"
4567
val someMessage = "hello world!"
@@ -51,6 +73,8 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
5173
publishToKafka(topic, someMessage)(someConfig, serializer)
5274
publishToKafka(topic, someOtherMessage)(someOtherConfig, serializer)
5375

76+
// first
77+
5478
val consumer = kafkaConsumer(someConfig, deserializer, deserializer)
5579
consumer.subscribe(List(topic).asJava)
5680

@@ -60,6 +84,8 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
6084
val record = records.iterator().next
6185
record.value shouldBe someMessage
6286

87+
EmbeddedKafka.stop(someBroker)
88+
6389
// second
6490

6591
val anotherConsumer = kafkaConsumer(someOtherConfig, deserializer, deserializer)
@@ -71,18 +97,35 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
7197
val someOtherRecord = moreRecords.iterator().next
7298
someOtherRecord.value shouldBe someOtherMessage
7399

74-
EmbeddedKafka.stop()
75-
EmbeddedKafka.stop()
100+
EmbeddedKafka.stop(someOtherBroker)
76101
}
77102
}
78103

79-
"invoking the isRunnning method" should {
80-
"return whether both Kafka and Zookeeper are running" in {
104+
"invoking the isRunning method" should {
105+
"return true when both Kafka and Zookeeper are running" in {
81106
EmbeddedKafka.start()
82107
EmbeddedKafka.isRunning shouldBe true
83108
EmbeddedKafka.stop()
84109
EmbeddedKafka.isRunning shouldBe false
85110
}
111+
112+
"return false when only Kafka is running" in {
113+
val unmanagedZookeeper = EmbeddedKafka.startZooKeeper(6000, Directory.makeTemp("zookeeper-test-logs"))
114+
115+
EmbeddedKafka.startKafka(Directory.makeTemp("kafka-test-logs"))
116+
EmbeddedKafka.isRunning shouldBe false
117+
EmbeddedKafka.stop()
118+
EmbeddedKafka.isRunning shouldBe false
119+
120+
unmanagedZookeeper.shutdown()
121+
}
122+
123+
"return false when only Zookeeper is running" in {
124+
EmbeddedKafka.startZooKeeper(Directory.makeTemp("zookeeper-test-logs"))
125+
EmbeddedKafka.isRunning shouldBe false
126+
EmbeddedKafka.stop()
127+
EmbeddedKafka.isRunning shouldBe false
128+
}
86129
}
87130
}
88131
}

0 commit comments

Comments
 (0)