Migration to kafka 1.0.0 #98

Merged 3 commits on Nov 2, 2017
12 changes: 6 additions & 6 deletions README.md
@@ -178,7 +178,7 @@ consumer.consumeLazily("from-this-topic").take(3).toList should be (Seq(
## scalatest-embedded-kafka-streams

A library that builds on top of `scalatest-embedded-kafka` to offer easy testing of [Kafka Streams](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams) with ScalaTest.
-It uses Kafka Streams 0.11.0.0.
+It uses Kafka Streams 1.0.0.

It takes care of instantiating and starting your streams as well as closing them after running your test-case code.

@@ -190,13 +190,13 @@ It takes care of instantiating and starting them
* If you only want to use the streams management without the test consumers just have the `Spec` extend the `EmbeddedKafkaStreams` trait.
* Use the `runStreamsWithStringConsumer` to:
* Create any topics that need to exist for the streams to operate (usually sources and sinks).
-* Pass the Stream or Topology builder that will then be used to instantiate and start the Kafka Streams. This will be done while using the `withRunningKafka` closure internally so that your stream runs with an embedded Kafka and Zookeeper.
+* Pass the Topology that will be used to instantiate and start the Kafka Streams. This will be done while using the `withRunningKafka` closure internally so that your stream runs with an embedded Kafka and Zookeeper.
* Pass the `{code block}` that needs a running instance of your streams. This is where your actual test code will sit. You can publish messages to your source topics and consume messages from your sink topics that the Kafka Streams should have generated. This method also offers a pre-instantiated consumer that can read String keys and values.
* For more flexibility, use `runStreams` and `withConsumer`. This allows you to create your own consumers of custom types as seen in the [example test](kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala).

```scala
import net.manub.embeddedkafka.ConsumerExtensions._
-import org.apache.kafka.streams.kstream.KStreamBuilder
+import org.apache.kafka.streams.StreamsBuilder
import org.scalatest.{Matchers, WordSpec}

class MySpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
@@ -205,14 +205,14 @@ class MySpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
val inputTopic = "input-topic"
val outputTopic = "output-topic"
// your code for building the stream goes here e.g.
-val streamBuilder = new KStreamBuilder
+val streamBuilder = new StreamsBuilder
streamBuilder.stream(inputTopic).to(outputTopic)
// tell the stream test
// 1. what topics need to be created before the stream starts
-// 2. the builder to be used for initializing and starting the stream
+// 2. the stream topology to be used for initializing and starting the stream
runStreamsWithStringConsumer(
topicsToCreate = Seq(inputTopic, outputTopic),
-  builder = streamBuilder
+  topology = streamBuilder.build()
){ consumer =>
// your test code goes here
publishToKafka(inputTopic, key = "hello", message = "world")
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ import sbtrelease.Version

parallelExecution in ThisBuild := false

-val kafkaVersion = "0.11.0.1"
+val kafkaVersion = "1.0.0"
val zookeeperVersion = "3.4.10"
val akkaVersion = "2.4.20"

@@ -17,6 +17,7 @@ import org.apache.kafka.common.serialization.{
import org.scalatest.BeforeAndAfterAll

import scala.collection.JavaConverters._
+import org.scalatest.OptionValues._

class EmbeddedKafkaMethodsSpec
extends EmbeddedKafkaSpecSupport
@@ -153,10 +154,9 @@ class EmbeddedKafkaMethodsSpec
zkConnectionTimeoutMs,
zkSecurityEnabled)
try {
-AdminUtils
-  .fetchTopicMetadataFromZk(topic, zkUtils)
-  .partitionMetadata()
-  .size shouldBe 2
+zkUtils
+  .getTopicPartitionCount(topic)
+  .value shouldBe 2
} finally zkUtils.close()

}
@@ -1,8 +1,7 @@
package net.manub.embeddedkafka.streams

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, UUIDs}
-import org.apache.kafka.streams.KafkaStreams
-import org.apache.kafka.streams.processor.TopologyBuilder
+import org.apache.kafka.streams.{KafkaStreams, Topology}
import org.apache.log4j.Logger
import org.scalatest.Suite

@@ -24,29 +23,30 @@ trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig {
* e.g.
*
* {{{
-*runStreams(Seq("inputTopic", "outputTopic", streamBuilder) {
+*runStreams(Seq("inputTopic", "outputTopic"), topology) {
* // here you can publish and consume messages and make assertions
* publishToKafka(in, Seq("one-string", "another-string"))
* consumeFirstStringMessageFrom(in) should be ("one-string")
*}
* }}}
*
* @param topicsToCreate the topics that should be created in Kafka before launching the streams.
-* @param builder the streams builder that will be used to instantiate the streams with
+* @param topology the streams topology that will be used to instantiate the streams with
* a default configuration (all state directories are different and
* in temp folders)
* @param extraConfig additional KafkaStreams configuration (overwrite existing keys in
* default config)
* @param block the code block that will executed while the streams are active.
* Once the block has been executed the streams will be closed.
*/
-def runStreams(topicsToCreate: Seq[String], builder: TopologyBuilder, extraConfig: Map[String, AnyRef] = Map.empty)(
-    block: => Any)(implicit config: EmbeddedKafkaConfig): Any =
+def runStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)
+              (block: => Any)
+              (implicit config: EmbeddedKafkaConfig): Any =
withRunningKafka {
topicsToCreate.foreach(topic => createCustomTopic(topic))
val streamId = UUIDs.newUuid().toString
logger.debug(s"Creating stream with Application ID: [$streamId]")
-val streams = new KafkaStreams(builder, streamConfig(streamId, extraConfig))
+val streams = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
streams.start()
try {
block
@@ -2,7 +2,7 @@ package net.manub.embeddedkafka.streams

import net.manub.embeddedkafka.{Consumers, EmbeddedKafkaConfig}
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.streams.processor.TopologyBuilder
+import org.apache.kafka.streams.Topology
import org.scalatest.Suite

/** Convenience trait for testing Kafka Streams with ScalaTest.
@@ -11,7 +11,7 @@ import org.scalatest.Suite
*
* e.g.
* {{{
-*runStreams(Seq("inputTopic", "outputTopic", streamBuilder) {
+*runStreams(Seq("inputTopic", "outputTopic"), streamTopology) {
* withConsumer[String, String, Unit] { consumer =>
* // here you can publish and consume messages and make assertions
* publishToKafka(in, Seq("one-string", "another-string"))
@@ -35,14 +35,13 @@ trait EmbeddedKafkaStreamsAllInOne
* that the Streams-under-test use for inputs and outputs. They need to be
* created before running the streams and
* this is automatically taken care of.
-* @param builder the streams builder that contains the stream topology that will be instantiated
+* @param topology the streams topology that will be instantiated
* @param block the block of testing code that will be executed by passing the simple
* String-based consumer.
* @return the result of the testing code
*/
-def runStreamsWithStringConsumer(
-    topicsToCreate: Seq[String],
-    builder: TopologyBuilder)(block: KafkaConsumer[String, String] => Any)(
-    implicit config: EmbeddedKafkaConfig): Any =
-  runStreams(topicsToCreate, builder)(withStringConsumer[Any](block))(config)
+def runStreamsWithStringConsumer(topicsToCreate: Seq[String], topology: Topology)
+                                (block: KafkaConsumer[String, String] => Any)
+                                (implicit config: EmbeddedKafkaConfig): Any =
+  runStreams(topicsToCreate, topology)(withStringConsumer[Any](block))(config)
}
@@ -4,7 +4,8 @@ import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.common.serialization.{Serde, Serdes}
-import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}
+import org.apache.kafka.streams.{Consumed, StreamsBuilder}
+import org.apache.kafka.streams.kstream.{KStream, Produced}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec
@@ -21,13 +22,13 @@ class ExampleKafkaStreamsSpec

"A Kafka streams test" should {
"be easy to run with streams and consumer lifecycle management" in {
-val streamBuilder = new KStreamBuilder
+val streamBuilder = new StreamsBuilder
val stream: KStream[String, String] =
-  streamBuilder.stream(stringSerde, stringSerde, inTopic)
+  streamBuilder.stream(inTopic, Consumed.`with`(stringSerde, stringSerde))

-stream.to(stringSerde, stringSerde, outTopic)
+stream.to(outTopic, Produced.`with`(stringSerde, stringSerde))

-runStreams(Seq(inTopic, outTopic), streamBuilder) {
+runStreams(Seq(inTopic, outTopic), streamBuilder.build()) {
publishToKafka(inTopic, "hello", "world")
publishToKafka(inTopic, "foo", "bar")
publishToKafka(inTopic, "baz", "yaz")
@@ -42,13 +43,13 @@ }
}

"allow support creating custom consumers" in {
-val streamBuilder = new KStreamBuilder
+val streamBuilder = new StreamsBuilder
val stream: KStream[String, String] =
-  streamBuilder.stream(stringSerde, stringSerde, inTopic)
+  streamBuilder.stream(inTopic, Consumed.`with`(stringSerde, stringSerde))

-stream.to(stringSerde, stringSerde, outTopic)
+stream.to(outTopic, Produced.`with`(stringSerde, stringSerde))

-runStreams(Seq(inTopic, outTopic), streamBuilder) {
+runStreams(Seq(inTopic, outTopic), streamBuilder.build()) {
publishToKafka(inTopic, "hello", "world")
publishToKafka(inTopic, "foo", "bar")
val consumer = newConsumer[String, String]()
@@ -59,17 +60,17 @@ }
}

"allow for easy string based testing" in {
-val streamBuilder = new KStreamBuilder
+val streamBuilder = new StreamsBuilder
val stream: KStream[String, String] =
-  streamBuilder.stream(stringSerde, stringSerde, inTopic)
+  streamBuilder.stream(inTopic, Consumed.`with`(stringSerde, stringSerde))

-stream.to(stringSerde, stringSerde, outTopic)
+stream.to(outTopic, Produced.`with`(stringSerde, stringSerde))

-runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder) {
+runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder.build()) {
consumer =>
publishToKafka(inTopic, "hello", "world")
consumer.consumeLazily(outTopic).head should be("hello" -> "world")
}
}
}
}
}
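For readers migrating their own tests, the API change this PR tracks is the Kafka 1.0.0 Streams DSL rework: `KStreamBuilder` is replaced by `StreamsBuilder`, per-call serde arguments move into `Consumed`/`Produced`, and `KafkaStreams` now takes an immutable `Topology` produced by `builder.build()`. A minimal before/after sketch (assumes kafka-streams 1.0.0 on the classpath; topic names are illustrative):

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Consumed, StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.Produced

object MigrationSketch {
  val stringSerde = Serdes.String()

  // 0.11.x style (removed in 1.0.0):
  //   val builder = new KStreamBuilder
  //   builder.stream(stringSerde, stringSerde, "in").to(stringSerde, stringSerde, "out")
  //   new KafkaStreams(builder, config)

  // 1.0.0 style: serdes are wrapped in Consumed/Produced, and build()
  // yields the Topology to hand to new KafkaStreams(topology, config).
  def topology: Topology = {
    val builder = new StreamsBuilder
    builder
      .stream("in", Consumed.`with`(stringSerde, stringSerde))
      .to("out", Produced.`with`(stringSerde, stringSerde))
    builder.build()
  }
}
```

Note the backticks around `` `with` ``: it is a reserved word in Scala, so the Java static factories `Consumed.with`/`Produced.with` must be called this way.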