Skip to content

Commit 36aa1d2

Browse files
committed
increase flexibility of ConsumerOps
1 parent c99ca9b commit 36aa1d2

File tree

4 files changed

+62
-47
lines changed

4 files changed

+62
-47
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/Codecs.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package net.manub.embeddedkafka
22

33
import kafka.serializer._
4+
import org.apache.kafka.clients.consumer.ConsumerRecord
45
import org.apache.kafka.common.serialization._
56

67
/** useful encoders/serializers and decoders/deserializers **/
@@ -17,4 +18,16 @@ object Codecs {
1718
new StringDeserializer()
1819
implicit val nullDeserializer: Deserializer[Array[Byte]] =
1920
new ByteArrayDeserializer()
21+
22+
implicit val stringKeyValueCrDecoder
23+
: ConsumerRecord[String, String] => (String, String) =
24+
cr => (cr.key(), cr.value)
25+
implicit val stringValueCrDecoder: ConsumerRecord[String, String] => String =
26+
_.value()
27+
28+
implicit val keyNullValueCrDecoder
29+
: ConsumerRecord[String, Array[Byte]] => (String, Array[Byte]) =
30+
cr => (cr.key(), cr.value)
31+
implicit val nullValueCrDecoder
32+
: ConsumerRecord[String, Array[Byte]] => Array[Byte] = _.value()
2033
}

embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package net.manub.embeddedkafka
22

3-
import org.apache.kafka.clients.consumer.KafkaConsumer
3+
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
44
import org.apache.kafka.common.KafkaException
55
import org.apache.log4j.Logger
66

@@ -9,59 +9,52 @@ import scala.util.Try
99
/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */
1010
object ConsumerExtensions {
1111

12-
implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {
12+
case class ConsumerRetryConfig(maximumAttempts: Int = 3, poll: Long = 2000)
1313

14-
private val logger = Logger.getLogger(classOf[ConsumerOps[K, V]])
14+
implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {
1515

16-
/** Consume messages from a given topic and return them as a lazily evaluated Scala Stream.
17-
* Depending on how many messages are taken from the Scala Stream it will try up to 3 times
18-
* to consume batches from the given topic, until it reaches the number of desired messages or
19-
* return otherwise.
20-
*
21-
* @param topic the topic from which to consume messages
22-
* @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3)
23-
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000)
24-
* @return the stream of consumed messages that you can do `.take(n: Int).toList`
25-
* to evaluate the requested number of messages.
26-
*/
27-
def consumeLazily(topic: String, maximumAttempts: Int = 3, poll: Long = 2000): Stream[(K, V)] = {
28-
consumeLazilyOnTopics(List(topic), maximumAttempts, poll).map { case (t, k, v) => (k, v) }
29-
}
16+
private val logger = Logger.getLogger(getClass)
3017

31-
/** Consume messages from a given list of topics and return them as a lazily evaluated Scala Stream.
32-
* Depending on how many messages are taken from the Scala Stream it will try up to 3 times
18+
/** Consume messages from one or many topics and return them as a lazily evaluated Scala Stream.
19+
* Depending on how many messages are taken from the Scala Stream it will try up to retryConf.maximumAttempts times
3320
* to consume batches from the given topic, until it reaches the number of desired messages or
3421
* return otherwise.
3522
*
3623
* @param topics the topics from which to consume messages
37-
* @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3)
38-
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000)
24+
* @param decoder the function to use for decoding all [[ConsumerRecord]]
25+
* @param retryConf contains the maximum number of attempts to try and get the next batch and the amount
26+
* of time, in milliseconds, to wait in the buffer for any messages to be available
3927
* @return the stream of consumed messages that you can do `.take(n: Int).toList`
4028
* to evaluate the requested number of messages.
4129
*/
42-
def consumeLazilyOnTopics(topics: List[String], maximumAttempts: Int = 3, poll: Long = 2000): Stream[(String, K, V)] = {
43-
val attempts = 1 to maximumAttempts
30+
def consumeLazily[T](topics: String*)(
31+
implicit decoder: ConsumerRecord[K, V] => T,
32+
retryConf: ConsumerRetryConfig = ConsumerRetryConfig()
33+
): Stream[T] = {
34+
val attempts = 1 to retryConf.maximumAttempts
4435
attempts.toStream.flatMap { attempt =>
45-
val batch: Seq[(String, K, V)] = getNextBatch(topics, poll)
36+
val batch: Seq[T] = getNextBatch(retryConf.poll, topics)
4637
logger.debug(s"----> Batch $attempt ($topics) | ${batch.mkString("|")}")
4738
batch
4839
}
4940
}
5041

5142
/** Get the next batch of messages from Kafka.
5243
*
53-
* @param topics the topic to consume
54-
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available
44+
* @param topics the topic to consume
45+
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available
46+
* @param decoder the function to use for decoding all [[ConsumerRecord]]
5547
* @return the next batch of messages
5648
*/
57-
private def getNextBatch(topics: List[String], poll: Long): Seq[(String, K, V)] =
49+
private def getNextBatch[T](poll: Long, topics: Seq[String])(
50+
implicit decoder: ConsumerRecord[K, V] => T): Seq[T] =
5851
Try {
5952
import scala.collection.JavaConverters._
6053
consumer.subscribe(topics.asJava)
6154
topics.foreach(consumer.partitionsFor)
6255
val records = consumer.poll(poll)
6356
// use toList to force eager evaluation. toSeq is lazy
64-
records.iterator().asScala.toList.map(r => (r.topic, r.key, r.value))
57+
records.iterator().asScala.toList.map(decoder(_))
6558
}.recover {
6659
case ex: KafkaException => throw new KafkaUnavailableException(ex)
6760
}.get

embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala renamed to embedded-kafka/src/test/scala/net/manub/embeddedkafka/ConsumerExtensionsSpec.scala

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,51 +8,57 @@ import org.scalatest.mockito.MockitoSugar
88

99
import scala.collection.JavaConverters._
1010

11-
class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar {
11+
class ConsumerExtensionsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar {
12+
13+
import net.manub.embeddedkafka.Codecs.stringValueCrDecoder
14+
15+
"consumeLazily" should {
1216

13-
"ConsumeLazily " should {
1417
"retry to get messages with the configured maximum number of attempts when poll fails" in {
18+
19+
implicit val retryConf = ConsumerRetryConfig(2, 1)
20+
1521
val consumer = mock[KafkaConsumer[String, String]]
1622
val consumerRecords =
1723
new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)
1824

19-
val pollTimeout = 1
20-
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
25+
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
2126

22-
val maximumAttempts = 2
23-
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
27+
consumer.consumeLazily[String]("topic")
2428

25-
verify(consumer, times(maximumAttempts)).poll(pollTimeout)
29+
verify(consumer, times(retryConf.maximumAttempts)).poll(retryConf.poll)
2630
}
2731

2832
"not retry to get messages with the configured maximum number of attempts when poll succeeds" in {
33+
34+
implicit val retryConf = ConsumerRetryConfig(2, 1)
35+
2936
val consumer = mock[KafkaConsumer[String, String]]
3037
val consumerRecord = mock[ConsumerRecord[String, String]]
3138
val consumerRecords = new ConsumerRecords[String, String](
3239
Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava).asJava
3340
)
3441

35-
val pollTimeout = 1
36-
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
42+
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
3743

38-
val maximumAttempts = 2
39-
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
44+
consumer.consumeLazily[String]("topic")
4045

41-
verify(consumer).poll(pollTimeout)
46+
verify(consumer).poll(retryConf.poll)
4247
}
4348

4449
"poll to get messages with the configured poll timeout" in {
50+
51+
implicit val retryConf = ConsumerRetryConfig(1, 10)
52+
4553
val consumer = mock[KafkaConsumer[String, String]]
4654
val consumerRecords =
4755
new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)
4856

49-
val pollTimeout = 10
50-
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
57+
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
5158

52-
val maximumAttempts = 1
53-
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
59+
consumer.consumeLazily[String]("topic")
5460

55-
verify(consumer).poll(pollTimeout)
61+
verify(consumer).poll(retryConf.poll)
5662
}
5763
}
5864

kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class ExampleKafkaStreamsSpec
1212
with Matchers
1313
with EmbeddedKafkaStreamsAllInOne {
1414

15+
import net.manub.embeddedkafka.Codecs.stringKeyValueCrDecoder
16+
1517
implicit val config =
1618
EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)
1719

@@ -52,7 +54,7 @@ class ExampleKafkaStreamsSpec
5254
publishToKafka(inTopic, "hello", "world")
5355
publishToKafka(inTopic, "foo", "bar")
5456
val consumer = newConsumer[String, String]()
55-
consumer.consumeLazily(outTopic).take(2) should be(
57+
consumer.consumeLazily[(String, String)](outTopic).take(2) should be(
5658
Seq("hello" -> "world", "foo" -> "bar"))
5759
consumer.close()
5860
}
@@ -68,7 +70,8 @@ class ExampleKafkaStreamsSpec
6870
runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder) {
6971
consumer =>
7072
publishToKafka(inTopic, "hello", "world")
71-
consumer.consumeLazily(outTopic).head should be("hello" -> "world")
73+
consumer.consumeLazily[(String, String)](outTopic).head should be(
74+
"hello" -> "world")
7275
}
7376
}
7477
}

0 commit comments

Comments
 (0)