Skip to content

Commit 71fba04

Browse files
author
Thomas Heslin
committed
adding concurrent test
1 parent 8d67bca commit 71fba04

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaObjectSpec.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
package net.manub.embeddedkafka
22

3+
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
4+
import net.manub.embeddedkafka.EmbeddedKafka._
5+
6+
import scala.collection.JavaConverters._
7+
38
class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
49

10+
val consumerPollTimeout = 5000
11+
512
"the EmbeddedKafka object" when {
613
"invoking the start and stop methods" should {
714
"start and stop Kafka and Zookeeper on the default ports" in {
@@ -26,6 +33,47 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
2633

2734
EmbeddedKafka.stop()
2835
}
36+
37+
"multiple EmbeddedKafka can run in parallel" in {
38+
val someConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 32111)
39+
EmbeddedKafka.start()(someConfig)
40+
41+
val someOtherConfig = EmbeddedKafkaConfig(kafkaPort = 23456, zooKeeperPort = 43211)
42+
EmbeddedKafka.start()(someOtherConfig)
43+
44+
val topic = "publish_test_topic_1"
45+
val someMessage = "hello world!"
46+
val someOtherMessage = "another message!"
47+
48+
val serializer = new StringSerializer
49+
val deserializer = new StringDeserializer
50+
51+
publishToKafka(topic, someMessage)(someConfig, serializer)
52+
publishToKafka(topic, someOtherMessage)(someOtherConfig, serializer)
53+
54+
val consumer = kafkaConsumer(someConfig, deserializer, deserializer)
55+
consumer.subscribe(List(topic).asJava)
56+
57+
val records = consumer.poll(consumerPollTimeout)
58+
records.count shouldBe 1
59+
60+
val record = records.iterator().next
61+
record.value shouldBe someMessage
62+
63+
// second
64+
65+
val anotherConsumer = kafkaConsumer(someOtherConfig, deserializer, deserializer)
66+
anotherConsumer.subscribe(List(topic).asJava)
67+
68+
val moreRecords = anotherConsumer.poll(consumerPollTimeout)
69+
moreRecords.count shouldBe 1
70+
71+
val someOtherRecord = moreRecords.iterator().next
72+
someOtherRecord.value shouldBe someOtherMessage
73+
74+
EmbeddedKafka.stop()
75+
EmbeddedKafka.stop()
76+
}
2977
}
3078

3179
"invoking the isRunnning method" should {

0 commit comments

Comments
 (0)