Skip to content

Add possibility to override streams config in runStreams method (issue #68) #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 3, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig {
* @param builder the streams builder that will be used to instantiate the streams with
* a default configuration (all state directories are different and
* in temp folders)
* @param extraConfig additional KafkaStreams configuration (overwrite existing keys in
* default config)
* @param block the code block that will executed while the streams are active.
* Once the block has been executed the streams will be closed.
*/
def runStreams(topicsToCreate: Seq[String], builder: TopologyBuilder)(
def runStreams(topicsToCreate: Seq[String], builder: TopologyBuilder, extraConfig: Map[String, AnyRef] = Map.empty)(
block: => Any)(implicit config: EmbeddedKafkaConfig): Any =
withRunningKafka {
topicsToCreate.foreach(topic => createCustomTopic(topic))
val streamId = UUIDs.newUuid().toString
logger.debug(s"Creating stream with Application ID: [$streamId]")
val streams = new KafkaStreams(builder, streamConfig(streamId))
val streams = new KafkaStreams(builder, streamConfig(streamId, extraConfig))
streams.start()
try {
block
Expand Down