Concurrent kafka instances #105
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
I'm raising this PR now in the hopes that you can take a look at my approach. If you have any major concerns it would be best to find out sooner!
Closes #101
TODO:
runStreams
implicit conversionextension method to go fromSeq[EmbeddedServer]
=>Seq[EmbeddedZ]
orSeq[EmbeddedK]
, replacing the boiler placeisInstanceOf
+asInstanceOf
EmbeddedServer.scala
Problem
Running tests in parallel which use
EmbeddedKafka
can run into an issue with the log files asEmbeddedKafka#stop
clears all temporary log directories, which another instance of kafka may still be relying on in another test.To see this error I changed
logback.xml
line 1 to:log4j.rootLogger=error,stdout
:FATAL [Replica Manager on Broker 0]: Error writing to highwatermark file
...java.io.FileNotFoundException
...kafka-logsXXXXXXXXX.tmp/replication-offset-checkpoint.tmp (No such file or directory)
Testing
I would like to improve these as there is quite a lot of repetition
Solution
I have created a model of the running servers, both Kafka and Zookeeper. Starting one of these appends an
EmbeddedServer
to managed list ofservers
.The following functions have had their signatures modified. Given that they all used to return
Unit
, this results in a non-breaking change to users. On upgrading the returned objects will be ignored and the existing functionality will be preserved.EmbeddedKafka#start now returns an
EmbeddedServer
.EmbeddedKafka#startZooKeeper now returns an
EmbeddedZ
.EmbeddedKafka#startKafka now returns an
EmbeddedK
.I have added an overload to EmbeddedKafka#stop which will stop the specific server and remove it from the list of managed servers.