Skip to content

Commit cd83bab

Browse files
author
Alex Adriaanse
committed
Allow consumeFirstStringMessageFrom to be called multiple times
In older versions of scalatest-embedded-kafka you were able to call consumeFirstStringMessageFrom multiple times, with each invocation causing the next message in a topic to be consumed. In the latest version this is no longer the case: the first invocation works fine, but subsequent invocations would result in exceptions as the first invocation would cause *all* messages to be committed even though only the first one is returned. We fix this by disabling auto-commit and simply committing only the message that consumeFirstStringMessageFrom returns.
1 parent 4925ecd commit cd83bab

File tree

2 files changed

+43
-3
lines changed

2 files changed

+43
-3
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import java.util.concurrent.Executors
77
import kafka.admin.AdminUtils
88
import kafka.server.{KafkaConfig, KafkaServer}
99
import kafka.utils.ZkUtils
10-
import org.apache.kafka.clients.consumer.KafkaConsumer
10+
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
1111
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
12-
import org.apache.kafka.common.KafkaException
12+
import org.apache.kafka.common.{KafkaException, TopicPartition}
1313
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
1414
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1515
import org.scalatest.Suite
@@ -236,6 +236,7 @@ sealed trait EmbeddedKafkaSupport {
236236
props.put("group.id", s"embedded-kafka-spec")
237237
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
238238
props.put("auto.offset.reset", "earliest")
239+
props.put("enable.auto.commit", "false")
239240

240241
val consumer =
241242
new KafkaConsumer[String, T](props, new StringDeserializer, deserializer)
@@ -248,7 +249,14 @@ sealed trait EmbeddedKafkaSupport {
248249
throw new TimeoutException(
249250
"Unable to retrieve a message from Kafka in 5000ms")
250251
}
251-
records.iterator().next().value()
252+
253+
val record = records.iterator().next()
254+
255+
val tp = new TopicPartition(record.topic(), record.partition())
256+
val om = new OffsetAndMetadata(record.offset() + 1)
257+
consumer.commitSync(Map(tp -> om))
258+
259+
record.value()
252260
}
253261

254262
consumer.close()

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
4444

4545
consumer.close()
4646

47+
// Commit message in embedded-kafka-spec group so we don't consume it
48+
// again in the consumeFirstStringMessageFrom tests below.
49+
consumeFirstStringMessageFrom(topic)
4750
}
4851

4952
"publish synchronously a String message with String key to Kafka" in {
@@ -67,6 +70,10 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
6770

6871

6972
consumer.close()
73+
74+
// Commit message in embedded-kafka-spec group so we don't consume it
75+
// again in the consumeFirstStringMessageFrom tests below.
76+
consumeFirstStringMessageFrom(topic)
7077
}
7178
}
7279

@@ -126,6 +133,31 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
126133
producer.close()
127134
}
128135

136+
"consume only a single message when multiple messages have been published to a topic" in {
137+
val messages = Set("message 1", "message 2", "message 3")
138+
val topic = "test_topic"
139+
140+
val producer = new KafkaProducer[String, String](Map(
141+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
142+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
143+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
144+
))
145+
146+
messages.foreach { message =>
147+
producer.send(new ProducerRecord[String, String](topic, message))
148+
}
149+
150+
producer.flush()
151+
152+
val consumedMessages = for (i <- 1 to messages.size) yield {
153+
consumeFirstStringMessageFrom(topic)
154+
}
155+
156+
consumedMessages.toSet shouldEqual messages
157+
158+
producer.close()
159+
}
160+
129161
"return a message published to a topic with implicit decoder" in {
130162
val message = "hello world!"
131163
val topic = "test_topic"

0 commit comments

Comments
 (0)