Skip to content

Commit cd2fc4a

Browse files
authored
Refactored autoCommit for consumeFirstMessage methods. (#62)
* Refactored autoCommit for consumeFirstMessage methods. * Removed unused autoCommit field in EmbeddedKafkaConfig.
1 parent fb852e6 commit cd2fc4a

File tree

3 files changed

+11
-8
lines changed

3 files changed

+11
-8
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ This works for both `withRunningKafka` and `EmbeddedKafka.start()`
7676
Also, it is now possible to provide custom properties to the broker while starting Kafka. `EmbeddedKafkaConfig` has a
7777
`customBrokerProperties` field which can be used to provide extra properties contained in a `Map[String, String]`.
7878
Those properties will be added to the broker configuration, be careful some properties are set by the library itself and
79-
in case of conflict your values will take precedence. Please look at the source code to see what these properties
79+
in case of conflict the `customBrokerProperties` values will take precedence. Please look at the source code to see what these properties
8080
are.
8181

8282
## Utility methods

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,16 +210,20 @@ sealed trait EmbeddedKafkaSupport {
210210
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
211211
)
212212

213-
def consumeFirstStringMessageFrom(topic: String)(
213+
def consumeFirstStringMessageFrom(topic: String, autoCommit: Boolean = false)(
214214
implicit config: EmbeddedKafkaConfig): String =
215-
consumeFirstMessageFrom(topic)(config, new StringDeserializer())
215+
consumeFirstMessageFrom(topic, autoCommit)(config, new StringDeserializer())
216216

217217
/**
218218
* Consumes the first message available in a given topic, deserializing it as a String.
219219
*
220-
* Only the messsage that is returned is committed if config.autoCommit is false. If config.autoCommit is true then all messages that were polled will be committed.
220+
* Only the messsage that is returned is committed if autoCommit is false.
221+
* If autoCommit is true then all messages that were polled will be committed.
221222
*
222223
* @param topic the topic to consume a message from
224+
* @param autoCommit if false, only the offset for the consumed message will be commited.
225+
* if true, the offset for the last polled message will be committed instead.
226+
* Defaulted to false.
223227
* @param config an implicit [[EmbeddedKafkaConfig]]
224228
* @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
225229
* @return the first message consumed from the given topic, with a type [[T]]
@@ -228,7 +232,7 @@ sealed trait EmbeddedKafkaSupport {
228232
*/
229233
@throws(classOf[TimeoutException])
230234
@throws(classOf[KafkaUnavailableException])
231-
def consumeFirstMessageFrom[T](topic: String)(
235+
def consumeFirstMessageFrom[T](topic: String, autoCommit: Boolean = false)(
232236
implicit config: EmbeddedKafkaConfig,
233237
deserializer: Deserializer[T]): T = {
234238

@@ -238,7 +242,7 @@ sealed trait EmbeddedKafkaSupport {
238242
props.put("group.id", s"embedded-kafka-spec")
239243
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
240244
props.put("auto.offset.reset", "earliest")
241-
props.put("enable.auto.commit", s"${config.autoCommit}")
245+
props.put("enable.auto.commit", autoCommit.toString)
242246

243247
val consumer =
244248
new KafkaConsumer[String, T](props, new StringDeserializer, deserializer)

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ package net.manub.embeddedkafka
22

33
case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
44
zooKeeperPort: Int = 6000,
5-
customBrokerProperties: Map[String, String] = Map.empty,
6-
autoCommit: Boolean = false)
5+
customBrokerProperties: Map[String, String] = Map.empty)
76

87
object EmbeddedKafkaConfig {
98
implicit val defaultConfig = EmbeddedKafkaConfig()

0 commit comments

Comments
 (0)