Skip to content

Commit 119e911

Browse files
msaunier-poctumanub
authored andcommitted
Add possibility to override streams config in runStreams method #68 (#94)
1 parent 7884e82 commit 119e911

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreams.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,18 @@ trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig {
3535
* @param builder the streams builder that will be used to instantiate the streams with
3636
* a default configuration (all state directories are different and
3737
* in temp folders)
38+
* @param extraConfig additional KafkaStreams configuration (overwrite existing keys in
39+
* default config)
3840
* @param block the code block that will executed while the streams are active.
3941
* Once the block has been executed the streams will be closed.
4042
*/
41-
def runStreams(topicsToCreate: Seq[String], builder: TopologyBuilder)(
43+
def runStreams(topicsToCreate: Seq[String], builder: TopologyBuilder, extraConfig: Map[String, AnyRef] = Map.empty)(
4244
block: => Any)(implicit config: EmbeddedKafkaConfig): Any =
4345
withRunningKafka {
4446
topicsToCreate.foreach(topic => createCustomTopic(topic))
4547
val streamId = UUIDs.newUuid().toString
4648
logger.debug(s"Creating stream with Application ID: [$streamId]")
47-
val streams = new KafkaStreams(builder, streamConfig(streamId))
49+
val streams = new KafkaStreams(builder, streamConfig(streamId, extraConfig))
4850
streams.start()
4951
try {
5052
block

0 commit comments

Comments
 (0)