Skip to content

increase flexibility of ConsumerOps #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,16 @@ Use the `Consumer` trait that easily creates consumers of arbitrary key-value ty

### Easy message consumption

With `ConsumerExtensions` you can turn a consumer to a Scala lazy Stream of key-value pairs and treat it as a collection for easy assertion.
With `ConsumerExtensions` you can turn a consumer into a lazy Scala Stream of `T` and treat it as a collection for easy assertion.
* Just import the extensions.
* Bring an implicit `ConsumerRecord[_, _] => T` transform function into scope (some common functions are provided in `Codecs`).
* On any `KafkaConsumer` instance you can now do:

```scala
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.Codecs.stringKeyValueCrDecoder
...
consumer.consumeLazily("from-this-topic").take(3).toList should be (Seq(
consumer.consumeLazily[(String, String)]("from-this-topic").take(3).toList should be (Seq(
"1" -> "one",
"2" -> "two",
"3" -> "three"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package net.manub.embeddedkafka

import kafka.serializer._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._

/** useful encoders/serializers and decoders/deserializers **/
/** Useful encoders/serializers, decoders/deserializers and [[ConsumerRecord]] decoders. **/
object Codecs {
implicit val stringEncoder: Encoder[String] = new StringEncoder()
implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder()
Expand All @@ -17,4 +18,22 @@ object Codecs {
new StringDeserializer()
implicit val nullDeserializer: Deserializer[Array[Byte]] =
new ByteArrayDeserializer()

implicit val stringKeyValueCrDecoder
: ConsumerRecord[String, String] => (String, String) =
cr => (cr.key(), cr.value)
implicit val stringValueCrDecoder: ConsumerRecord[String, String] => String =
_.value()
implicit val stringKeyValueTopicCrDecoder
: ConsumerRecord[String, String] => (String, String, String) = cr =>
(cr.topic(), cr.key(), cr.value())

implicit val keyNullValueCrDecoder
: ConsumerRecord[String, Array[Byte]] => (String, Array[Byte]) =
cr => (cr.key(), cr.value)
implicit val nullValueCrDecoder
: ConsumerRecord[String, Array[Byte]] => Array[Byte] = _.value()
implicit val keyNullValueTopicCrDecoder
: ConsumerRecord[String, Array[Byte]] => (String, String, Array[Byte]) =
cr => (cr.topic(), cr.key(), cr.value())
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package net.manub.embeddedkafka

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.KafkaException
import org.apache.log4j.Logger

Expand All @@ -9,59 +9,52 @@ import scala.util.Try
/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */
object ConsumerExtensions {

implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {
case class ConsumerRetryConfig(maximumAttempts: Int = 3, poll: Long = 2000)

private val logger = Logger.getLogger(classOf[ConsumerOps[K, V]])
implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {

/** Consume messages from a given topic and return them as a lazily evaluated Scala Stream.
* Depending on how many messages are taken from the Scala Stream it will try up to 3 times
* to consume batches from the given topic, until it reaches the number of desired messages or
* return otherwise.
*
* @param topic the topic from which to consume messages
* @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3)
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000)
* @return the stream of consumed messages that you can do `.take(n: Int).toList`
* to evaluate the requested number of messages.
*/
def consumeLazily(topic: String, maximumAttempts: Int = 3, poll: Long = 2000): Stream[(K, V)] = {
consumeLazilyOnTopics(List(topic), maximumAttempts, poll).map { case (t, k, v) => (k, v) }
}
private val logger = Logger.getLogger(getClass)

/** Consume messages from a given list of topics and return them as a lazily evaluated Scala Stream.
* Depending on how many messages are taken from the Scala Stream it will try up to 3 times
/** Consume messages from one or many topics and return them as a lazily evaluated Scala Stream.
* Depending on how many messages are taken from the Scala Stream it will try up to retryConf.maximumAttempts times
* to consume batches from the given topics, until it reaches the number of desired messages or
* returns whatever has been consumed otherwise.
*
* @param topics the topics from which to consume messages
* @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3)
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000)
* @param decoder the function to use for decoding all [[ConsumerRecord]]
* @param retryConf contains the maximum number of attempts to try and get the next batch and the amount
* of time, in milliseconds, to wait in the buffer for any messages to be available
* @return the stream of consumed messages that you can do `.take(n: Int).toList`
* to evaluate the requested number of messages.
*/
def consumeLazilyOnTopics(topics: List[String], maximumAttempts: Int = 3, poll: Long = 2000): Stream[(String, K, V)] = {
val attempts = 1 to maximumAttempts
def consumeLazily[T](topics: String*)(
implicit decoder: ConsumerRecord[K, V] => T,
retryConf: ConsumerRetryConfig = ConsumerRetryConfig()
): Stream[T] = {
val attempts = 1 to retryConf.maximumAttempts
attempts.toStream.flatMap { attempt =>
val batch: Seq[(String, K, V)] = getNextBatch(topics, poll)
val batch: Seq[T] = getNextBatch(retryConf.poll, topics)
logger.debug(s"----> Batch $attempt ($topics) | ${batch.mkString("|")}")
batch
}
}

/** Get the next batch of messages from Kafka.
*
* @param topics the topic to consume
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available
* @param topics the topics from which to consume messages
* @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available
* @param decoder the function to use for decoding all [[ConsumerRecord]]
* @return the next batch of messages
*/
private def getNextBatch(topics: List[String], poll: Long): Seq[(String, K, V)] =
private def getNextBatch[T](poll: Long, topics: Seq[String])(
implicit decoder: ConsumerRecord[K, V] => T): Seq[T] =
Try {
import scala.collection.JavaConverters._
consumer.subscribe(topics.asJava)
topics.foreach(consumer.partitionsFor)
val records = consumer.poll(poll)
// use toList to force eager evaluation. toSeq is lazy
records.iterator().asScala.toList.map(r => (r.topic, r.key, r.value))
records.iterator().asScala.toList.map(decoder(_))
}.recover {
case ex: KafkaException => throw new KafkaUnavailableException(ex)
}.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,57 @@ import org.scalatest.mockito.MockitoSugar

import scala.collection.JavaConverters._

class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar {
class ConsumerExtensionsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar {

import net.manub.embeddedkafka.Codecs.stringValueCrDecoder

"consumeLazily" should {

"ConsumeLazily " should {
"retry to get messages with the configured maximum number of attempts when poll fails" in {

implicit val retryConf = ConsumerRetryConfig(2, 1)

val consumer = mock[KafkaConsumer[String, String]]
val consumerRecords =
new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)

val pollTimeout = 1
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)

val maximumAttempts = 2
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
consumer.consumeLazily[String]("topic")

verify(consumer, times(maximumAttempts)).poll(pollTimeout)
verify(consumer, times(retryConf.maximumAttempts)).poll(retryConf.poll)
}

"not retry to get messages with the configured maximum number of attempts when poll succeeds" in {

implicit val retryConf = ConsumerRetryConfig(2, 1)

val consumer = mock[KafkaConsumer[String, String]]
val consumerRecord = mock[ConsumerRecord[String, String]]
val consumerRecords = new ConsumerRecords[String, String](
Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava).asJava
)

val pollTimeout = 1
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)

val maximumAttempts = 2
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
consumer.consumeLazily[String]("topic")

verify(consumer).poll(pollTimeout)
verify(consumer).poll(retryConf.poll)
}

"poll to get messages with the configured poll timeout" in {

implicit val retryConf = ConsumerRetryConfig(1, 10)

val consumer = mock[KafkaConsumer[String, String]]
val consumerRecords =
new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava)

val pollTimeout = 10
when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)
when(consumer.poll(retryConf.poll)).thenReturn(consumerRecords)

val maximumAttempts = 1
consumer.consumeLazily("topic", maximumAttempts, pollTimeout)
consumer.consumeLazily[String]("topic")

verify(consumer).poll(pollTimeout)
verify(consumer).poll(retryConf.poll)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class ExampleKafkaStreamsSpec
with Matchers
with EmbeddedKafkaStreamsAllInOne {

import net.manub.embeddedkafka.Codecs.stringKeyValueCrDecoder

implicit val config =
EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

Expand Down Expand Up @@ -52,7 +54,7 @@ class ExampleKafkaStreamsSpec
publishToKafka(inTopic, "hello", "world")
publishToKafka(inTopic, "foo", "bar")
val consumer = newConsumer[String, String]()
consumer.consumeLazily(outTopic).take(2) should be(
consumer.consumeLazily[(String, String)](outTopic).take(2) should be(
Seq("hello" -> "world", "foo" -> "bar"))
consumer.close()
}
Expand All @@ -68,7 +70,8 @@ class ExampleKafkaStreamsSpec
runStreamsWithStringConsumer(Seq(inTopic, outTopic), streamBuilder) {
consumer =>
publishToKafka(inTopic, "hello", "world")
consumer.consumeLazily(outTopic).head should be("hello" -> "world")
consumer.consumeLazily[(String, String)](outTopic).head should be(
"hello" -> "world")
}
}
}
Expand Down