Skip to content

Concurrent kafka instances #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
import org.scalatest.Suite

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, TimeoutException}
Expand All @@ -29,81 +28,107 @@ trait EmbeddedKafka extends EmbeddedKafkaSupport {

object EmbeddedKafka extends EmbeddedKafkaSupport {

private[this] var factory: Option[ServerCnxnFactory] = None
private[this] var broker: Option[KafkaServer] = None
private[this] val logsDirs = mutable.Buffer.empty[Directory]
private[this] var servers: Seq[EmbeddedServer] = Seq.empty

/**
* Starts a ZooKeeper instance and a Kafka broker in memory, using temporary directories for storing logs.
* The log directories will be cleaned after calling the [[stop()]] method or on JVM exit, whichever happens earlier.
*
* @param config an implicit [[EmbeddedKafkaConfig]]
*/
def start()(implicit config: EmbeddedKafkaConfig): Unit = {
def start()(implicit config: EmbeddedKafkaConfig): EmbeddedK = {
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
val kafkaLogsDir = Directory.makeTemp("kafka-logs")

factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
broker = Option(startKafka(config, kafkaLogsDir))
val factory = EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
val broker = EmbeddedK(Option(factory), startKafka(config, kafkaLogsDir), kafkaLogsDir)

logsDirs ++= Seq(zkLogsDir, kafkaLogsDir)
servers :+= broker
broker
}

/**
* Starts a Zookeeper instance in memory, storing logs in a specific location.
*
* @param zkLogsDir the path for the Zookeeper logs
* @param config an implicit [[EmbeddedKafkaConfig]]
* @return an [[EmbeddedZ]] server
*/
def startZooKeeper(zkLogsDir: Directory)(
implicit config: EmbeddedKafkaConfig): Unit = {
factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
implicit config: EmbeddedKafkaConfig): EmbeddedZ = {
val factory = EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
servers :+= factory
factory
}

/**
* Starts a Kafka broker in memory, storing logs in a specific location.
*
* @param kafkaLogDir the path for the Kafka logs
* @param config an implicit [[EmbeddedKafkaConfig]]
* @param kafkaLogsDir the path for the Kafka logs
* @param config an implicit [[EmbeddedKafkaConfig]]
* @return an [[EmbeddedK]] server
*/
def startKafka(kafkaLogDir: Directory)(
implicit config: EmbeddedKafkaConfig): Unit = {
broker = Option(startKafka(config, kafkaLogDir))
def startKafka(kafkaLogsDir: Directory)(
implicit config: EmbeddedKafkaConfig): EmbeddedK = {
val broker = EmbeddedK(startKafka(config, kafkaLogsDir), kafkaLogsDir)
servers :+= broker
broker
}

/**
* Stops the in memory ZooKeeper instance and Kafka broker, and deletes the log directories.
* Stops all in memory ZooKeeper instances and Kafka brokers, and deletes the log directories.
*/
def stop(): Unit = {
stopKafka()
stopZooKeeper()
logsDirs.foreach(_.deleteRecursively())
logsDirs.clear()
servers.foreach(_.stop(true))
servers = Seq.empty
}

/**
* Stops the in memory Zookeeper instance, preserving the logs directory.
* Stops a specific [[EmbeddedServer]] instance, and deletes the log directory.
*
* @param server the [[EmbeddedServer]] to be stopped.
*/
def stop(server: EmbeddedServer): Unit = {
  // Shut the instance down, asking it to remove its log directory as well.
  server.stop(clearLogs = true)
  // Drop the stopped instance from the list of tracked servers.
  servers = servers.filterNot(_ == server)
}

/**
* Stops all in memory Zookeeper instances, preserving the logs directories.
*/
def stopZooKeeper(): Unit = {
factory.foreach(_.shutdown())
factory = None
val factories = servers.toFilteredSeq[EmbeddedZ](isEmbeddedZ)

factories
.foreach(_.stop(false))

servers = servers.filter(!factories.contains(_))
}

/**
* Stops the in memory Kafka instance, preserving the logs directory.
* Stops all in memory Kafka instances, preserving the logs directories.
*/
def stopKafka(): Unit = {
broker.foreach { b =>
b.shutdown()
b.awaitShutdown()
}
broker = None
val brokers = servers.toFilteredSeq[EmbeddedK](isEmbeddedK)

brokers
.foreach(_.stop(false))

servers = servers.filter(!brokers.contains(_))
}

/**
* Returns whether the in memory Kafka and Zookeeper are running.
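* Returns whether at least one registered Kafka broker was started together with its own Zookeeper instance (as done by `start()`); a broker and a Zookeeper started separately do not count.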
*/
def isRunning: Boolean = factory.nonEmpty && broker.nonEmpty
def isRunning: Boolean =
  servers.exists {
    // Only a Kafka broker that carries its own Zookeeper counts as "running".
    case kafka: EmbeddedK => kafka.factory.isDefined
    case _                => false
  }

// True when the given server is a Kafka broker wrapper.
private def isEmbeddedK(server: EmbeddedServer): Boolean = server match {
  case _: EmbeddedK => true
  case _            => false
}
// True when the given server is a Zookeeper wrapper.
private def isEmbeddedZ(server: EmbeddedServer): Boolean = server match {
  case _: EmbeddedZ => true
  case _            => false
}

/**
  * Adds typed filtering to a sequence of [[EmbeddedServer]]s.
  *
  * @param servers the servers to filter.
  */
implicit class ServerOps(servers: Seq[EmbeddedServer]) {

  /**
    * Keeps the servers accepted by `filter` and returns them typed as `T`.
    *
    * The runtime class of every accepted server is checked against `T`
    * (through the implicit `ClassTag`), so a predicate that does not agree
    * with `T` yields a shorter sequence instead of an unchecked cast that
    * would only fail later with a `ClassCastException`.
    *
    * @param filter the predicate selecting the servers to keep.
    * @tparam T the concrete [[EmbeddedServer]] type expected by the caller.
    * @return the accepted servers that are instances of `T`, in their original order.
    */
  def toFilteredSeq[T <: EmbeddedServer: scala.reflect.ClassTag](
      filter: EmbeddedServer => Boolean): Seq[T] =
    servers.filter(filter).collect { case server: T => server }
}
}

sealed trait EmbeddedKafkaSupport {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package net.manub.embeddedkafka

import kafka.server.KafkaServer
import org.apache.zookeeper.server.ServerCnxnFactory

import scala.reflect.io.Directory

/**
  * A server started by the embedded Kafka support that can be stopped on demand.
  */
sealed trait EmbeddedServer {

  /**
    * Stops this server.
    *
    * @param clearLogs pass `true` to also remove the logs written by the server.
    */
  def stop(clearLogs: Boolean): Unit
}

/**
  * An instance of an embedded Zookeeper server.
  *
  * @param factory  the [[ServerCnxnFactory]] backing the Zookeeper server.
  * @param logsDirs the [[Directory]] logs are to be written to.
  * @param config   the [[EmbeddedKafkaConfig]] used to start the factory.
  */
case class EmbeddedZ(factory: ServerCnxnFactory,
                     logsDirs: Directory)(
    implicit config: EmbeddedKafkaConfig) extends EmbeddedServer {

  /**
    * Shuts down the factory and then optionally deletes the log directory.
    *
    * @param clearLogs pass `true` to recursively delete the log directory.
    */
  override def stop(clearLogs: Boolean): Unit = {
    factory.shutdown()
    // The Boolean returned by `deleteRecursively()` is deliberately discarded.
    if (clearLogs) logsDirs.deleteRecursively()
  }
}

/**
  * An instance of an embedded Kafka server.
  *
  * @param factory  the optional [[EmbeddedZ]] server which Kafka relies upon.
  * @param broker   the Kafka server.
  * @param logsDirs the [[Directory]] logs are to be written to.
  * @param config   the [[EmbeddedKafkaConfig]] used to start the broker.
  */
case class EmbeddedK(factory: Option[EmbeddedZ],
                     broker: KafkaServer,
                     logsDirs: Directory)(
    implicit config: EmbeddedKafkaConfig) extends EmbeddedServer {

  /**
    * Shuts down the broker and the factory it relies upon, if defined.
    * Optionally deletes the log directory.
    *
    * The Zookeeper shutdown and the log cleanup are attempted even when the
    * broker fails to shut down, so that a failing broker does not leave its
    * Zookeeper instance running. Any exception is still propagated to the caller.
    *
    * @param clearLogs pass `true` to recursively delete the log directory.
    */
  override def stop(clearLogs: Boolean): Unit =
    try {
      broker.shutdown()
      broker.awaitShutdown()
    } finally {
      // Stop the attached Zookeeper (if any) after the broker, whatever happened above.
      try factory.foreach(_.stop(clearLogs))
      finally {
        // The Boolean returned by `deleteRecursively()` is deliberately discarded.
        if (clearLogs) logsDirs.deleteRecursively()
      }
    }
}

object EmbeddedK {

  /**
    * Builds an [[EmbeddedK]] that has no associated [[EmbeddedZ]].
    *
    * @param broker   the Kafka server.
    * @param logsDirs the [[Directory]] logs are to be written to.
    * @param config   the [[EmbeddedKafkaConfig]] used to start the broker.
    * @return an [[EmbeddedK]] whose `factory` is `None`.
    */
  def apply(broker: KafkaServer, logsDirs: Directory)(
      implicit config: EmbeddedKafkaConfig): EmbeddedK =
    new EmbeddedK(factory = None, broker = broker, logsDirs = logsDirs)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
package net.manub.embeddedkafka

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import net.manub.embeddedkafka.EmbeddedKafka._

import scala.collection.JavaConverters._
import scala.reflect.io.Directory

class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {

val consumerPollTimeout = 5000

"the EmbeddedKafka object" when {
"invoking the start and stop methods" should {
"start and stop Kafka and Zookeeper on the default ports" in {
Expand All @@ -26,15 +34,85 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {

EmbeddedKafka.stop()
}

"start and stop a specific Kafka" in {
val firstBroker = EmbeddedKafka.start()(EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001))
EmbeddedKafka.start()(EmbeddedKafkaConfig(kafkaPort = 8000, zooKeeperPort = 8001))

kafkaIsAvailable(7000)
zookeeperIsAvailable(7001)

kafkaIsAvailable(8000)
zookeeperIsAvailable(8001)

EmbeddedKafka.stop(firstBroker)

kafkaIsNotAvailable(7000)
zookeeperIsNotAvailable(7001)

kafkaIsAvailable(8000)
zookeeperIsAvailable(8001)

EmbeddedKafka.stop()
}

"start and stop multiple Kafka instances on specified ports" in {
val someConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 32111)
val someBroker = EmbeddedKafka.start()(someConfig)

val someOtherConfig = EmbeddedKafkaConfig(kafkaPort = 23456, zooKeeperPort = 43211)
val someOtherBroker = EmbeddedKafka.start()(someOtherConfig)

val topic = "publish_test_topic_1"
val someOtherMessage = "another message!"

val serializer = new StringSerializer
val deserializer = new StringDeserializer

publishToKafka(topic, "hello world!")(someConfig, serializer)
publishToKafka(topic, someOtherMessage)(someOtherConfig, serializer)

kafkaIsAvailable(someConfig.kafkaPort)
EmbeddedKafka.stop(someBroker)

val anotherConsumer = kafkaConsumer(someOtherConfig, deserializer, deserializer)
anotherConsumer.subscribe(List(topic).asJava)

val moreRecords = anotherConsumer.poll(consumerPollTimeout)
moreRecords.count shouldBe 1

val someOtherRecord = moreRecords.iterator().next
someOtherRecord.value shouldBe someOtherMessage

EmbeddedKafka.stop(someOtherBroker)
}
}

"invoking the isRunnning method" should {
"return whether both Kafka and Zookeeper are running" in {
"invoking the isRunning method" should {
"return true when both Kafka and Zookeeper are running" in {
EmbeddedKafka.start()
EmbeddedKafka.isRunning shouldBe true
EmbeddedKafka.stop()
EmbeddedKafka.isRunning shouldBe false
}

"return false when only Kafka is running" in {
val unmanagedZookeeper = EmbeddedKafka.startZooKeeper(6000, Directory.makeTemp("zookeeper-test-logs"))

EmbeddedKafka.startKafka(Directory.makeTemp("kafka-test-logs"))
EmbeddedKafka.isRunning shouldBe false
EmbeddedKafka.stop()
EmbeddedKafka.isRunning shouldBe false

unmanagedZookeeper.shutdown()
}

"return false when only Zookeeper is running" in {
EmbeddedKafka.startZooKeeper(Directory.makeTemp("zookeeper-test-logs"))
EmbeddedKafka.isRunning shouldBe false
EmbeddedKafka.stop()
EmbeddedKafka.isRunning shouldBe false
}
}
}
}