
Concurrent kafka instances #105


Merged: 5 commits, Mar 9, 2018


tjheslin1
Contributor

@tjheslin1 tjheslin1 commented Feb 24, 2018

I'm raising this PR now in the hopes that you can take a look at my approach. If you have any major concerns it would be best to find out sooner!

Closes #101

TODO:

  • @Shashidesai's query around runStreams
  • implicit conversion extension method to go from Seq[EmbeddedServer] => Seq[EmbeddedZ] or Seq[EmbeddedK], replacing the boilerplate isInstanceOf + asInstanceOf
  • clean up of new tests for conciseness
  • javadoc for EmbeddedServer.scala
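
The extension-method TODO item could look roughly like the sketch below. It assumes only that EmbeddedZ and EmbeddedK are subtypes of EmbeddedServer; the `port` fields, the `EmbeddedServerSyntax` object, and the `zookeepers` / `brokers` method names are hypothetical and not taken from this PR.

```scala
// Hypothetical stand-ins for the PR's model; the real classes wrap running servers.
sealed trait EmbeddedServer
final case class EmbeddedZ(port: Int) extends EmbeddedServer
final case class EmbeddedK(port: Int) extends EmbeddedServer

object EmbeddedServerSyntax {
  // Adds extension methods to any Seq[EmbeddedServer] through an implicit value class.
  implicit class ServerOps(private val servers: Seq[EmbeddedServer]) extends AnyVal {
    // collect keeps only elements matching the pattern, already typed as the subtype,
    // so no isInstanceOf + asInstanceOf pair is needed.
    def zookeepers: Seq[EmbeddedZ] = servers.collect { case z: EmbeddedZ => z }
    def brokers: Seq[EmbeddedK]    = servers.collect { case k: EmbeddedK => k }
  }
}
```

With `import EmbeddedServerSyntax._` in scope, a caller holding a `Seq[EmbeddedServer]` can write `servers.brokers` or `servers.zookeepers` directly.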

Problem

Tests that use EmbeddedKafka and run in parallel can fail because EmbeddedKafka#stop clears all temporary log directories, including ones that a Kafka instance in another test may still be relying on.

To see this error I changed logback.xml line 1 to: log4j.rootLogger=error,stdout:

FATAL [Replica Manager on Broker 0]: Error writing to highwatermark file ... java.io.FileNotFoundException ... kafka-logsXXXXXXXXX.tmp/replication-offset-checkpoint.tmp (No such file or directory)

Testing

I would like to improve the new tests, as there is quite a lot of repetition.

Solution

I have created a model of the running servers, both Kafka and Zookeeper. Starting one of these appends an EmbeddedServer to a managed list of servers.

The following functions have had their signatures modified. Because they all previously returned Unit, the change is non-breaking for users: after upgrading, existing calls simply ignore the returned objects and behave as before.

EmbeddedKafka#start now returns an EmbeddedServer.

EmbeddedKafka#startZooKeeper now returns an EmbeddedZ.

EmbeddedKafka#startKafka now returns an EmbeddedK.
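
To illustrate why the new return types do not break existing call sites, here is a minimal sketch. `SignatureChange`, the `port` field, and both caller methods are hypothetical; only the idea that `startKafka` now returns an `EmbeddedK` instead of `Unit` comes from this PR. Note that this concerns source compatibility: code compiled against the old `Unit` signatures would still need to be recompiled.

```scala
object SignatureChange {
  // Hypothetical stand-in for the PR's EmbeddedK.
  final case class EmbeddedK(port: Int)

  // New-style signature: returns a handle instead of Unit.
  def startKafka(): EmbeddedK = EmbeddedK(6001)

  // A call site written against the old Unit-returning API.
  // The returned handle is discarded and the code compiles unchanged.
  def existingCaller(): Unit = {
    startKafka()
    println("test body runs as before")
  }

  // A call site that opts in to the new return value.
  def newCaller(): Int = {
    val broker = startKafka()
    broker.port
  }
}
```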

I have added an overload to EmbeddedKafka#stop which will stop the specific server and remove it from the list of managed servers.
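
The managed list and the two `stop` variants can be sketched as follows. This is not the code in this PR: `ManagedServers`, the `logsDir` field, and the use of plain temporary directories in place of real Kafka and Zookeeper processes are all assumptions made to keep the example self-contained. It also assumes that stopping one server cleans up only that server's log directory, which is what would avoid the FileNotFoundException described above. It relies on Scala 2.12 or later for the Java lambda conversion.

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

// Hypothetical model: each running server owns its own log directory.
sealed trait EmbeddedServer { def logsDir: Path }
final case class EmbeddedZ(logsDir: Path) extends EmbeddedServer
final case class EmbeddedK(logsDir: Path) extends EmbeddedServer

object ManagedServers {
  // The managed list of running servers, guarded by this object's lock.
  private var servers: List[EmbeddedServer] = Nil

  def running: List[EmbeddedServer] = synchronized(servers)

  // Starting a server appends it to the list and returns a handle to it.
  def startZooKeeper(): EmbeddedZ = register(EmbeddedZ(Files.createTempDirectory("zookeeper-logs")))
  def startKafka(): EmbeddedK     = register(EmbeddedK(Files.createTempDirectory("kafka-logs")))

  // Overload: stop one specific server, leaving every other server's logs untouched.
  def stop(server: EmbeddedServer): Unit = synchronized {
    servers = servers.filterNot(_ == server)
    deleteRecursively(server.logsDir)
  }

  // Original behaviour: stop everything that is still managed.
  def stop(): Unit = synchronized {
    servers.foreach(s => deleteRecursively(s.logsDir))
    servers = Nil
  }

  private def register[S <: EmbeddedServer](server: S): S = synchronized {
    servers = servers :+ server
    server
  }

  // Deletes children before parents by walking the tree in reverse path order.
  private def deleteRecursively(dir: Path): Unit =
    if (Files.exists(dir)) {
      val walk = Files.walk(dir)
      try walk.sorted(Comparator.reverseOrder[Path]()).forEach(p => Files.delete(p))
      finally walk.close()
    }
}
```

In a parallel test, each test would keep the handle returned by `startKafka()` and pass it to `stop(server)` when done, instead of calling the no-argument `stop()`.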

Thomas Heslin added 3 commits February 18, 2018 20:06
…in case class instances. The start methods, which used to return Unit, now return references to the base type: EmbeddedServer. These references can be used to stop the specific servers using a new overload stop function.
@manub
Owner

manub commented Feb 26, 2018

Thanks @tjheslin1 - the approach looks good to me. Let me know when you've finished and I'll carry on with the review.

… kafka functionality after first has shutdown
@tjheslin1 changed the title from "[Work in progress] Concurrent kafka instances" to "Concurrent kafka instances" on Mar 5, 2018
@tjheslin1
Contributor Author

Ready to be reviewed.

@manub
Owner

manub commented Mar 5, 2018

All looks good - could you please address the 2 minor things that are still pending?

@tjheslin1
Contributor Author

Ready now @manub. @Shashidesai will be raising another issue.

@manub manub merged commit 8e4155d into manub:master Mar 9, 2018
@manub
Owner

manub commented Mar 9, 2018

Thanks. Will merge now and release in the next few days.
