Skip to content

Commit c057671

Browse files
lacarvalho91manub
authored andcommitted
increase flexibility of ConsumerOps (#95)
* increase flexibility of ConsumerOps * Add decoders for extracting the topic from a cr and update README.
1 parent 855a54a commit c057671

File tree

5 files changed

+73
-50
lines changed

5 files changed

+73
-50
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,16 @@ Use the `Consumer` trait that easily creates consumers of arbitrary key-value ty
161161

162162
### Easy message consumption
163163

164-
With `ConsumerExtensions` you can turn a consumer to a Scala lazy Stream of key-value pairs and treat it as a collection for easy assertion.
164+
With `ConsumerExtensions` you can turn a consumer to a Scala lazy Stream of `T` and treat it as a collection for easy assertion.
165165
* Just import the extensions.
166+
* Bring an implicit `ConsumerRecord[_, _] => T` transform function into scope (some common functions are provided in `Codecs`).
166167
* On any `KafkaConsumer` instance you can now do:
167168

168169
```scala
169170
import net.manub.embeddedkafka.ConsumerExtensions._
171+
import net.manub.embeddedkafka.Codecs.stringKeyValueCrDecoder
170172
...
171-
consumer.consumeLazily("from-this-topic").take(3).toList should be (Seq(
173+
consumer.consumeLazily[(String, String)]("from-this-topic").take(3).toList should be (Seq(
172174
"1" -> "one",
173175
"2" -> "two",
174176
"3" -> "three"

embedded-kafka/src/main/scala/net/manub/embeddedkafka/Codecs.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package net.manub.embeddedkafka
22

33
import kafka.serializer._
4+
import org.apache.kafka.clients.consumer.ConsumerRecord
45
import org.apache.kafka.common.serialization._
56

6-
/** useful encoders/serializers and decoders/deserializers **/
7+
/** useful encoders/serializers, decoders/deserializers and [[ConsumerRecord]] decoders**/
78
object Codecs {
89
implicit val stringEncoder: Encoder[String] = new StringEncoder()
910
implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder()
@@ -17,4 +18,22 @@ object Codecs {
1718
new StringDeserializer()
1819
implicit val nullDeserializer: Deserializer[Array[Byte]] =
1920
new ByteArrayDeserializer()
21+
22+
implicit val stringKeyValueCrDecoder
23+
: ConsumerRecord[String, String] => (String, String) =
24+
cr => (cr.key(), cr.value)
25+
implicit val stringValueCrDecoder: ConsumerRecord[String, String] => String =
26+
_.value()
27+
implicit val stringKeyValueTopicCrDecoder
28+
: ConsumerRecord[String, String] => (String, String, String) = cr =>
29+
(cr.topic(), cr.key(), cr.value())
30+
31+
implicit val keyNullValueCrDecoder
32+
: ConsumerRecord[String, Array[Byte]] => (String, Array[Byte]) =
33+
cr => (cr.key(), cr.value)
34+
implicit val nullValueCrDecoder
35+
: ConsumerRecord[String, Array[Byte]] => Array[Byte] = _.value()
36+
implicit val keyNullValueTopicCrDecoder
37+
: ConsumerRecord[String, Array[Byte]] => (String, String, Array[Byte]) =
38+
cr => (cr.topic(), cr.key(), cr.value())
2039
}

embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package net.manub.embeddedkafka
22

3-
import org.apache.kafka.clients.consumer.KafkaConsumer
3+
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
44
import org.apache.kafka.common.KafkaException
55
import org.apache.log4j.Logger
66

@@ -9,59 +9,52 @@ import scala.util.Try
99
/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */
1010
object ConsumerExtensions {
1111

12-
implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {
12+
case class ConsumerRetryConfig(maximumAttempts: Int = 3, poll: Long = 2000)
1313

14-
private val logger = Logger.getLogger(classOf[ConsumerOps[K, V]])
14+
implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {
1515

16-
/** Consume messages from a given topic and return them as a lazily evaluated Scala Stream.
17-
* Depending on how many messages are taken from the Scala Stream it will try up to 3 times
18-
* to consume batches from the given topic, until it reaches the number of desired messages or
19-
* return otherwise.
20-
*
21-
* @param topic the topic from which to consume messages
22-
* @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3)
23-
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000)
24-
* @return the stream of consumed messages that you can do `.take(n: Int).toList`
25-
* to evaluate the requested number of messages.
26-
*/
27-
def consumeLazily(topic: String, maximumAttempts: Int = 3, poll: Long = 2000): Stream[(K, V)] = {
28-
consumeLazilyOnTopics(List(topic), maximumAttempts, poll).map { case (t, k, v) => (k, v) }
29-
}
16+
private val logger = Logger.getLogger(getClass)
3017

31-
/** Consume messages from a given list of topics and return them as a lazily evaluated Scala Stream.
32-
* Depending on how many messages are taken from the Scala Stream it will try up to 3 times
18+
/** Consume messages from one or many topics and return them as a lazily evaluated Scala Stream.
19+
* Depending on how many messages are taken from the Scala Stream it will try up to retryConf.maximumAttempts times
3320
* to consume batches from the given topic, until it reaches the number of desired messages or
3421
* return otherwise.
3522
*
3623
* @param topics the topics from which to consume messages
37-
* @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3)
38-
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000)
24+
* @param decoder the function to use for decoding all [[ConsumerRecord]]
25+
* @param retryConf contains the maximum number of attempts to try and get the next batch and the amount
26+
* of time, in milliseconds, to wait in the buffer for any messages to be available
3927
* @return the stream of consumed messages that you can do `.take(n: Int).toList`
4028
* to evaluate the requested number of messages.
4129
*/
42-
def consumeLazilyOnTopics(topics: List[String], maximumAttempts: Int = 3, poll: Long = 2000): Stream[(String, K, V)] = {
43-
val attempts = 1 to maximumAttempts
30+
def consumeLazily[T](topics: String*)(
31+
implicit decoder: ConsumerRecord[K, V] => T,
32+
retryConf: ConsumerRetryConfig = ConsumerRetryConfig()
33+
): Stream[T] = {
34+
val attempts = 1 to retryConf.maximumAttempts
4435
attempts.toStream.flatMap { attempt =>
45-
val batch: Seq[(String, K, V)] = getNextBatch(topics, poll)
36+
val batch: Seq[T] = getNextBatch(retryConf.poll, topics)
4637
logger.debug(s"----> Batch $attempt ($topics) | ${batch.mkString("|")}")
4738
batch
4839
}
4940
}
5041

5142
/** Get the next batch of messages from Kafka.
5243
*
53-
* @param topics the topic to consume
54-
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available
44+
* @param topics the topic to consume
45+
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available
46+
* @param decoder the function to use for decoding all [[ConsumerRecord]]
5547
* @return the next batch of messages
5648
*/
57-
private def getNextBatch(topics: List[String], poll: Long): Seq[(String, K, V)] =
49+
private def getNextBatch[T](poll: Long, topics: Seq[String])(
50+
implicit decoder: ConsumerRecord[K, V] => T): Seq[T] =
5851
Try {
5952
import scala.collection.JavaConverters._
6053
consumer.subscribe(topics.asJava)
6154
topics.foreach(consumer.partitionsFor)
6255
val records = consumer.poll(poll)
6356
// use toList to force eager evaluation. toSeq is lazy
64-
records.iterator().asScala.toList.map(r => (r.topic, r.key, r.value))
57+
records.iterator().asScala.toList.map(decoder(_))
6558
}.recover {
6659
case ex: KafkaException => throw new KafkaUnavailableException(ex)
6760
}.get

embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala renamed to embedded-kafka/src/test/scala/net/manub/embeddedkafka/ConsumerExtensionsSpec.scala

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,51 +8,57 @@ import org.scalatest.mockito.MockitoSugar
88

99
import scala.collection.JavaConverters._
1010

11-
class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar {
11+
class ConsumerExtensionsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar {
12+
13+
import net.manub.embeddedkafka.Codecs.stringValueCrDecoder
14+
15+
"consumeLazily" should {
1216

13-
"ConsumeLazily " should {
1417
"retry to get messages with the configured maximum number of attempts when poll fails" in {
18+
19+
implicit val retryConf = ConsumerRetryConfig(2, 1)
20+
1521
val consumer = mock[KafkaConsumer[String, String]]
1622
val consumerRecords =
1723
new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)
1824

19-
val pollTimeout = 1
20-
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
25+
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
2126

22-
val maximumAttempts = 2
23-
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
27+
consumer.consumeLazily[String]("topic")
2428

25-
verify(consumer, times(maximumAttempts)).poll(pollTimeout)
29+
verify(consumer, times(retryConf.maximumAttempts)).poll(retryConf.poll)
2630
}
2731

2832
"not retry to get messages with the configured maximum number of attempts when poll succeeds" in {
33+
34+
implicit val retryConf = ConsumerRetryConfig(2, 1)
35+
2936
val consumer = mock[KafkaConsumer[String, String]]
3037
val consumerRecord = mock[ConsumerRecord[String, String]]
3138
val consumerRecords = new ConsumerRecords[String, String](
3239
Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava).asJava
3340
)
3441

35-
val pollTimeout = 1
36-
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
42+
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
3743

38-
val maximumAttempts = 2
39-
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
44+
consumer.consumeLazily[String]("topic")
4045

41-
verify(consumer).poll(pollTimeout)
46+
verify(consumer).poll(retryConf.poll)
4247
}
4348

4449
"poll to get messages with the configured poll timeout" in {
50+
51+
implicit val retryConf = ConsumerRetryConfig(1, 10)
52+
4553
val consumer = mock[KafkaConsumer[String, String]]
4654
val consumerRecords =
4755
new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)
4856

49-
val pollTimeout = 10
50-
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
57+
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)
5158

52-
val maximumAttempts = 1
53-
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
59+
consumer.consumeLazily[String]("topic")
5460

55-
verify(consumer).poll(pollTimeout)
61+
verify(consumer).poll(retryConf.poll)
5662
}
5763
}
5864

kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ class ExampleKafkaStreamsSpec
1313
with Matchers
1414
with EmbeddedKafkaStreamsAllInOne {
1515

16+
import net.manub.embeddedkafka.Codecs.stringKeyValueCrDecoder
17+
1618
implicit val config =
1719
EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)
1820

@@ -53,7 +55,7 @@ class ExampleKafkaStreamsSpec
5355
publishToKafka(inTopic, "hello", "world")
5456
publishToKafka(inTopic, "foo", "bar")
5557
val consumer = newConsumer[String, String]()
56-
consumer.consumeLazily(outTopic).take(2) should be(
58+
consumer.consumeLazily[(String, String)](outTopic).take(2) should be(
5759
Seq("hello" -> "world", "foo" -> "bar"))
5860
consumer.close()
5961
}
@@ -69,7 +71,8 @@ class ExampleKafkaStreamsSpec
6971
runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder.build()) {
7072
consumer =>
7173
publishToKafka(inTopic, "hello", "world")
72-
consumer.consumeLazily(outTopic).head should be("hello" -> "world")
74+
consumer.consumeLazily[(String, String)](outTopic).head should be(
75+
"hello" -> "world")
7376
}
7477
}
7578
}

0 commit comments

Comments
 (0)