Skip to content

Implement withRunningKafkaOnFoundPort #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ class MySpec extends WordSpec with EmbeddedKafka {
}
```

If you want to run ZooKeeper and Kafka on arbitrary available ports, you can
use the `withRunningKafkaOnFoundPort` method. This is useful to make tests more
reliable, especially when running tests in parallel or on machines where other
tests or services may be running with port numbers you can't control.

```scala
class MySpec extends WordSpec with EmbeddedKafka {

"runs with embedded kafka on arbitrary available ports" should {

val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
// now a kafka broker is listening on actualConfig.kafkaPort
publishStringMessageToKafka("topic", "message")
consumeFirstStringMessageFrom("topic") shouldBe "message"
}

}
```

The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties

```scala
Expand All @@ -102,7 +123,7 @@ class MySpec extends WordSpec with EmbeddedKafka {
}
```

This works for both `withRunningKafka` and `EmbeddedKafka.start()`
This works for `withRunningKafka`, `withRunningKafkaOnFoundPort`, and `EmbeddedKafka.start()`

Also, it is now possible to provide custom properties to the broker while starting Kafka. `EmbeddedKafkaConfig` has a
`customBrokerProperties` field which can be used to provide extra properties contained in a `Map[String, String]`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,26 +129,64 @@ sealed trait EmbeddedKafkaSupport {
* @param body the function to execute
* @param config an implicit [[EmbeddedKafkaConfig]]
*/
def withRunningKafka(body: => Any)(
implicit config: EmbeddedKafkaConfig): Any = {

def cleanLogs(directories: Directory*): Unit = {
directories.foreach(_.deleteRecursively())
def withRunningKafka[T](body: => T)(
implicit config: EmbeddedKafkaConfig): T = {
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
withTempDir("kafka") { kafkaLogsDir =>
val broker = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
try {
body
} finally {
broker.shutdown()
broker.awaitShutdown()
}
}
}
}

val zkLogsDir = Directory.makeTemp("zookeeper-logs")
val kafkaLogsDir = Directory.makeTemp("kafka")
/**
* Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
* The actual ZooKeeper and Kafka ports will be detected and inserted into a copied version of
* the EmbeddedKafkaConfig that gets passed to body. This is useful if you set either or both
* port to 0, which will listen on an arbitrary available port.
*
* @param config the user-defined [[EmbeddedKafkaConfig]]
* @param body the function to execute, given an [[EmbeddedKafkaConfig]] with the actual
* ports Kafka and ZooKeeper are running on
*/
def withRunningKafkaOnFoundPort[T](config: EmbeddedKafkaConfig)(body: EmbeddedKafkaConfig => T): T = {
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
withTempDir("kafka") { kafkaLogsDir =>
val broker: KafkaServer = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
val kafkaPort = broker.boundPort(broker.config.listeners.head.listenerName)
val actualConfig = config.copy(kafkaPort = kafkaPort, zooKeeperPort = zkPort)
try {
body(actualConfig)
} finally {
broker.shutdown()
broker.awaitShutdown()
}
}
}
}

val factory = startZooKeeper(config.zooKeeperPort, zkLogsDir)
val broker = startKafka(config, kafkaLogsDir)
private def withRunningZooKeeper[T](port: Int)(body: Int => T): T = {
withTempDir("zookeeper-logs") { zkLogsDir =>
val factory = startZooKeeper(port, zkLogsDir)
try {
body(factory.getLocalPort)
} finally {
factory.shutdown()
}
}
}

private def withTempDir[T](prefix: String)(body: Directory => T): T = {
val dir = Directory.makeTemp(prefix)
try {
body
body(dir)
} finally {
broker.shutdown()
broker.awaitShutdown()
factory.shutdown()
cleanLogs(zkLogsDir, kafkaLogsDir)
dir.deleteRecursively()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package net.manub.embeddedkafka

class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec
extends EmbeddedKafkaSpecSupport
with EmbeddedKafka {

"the withRunningKafkaOnFoundPort method" should {
"start and stop Kafka and Zookeeper successfully on non-zero ports" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 12346)
val actualConfig = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig =>
actualConfig shouldBe userDefinedConfig
bothKafkaAndZkAreAvailable(actualConfig)
actualConfig
}
bothKafkaAndZkAreNotAvailable(actualConfig)
}

"start and stop multiple Kafka and Zookeeper successfully on arbitrary available ports" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
val actualConfig1 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig1 =>
bothKafkaAndZkAreAvailable(actualConfig1)
publishStringMessageToKafka("topic", "message1")(actualConfig1)
consumeFirstStringMessageFrom("topic")(actualConfig1) shouldBe "message1"
val actualConfig2 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig2 =>
bothKafkaAndZkAreAvailable(actualConfig2)
publishStringMessageToKafka("topic", "message2")(actualConfig2)
consumeFirstStringMessageFrom("topic")(actualConfig2) shouldBe "message2"
val allConfigs = Seq(userDefinedConfig, actualConfig1, actualConfig2)
// Confirm both actual configs are running on separate non-zero ports, but otherwise equal
allConfigs.map(_.kafkaPort).distinct should have size 3
allConfigs.map(_.zooKeeperPort).distinct should have size 3
allConfigs.map(_.copy(kafkaPort = 0, zooKeeperPort = 0)).distinct should have size 1
actualConfig2
}
bothKafkaAndZkAreNotAvailable(actualConfig2)
actualConfig1
}
bothKafkaAndZkAreNotAvailable(actualConfig1)
}

"work with a simple example using implicits" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
publishStringMessageToKafka("topic", "message")
consumeFirstStringMessageFrom("topic") shouldBe "message"
}
}
}

private def bothKafkaAndZkAreAvailable(config: EmbeddedKafkaConfig): Unit = {
kafkaIsAvailable(config.kafkaPort)
zookeeperIsAvailable(config.zooKeeperPort)
}

private def bothKafkaAndZkAreNotAvailable(config: EmbeddedKafkaConfig): Unit = {
kafkaIsNotAvailable(config.kafkaPort)
zookeeperIsNotAvailable(config.zooKeeperPort)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ abstract class EmbeddedKafkaSpecSupport
expectMsg(1 second, ConnectionSuccessful)
}

def kafkaIsNotAvailable(): Unit = {
def kafkaIsNotAvailable(kafkaPort: Int = 6001): Unit = {
system.actorOf(
TcpClient.props(new InetSocketAddress("localhost", 6001), testActor))
TcpClient.props(new InetSocketAddress("localhost", kafkaPort), testActor))
expectMsg(1 second, ConnectionFailed)
}

def zookeeperIsNotAvailable(): Unit = {
def zookeeperIsNotAvailable(zookeeperPort: Int = 6000): Unit = {
system.actorOf(
TcpClient.props(new InetSocketAddress("localhost", 6000), testActor))
TcpClient.props(new InetSocketAddress("localhost", zookeeperPort), testActor))
expectMsg(1 second, ConnectionFailed)
}
}
Expand Down