Skip to content

Commit 855a54a

Browse files
azhurmanub
authored and committed
Migration to kafka 1.0.0 (#98)
* Migration to kafka 1.0.0 * styling * updated readme
1 parent 661aaab commit 855a54a

File tree

6 files changed

+40
-40
lines changed

6 files changed

+40
-40
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ consumer.consumeLazily("from-this-topic").take(3).toList should be (Seq(
178178
## scalatest-embedded-kafka-streams
179179

180180
A library that builds on top of `scalatest-embedded-kafka` to offer easy testing of [Kafka Streams](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams) with ScalaTest.
181-
It uses Kafka Streams 0.11.0.0.
181+
It uses Kafka Streams 1.0.0.
182182

183183
It takes care of instantiating and starting your streams as well as closing them after running your test-case code.
184184

@@ -190,13 +190,13 @@ It takes care of instantiating and starting your streams as well as closing them
190190
* If you only want to use the streams management without the test consumers just have the `Spec` extend the `EmbeddedKafkaStreams` trait.
191191
* Use the `runStreamsWithStringConsumer` to:
192192
* Create any topics that need to exist for the streams to operate (usually sources and sinks).
193-
* Pass the Stream or Topology builder that will then be used to instantiate and start the Kafka Streams. This will be done while using the `withRunningKafka` closure internally so that your stream runs with an embedded Kafka and Zookeeper.
193+
* Pass the Topology that will be used to instantiate and start the Kafka Streams. This will be done while using the `withRunningKafka` closure internally so that your stream runs with an embedded Kafka and Zookeeper.
194194
* Pass the `{code block}` that needs a running instance of your streams. This is where your actual test code will sit. You can publish messages to your source topics and consume messages from your sink topics that the Kafka Streams should have generated. This method also offers a pre-instantiated consumer that can read String keys and values.
195195
* For more flexibility, use `runStreams` and `withConsumer`. This allows you to create your own consumers of custom types as seen in the [example test](kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala).
196196

197197
```scala
198198
import net.manub.embeddedkafka.ConsumerExtensions._
199-
import org.apache.kafka.streams.kstream.KStreamBuilder
199+
import org.apache.kafka.streams.StreamsBuilder
200200
import org.scalatest.{Matchers, WordSpec}
201201

202202
class MySpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
@@ -205,14 +205,14 @@ class MySpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
205205
val inputTopic = "input-topic"
206206
val outputTopic = "output-topic"
207207
// your code for building the stream goes here e.g.
208-
val streamBuilder = new KStreamBuilder
208+
val streamBuilder = new StreamsBuilder
209209
streamBuilder.stream(inputTopic).to(outputTopic)
210210
// tell the stream test
211211
// 1. what topics need to be created before the stream starts
212-
// 2. the builder to be used for initializing and starting the stream
212+
// 2. the stream topology to be used for initializing and starting the stream
213213
runStreamsWithStringConsumer(
214214
topicsToCreate = Seq(inputTopic, outputTopic),
215-
builder = streamBuilder
215+
topology = streamBuilder.build()
216216
){ consumer =>
217217
// your test code goes here
218218
publishToKafka(inputTopic, key = "hello", message = "world")

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import sbtrelease.Version
22

33
parallelExecution in ThisBuild := false
44

5-
val kafkaVersion = "0.11.0.1"
5+
val kafkaVersion = "1.0.0"
66
val zookeeperVersion = "3.4.10"
77
val akkaVersion = "2.4.20"
88

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import org.apache.kafka.common.serialization.{
1717
import org.scalatest.BeforeAndAfterAll
1818

1919
import scala.collection.JavaConverters._
20+
import org.scalatest.OptionValues._
2021

2122
class EmbeddedKafkaMethodsSpec
2223
extends EmbeddedKafkaSpecSupport
@@ -153,10 +154,9 @@ class EmbeddedKafkaMethodsSpec
153154
zkConnectionTimeoutMs,
154155
zkSecurityEnabled)
155156
try {
156-
AdminUtils
157-
.fetchTopicMetadataFromZk(topic, zkUtils)
158-
.partitionMetadata()
159-
.size shouldBe 2
157+
zkUtils
158+
.getTopicPartitionCount(topic)
159+
.value shouldBe 2
160160
} finally zkUtils.close()
161161

162162
}

kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreams.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package net.manub.embeddedkafka.streams
22

33
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, UUIDs}
4-
import org.apache.kafka.streams.KafkaStreams
5-
import org.apache.kafka.streams.processor.TopologyBuilder
4+
import org.apache.kafka.streams.{KafkaStreams, Topology}
65
import org.apache.log4j.Logger
76
import org.scalatest.Suite
87

@@ -24,29 +23,30 @@ trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig {
2423
* e.g.
2524
*
2625
* {{{
27-
*runStreams(Seq("inputTopic", "outputTopic", streamBuilder) {
26+
*runStreams(Seq("inputTopic", "outputTopic", topology) {
2827
* // here you can publish and consume messages and make assertions
2928
* publishToKafka(in, Seq("one-string", "another-string"))
3029
* consumeFirstStringMessageFrom(in) should be ("one-string")
3130
*}
3231
* }}}
3332
*
3433
* @param topicsToCreate the topics that should be created in Kafka before launching the streams.
35-
* @param builder the streams builder that will be used to instantiate the streams with
34+
* @param topology the streams topology that will be used to instantiate the streams with
3635
* a default configuration (all state directories are different and
3736
* in temp folders)
3837
* @param extraConfig additional KafkaStreams configuration (overwrite existing keys in
3938
* default config)
4039
* @param block the code block that will executed while the streams are active.
4140
* Once the block has been executed the streams will be closed.
4241
*/
43-
def runStreams(topicsToCreate: Seq[String], builder: TopologyBuilder, extraConfig: Map[String, AnyRef] = Map.empty)(
44-
block: => Any)(implicit config: EmbeddedKafkaConfig): Any =
42+
def runStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)
43+
(block: => Any)
44+
(implicit config: EmbeddedKafkaConfig): Any =
4545
withRunningKafka {
4646
topicsToCreate.foreach(topic => createCustomTopic(topic))
4747
val streamId = UUIDs.newUuid().toString
4848
logger.debug(s"Creating stream with Application ID: [$streamId]")
49-
val streams = new KafkaStreams(builder, streamConfig(streamId, extraConfig))
49+
val streams = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
5050
streams.start()
5151
try {
5252
block

kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreamsAllInOne.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package net.manub.embeddedkafka.streams
22

33
import net.manub.embeddedkafka.{Consumers, EmbeddedKafkaConfig}
44
import org.apache.kafka.clients.consumer.KafkaConsumer
5-
import org.apache.kafka.streams.processor.TopologyBuilder
5+
import org.apache.kafka.streams.Topology
66
import org.scalatest.Suite
77

88
/** Convenience trait for testing Kafka Streams with ScalaTest.
@@ -11,7 +11,7 @@ import org.scalatest.Suite
1111
*
1212
* e.g.
1313
* {{{
14-
*runStreams(Seq("inputTopic", "outputTopic", streamBuilder) {
14+
*runStreams(Seq("inputTopic", "outputTopic", streamTopology) {
1515
* withConsumer[String, String, Unit] { consumer =>
1616
* // here you can publish and consume messages and make assertions
1717
* publishToKafka(in, Seq("one-string", "another-string"))
@@ -35,14 +35,13 @@ trait EmbeddedKafkaStreamsAllInOne
3535
* that the Streams-under-test use for inputs and outputs. They need to be
3636
* created before running the streams and
3737
* this is automatically taken care of.
38-
* @param builder the streams builder that contains the stream topology that will be instantiated
38+
* @param topology the streams topology that will be instantiated
3939
* @param block the block of testing code that will be executed by passing the simple
4040
* String-based consumer.
4141
* @return the result of the testing code
4242
*/
43-
def runStreamsWithStringConsumer(
44-
topicsToCreate: Seq[String],
45-
builder: TopologyBuilder)(block: KafkaConsumer[String, String] => Any)(
46-
implicit config: EmbeddedKafkaConfig): Any =
47-
runStreams(topicsToCreate, builder)(withStringConsumer[Any](block))(config)
43+
def runStreamsWithStringConsumer(topicsToCreate: Seq[String], topology: Topology)
44+
(block: KafkaConsumer[String, String] => Any)
45+
(implicit config: EmbeddedKafkaConfig): Any =
46+
runStreams(topicsToCreate, topology)(withStringConsumer[Any](block))(config)
4847
}

kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import net.manub.embeddedkafka.Codecs._
44
import net.manub.embeddedkafka.ConsumerExtensions._
55
import net.manub.embeddedkafka.EmbeddedKafkaConfig
66
import org.apache.kafka.common.serialization.{Serde, Serdes}
7-
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}
7+
import org.apache.kafka.streams.{Consumed, StreamsBuilder}
8+
import org.apache.kafka.streams.kstream.{KStream, Produced}
89
import org.scalatest.{Matchers, WordSpec}
910

1011
class ExampleKafkaStreamsSpec
@@ -21,13 +22,13 @@ class ExampleKafkaStreamsSpec
2122

2223
"A Kafka streams test" should {
2324
"be easy to run with streams and consumer lifecycle management" in {
24-
val streamBuilder = new KStreamBuilder
25+
val streamBuilder = new StreamsBuilder
2526
val stream: KStream[String, String] =
26-
streamBuilder.stream(stringSerde, stringSerde, inTopic)
27+
streamBuilder.stream(inTopic, Consumed.`with`(stringSerde, stringSerde))
2728

28-
stream.to(stringSerde, stringSerde, outTopic)
29+
stream.to(outTopic, Produced.`with`(stringSerde, stringSerde))
2930

30-
runStreams(Seq(inTopic, outTopic), streamBuilder) {
31+
runStreams(Seq(inTopic, outTopic), streamBuilder.build()) {
3132
publishToKafka(inTopic, "hello", "world")
3233
publishToKafka(inTopic, "foo", "bar")
3334
publishToKafka(inTopic, "baz", "yaz")
@@ -42,13 +43,13 @@ class ExampleKafkaStreamsSpec
4243
}
4344

4445
"allow support creating custom consumers" in {
45-
val streamBuilder = new KStreamBuilder
46+
val streamBuilder = new StreamsBuilder
4647
val stream: KStream[String, String] =
47-
streamBuilder.stream(stringSerde, stringSerde, inTopic)
48+
streamBuilder.stream(inTopic, Consumed.`with`(stringSerde, stringSerde))
4849

49-
stream.to(stringSerde, stringSerde, outTopic)
50+
stream.to(outTopic, Produced.`with`(stringSerde, stringSerde))
5051

51-
runStreams(Seq(inTopic, outTopic), streamBuilder) {
52+
runStreams(Seq(inTopic, outTopic), streamBuilder.build()) {
5253
publishToKafka(inTopic, "hello", "world")
5354
publishToKafka(inTopic, "foo", "bar")
5455
val consumer = newConsumer[String, String]()
@@ -59,17 +60,17 @@ class ExampleKafkaStreamsSpec
5960
}
6061

6162
"allow for easy string based testing" in {
62-
val streamBuilder = new KStreamBuilder
63+
val streamBuilder = new StreamsBuilder
6364
val stream: KStream[String, String] =
64-
streamBuilder.stream(stringSerde, stringSerde, inTopic)
65+
streamBuilder.stream(inTopic, Consumed.`with`(stringSerde, stringSerde))
6566

66-
stream.to(stringSerde, stringSerde, outTopic)
67+
stream.to(outTopic, Produced.`with`(stringSerde, stringSerde))
6768

68-
runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder) {
69+
runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder.build()) {
6970
consumer =>
7071
publishToKafka(inTopic, "hello", "world")
7172
consumer.consumeLazily(outTopic).head should be("hello" -> "world")
7273
}
7374
}
7475
}
75-
}
76+
}

0 commit comments

Comments
 (0)