Skip to content

Commit 8e4155d

Browse files
Thomas Heslinmanub
Thomas Heslin
authored andcommitted
Concurrent kafka instances (#105)
* adding concurrent test * Kafka and zookeeper instances are now managed in a list, represented in case class instances. The start methods, which used to return Unit, now return references to the base type: EmbeddedServer. These references can be used to stop the specific servers using a new overload stop function. * Added javadoc to EmbeddedServer.scala * Removing assertion of first kafka being available, focusing on second kafka functionality after first has shutdown * Added extension method for filtering servers down to EmbeddedK or EmbeddedZ.
1 parent 3c2fdbc commit 8e4155d

File tree

3 files changed

+207
-33
lines changed

3 files changed

+207
-33
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1515
import org.scalatest.Suite
1616

1717
import scala.collection.JavaConverters._
18-
import scala.collection.mutable
1918
import scala.collection.mutable.ListBuffer
2019
import scala.concurrent.duration._
2120
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, TimeoutException}
@@ -29,81 +28,107 @@ trait EmbeddedKafka extends EmbeddedKafkaSupport {
2928

3029
object EmbeddedKafka extends EmbeddedKafkaSupport {
3130

32-
private[this] var factory: Option[ServerCnxnFactory] = None
33-
private[this] var broker: Option[KafkaServer] = None
34-
private[this] val logsDirs = mutable.Buffer.empty[Directory]
31+
private[this] var servers: Seq[EmbeddedServer] = Seq.empty
3532

3633
/**
3734
* Starts a ZooKeeper instance and a Kafka broker in memory, using temporary directories for storing logs.
3835
* The log directories will be cleaned after calling the [[stop()]] method or on JVM exit, whichever happens earlier.
3936
*
4037
* @param config an implicit [[EmbeddedKafkaConfig]]
4138
*/
42-
def start()(implicit config: EmbeddedKafkaConfig): Unit = {
39+
def start()(implicit config: EmbeddedKafkaConfig): EmbeddedK = {
4340
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
4441
val kafkaLogsDir = Directory.makeTemp("kafka-logs")
4542

46-
factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
47-
broker = Option(startKafka(config, kafkaLogsDir))
43+
val factory = EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
44+
val broker = EmbeddedK(Option(factory), startKafka(config, kafkaLogsDir), kafkaLogsDir)
4845

49-
logsDirs ++= Seq(zkLogsDir, kafkaLogsDir)
46+
servers :+= broker
47+
broker
5048
}
5149

5250
/**
5351
* Starts a Zookeeper instance in memory, storing logs in a specific location.
5452
*
5553
* @param zkLogsDir the path for the Zookeeper logs
5654
* @param config an implicit [[EmbeddedKafkaConfig]]
55+
* @return an [[EmbeddedZ]] server
5756
*/
5857
def startZooKeeper(zkLogsDir: Directory)(
59-
implicit config: EmbeddedKafkaConfig): Unit = {
60-
factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
58+
implicit config: EmbeddedKafkaConfig): EmbeddedZ = {
59+
val factory = EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
60+
servers :+= factory
61+
factory
6162
}
6263

6364
/**
6465
* Starts a Kafka broker in memory, storing logs in a specific location.
6566
*
66-
* @param kafkaLogDir the path for the Kafka logs
67-
* @param config an implicit [[EmbeddedKafkaConfig]]
67+
* @param kafkaLogsDir the path for the Kafka logs
68+
* @param config an implicit [[EmbeddedKafkaConfig]]
69+
* @return an [[EmbeddedK]] server
6870
*/
69-
def startKafka(kafkaLogDir: Directory)(
70-
implicit config: EmbeddedKafkaConfig): Unit = {
71-
broker = Option(startKafka(config, kafkaLogDir))
71+
def startKafka(kafkaLogsDir: Directory)(
72+
implicit config: EmbeddedKafkaConfig): EmbeddedK = {
73+
val broker = EmbeddedK(startKafka(config, kafkaLogsDir), kafkaLogsDir)
74+
servers :+= broker
75+
broker
7276
}
7377

7478
/**
75-
* Stops the in memory ZooKeeper instance and Kafka broker, and deletes the log directories.
79+
* Stops all in memory ZooKeeper instances and Kafka brokers, and deletes the log directories.
7680
*/
7781
def stop(): Unit = {
78-
stopKafka()
79-
stopZooKeeper()
80-
logsDirs.foreach(_.deleteRecursively())
81-
logsDirs.clear()
82+
servers.foreach(_.stop(true))
83+
servers = Seq.empty
8284
}
8385

8486
/**
85-
* Stops the in memory Zookeeper instance, preserving the logs directory.
87+
* Stops a specific [[EmbeddedServer]] instance, and deletes the log directory.
88+
*
89+
* @param server the [[EmbeddedServer]] to be stopped.
90+
*/
91+
def stop(server: EmbeddedServer): Unit = {
92+
server.stop(true)
93+
servers = servers.filter(x => x != server)
94+
}
95+
96+
/**
97+
* Stops all in memory Zookeeper instances, preserving the logs directories.
8698
*/
8799
def stopZooKeeper(): Unit = {
88-
factory.foreach(_.shutdown())
89-
factory = None
100+
val factories = servers.toFilteredSeq[EmbeddedZ](isEmbeddedZ)
101+
102+
factories
103+
.foreach(_.stop(false))
104+
105+
servers = servers.filter(!factories.contains(_))
90106
}
91107

92108
/**
93-
* Stops the in memory Kafka instance, preserving the logs directory.
109+
* Stops all in memory Kafka instances, preserving the logs directories.
94110
*/
95111
def stopKafka(): Unit = {
96-
broker.foreach { b =>
97-
b.shutdown()
98-
b.awaitShutdown()
99-
}
100-
broker = None
112+
val brokers = servers.toFilteredSeq[EmbeddedK](isEmbeddedK)
113+
114+
brokers
115+
.foreach(_.stop(false))
116+
117+
servers = servers.filter(!brokers.contains(_))
101118
}
102119

103120
/**
104-
* Returns whether the in memory Kafka and Zookeeper are running.
121+
* Returns whether the in memory Kafka and Zookeeper are both running.
105122
*/
106-
def isRunning: Boolean = factory.nonEmpty && broker.nonEmpty
123+
def isRunning: Boolean = servers.toFilteredSeq[EmbeddedK](isEmbeddedK).exists(_.factory.isDefined)
124+
125+
private def isEmbeddedK(server: EmbeddedServer): Boolean = server.isInstanceOf[EmbeddedK]
126+
private def isEmbeddedZ(server: EmbeddedServer): Boolean = server.isInstanceOf[EmbeddedZ]
127+
128+
implicit class ServerOps(servers: Seq[EmbeddedServer]) {
129+
def toFilteredSeq[T <: EmbeddedServer](filter: EmbeddedServer => Boolean): Seq[T] =
130+
servers.filter(filter).asInstanceOf[Seq[T]]
131+
}
107132
}
108133

109134
sealed trait EmbeddedKafkaSupport {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package net.manub.embeddedkafka
2+
3+
import kafka.server.KafkaServer
4+
import org.apache.zookeeper.server.ServerCnxnFactory
5+
6+
import scala.reflect.io.Directory
7+
8+
/**
9+
* Represents a running server with a method of stopping the instance.
10+
*/
11+
sealed trait EmbeddedServer {
12+
13+
def stop(clearLogs: Boolean): Unit
14+
}
15+
16+
/**
17+
* An instance of an embedded Zookeeper server.
18+
*
19+
* @param factory the server.
20+
* @param logsDirs the [[Directory]] logs are to be written to.
21+
* @param config the [[EmbeddedKafkaConfig]] used to start the factory.
22+
*/
23+
case class EmbeddedZ(factory: ServerCnxnFactory,
24+
logsDirs: Directory)(
25+
implicit config: EmbeddedKafkaConfig) extends EmbeddedServer {
26+
27+
/**
28+
* Shuts down the factory and then optionally deletes the log directory.
29+
*
30+
* @param clearLogs pass `true` to recursively delete the log directory.
31+
*/
32+
override def stop(clearLogs: Boolean) = {
33+
factory.shutdown()
34+
if (clearLogs) logsDirs.deleteRecursively()
35+
}
36+
}
37+
38+
/**
39+
* An instance of an embedded Kafka serer.
40+
*
41+
* @param factory the optional [[EmbeddedZ]] server which Kafka relies upon.
42+
* @param broker the Kafka server.
43+
* @param logsDirs the [[Directory]] logs are to be written to.
44+
* @param config the [[EmbeddedKafkaConfig]] used to start the broker.
45+
*/
46+
case class EmbeddedK(factory: Option[EmbeddedZ],
47+
broker: KafkaServer,
48+
logsDirs: Directory)(
49+
implicit config: EmbeddedKafkaConfig) extends EmbeddedServer {
50+
51+
/**
52+
* Shuts down the broker and the factory it relies upon, if defined.
53+
* Optionally deletes the log directory.
54+
*
55+
* @param clearLogs pass `true` to recursively delete the log directory.
56+
*/
57+
override def stop(clearLogs: Boolean) = {
58+
broker.shutdown()
59+
broker.awaitShutdown()
60+
61+
factory.foreach(_.stop(clearLogs))
62+
63+
if (clearLogs) logsDirs.deleteRecursively()
64+
}
65+
}
66+
67+
object EmbeddedK {
68+
def apply(broker: KafkaServer, logsDirs: Directory)(
69+
implicit config: EmbeddedKafkaConfig): EmbeddedK =
70+
EmbeddedK(None, broker, logsDirs)
71+
}

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaObjectSpec.scala

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
package net.manub.embeddedkafka
22

3+
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
4+
import net.manub.embeddedkafka.EmbeddedKafka._
5+
6+
import scala.collection.JavaConverters._
7+
import scala.reflect.io.Directory
8+
39
class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
410

11+
val consumerPollTimeout = 5000
12+
513
"the EmbeddedKafka object" when {
614
"invoking the start and stop methods" should {
715
"start and stop Kafka and Zookeeper on the default ports" in {
@@ -26,15 +34,85 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
2634

2735
EmbeddedKafka.stop()
2836
}
37+
38+
"start and stop a specific Kafka" in {
39+
val firstBroker = EmbeddedKafka.start()(EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001))
40+
EmbeddedKafka.start()(EmbeddedKafkaConfig(kafkaPort = 8000, zooKeeperPort = 8001))
41+
42+
kafkaIsAvailable(7000)
43+
zookeeperIsAvailable(7001)
44+
45+
kafkaIsAvailable(8000)
46+
zookeeperIsAvailable(8001)
47+
48+
EmbeddedKafka.stop(firstBroker)
49+
50+
kafkaIsNotAvailable(7000)
51+
zookeeperIsNotAvailable(7001)
52+
53+
kafkaIsAvailable(8000)
54+
zookeeperIsAvailable(8001)
55+
56+
EmbeddedKafka.stop()
57+
}
58+
59+
"start and stop multiple Kafka instances on specified ports" in {
60+
val someConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 32111)
61+
val someBroker = EmbeddedKafka.start()(someConfig)
62+
63+
val someOtherConfig = EmbeddedKafkaConfig(kafkaPort = 23456, zooKeeperPort = 43211)
64+
val someOtherBroker = EmbeddedKafka.start()(someOtherConfig)
65+
66+
val topic = "publish_test_topic_1"
67+
val someOtherMessage = "another message!"
68+
69+
val serializer = new StringSerializer
70+
val deserializer = new StringDeserializer
71+
72+
publishToKafka(topic, "hello world!")(someConfig, serializer)
73+
publishToKafka(topic, someOtherMessage)(someOtherConfig, serializer)
74+
75+
kafkaIsAvailable(someConfig.kafkaPort)
76+
EmbeddedKafka.stop(someBroker)
77+
78+
val anotherConsumer = kafkaConsumer(someOtherConfig, deserializer, deserializer)
79+
anotherConsumer.subscribe(List(topic).asJava)
80+
81+
val moreRecords = anotherConsumer.poll(consumerPollTimeout)
82+
moreRecords.count shouldBe 1
83+
84+
val someOtherRecord = moreRecords.iterator().next
85+
someOtherRecord.value shouldBe someOtherMessage
86+
87+
EmbeddedKafka.stop(someOtherBroker)
88+
}
2989
}
3090

31-
"invoking the isRunnning method" should {
32-
"return whether both Kafka and Zookeeper are running" in {
91+
"invoking the isRunning method" should {
92+
"return true when both Kafka and Zookeeper are running" in {
3393
EmbeddedKafka.start()
3494
EmbeddedKafka.isRunning shouldBe true
3595
EmbeddedKafka.stop()
3696
EmbeddedKafka.isRunning shouldBe false
3797
}
98+
99+
"return false when only Kafka is running" in {
100+
val unmanagedZookeeper = EmbeddedKafka.startZooKeeper(6000, Directory.makeTemp("zookeeper-test-logs"))
101+
102+
EmbeddedKafka.startKafka(Directory.makeTemp("kafka-test-logs"))
103+
EmbeddedKafka.isRunning shouldBe false
104+
EmbeddedKafka.stop()
105+
EmbeddedKafka.isRunning shouldBe false
106+
107+
unmanagedZookeeper.shutdown()
108+
}
109+
110+
"return false when only Zookeeper is running" in {
111+
EmbeddedKafka.startZooKeeper(Directory.makeTemp("zookeeper-test-logs"))
112+
EmbeddedKafka.isRunning shouldBe false
113+
EmbeddedKafka.stop()
114+
EmbeddedKafka.isRunning shouldBe false
115+
}
38116
}
39117
}
40118
}

0 commit comments

Comments
 (0)