Skip to content

Commit 3c2fdbc

Browse files
timgentonzomanub
authored andcommitted
Method to publish a ProducerRecord (#107)
- Addresses issue #93 - #93
1 parent 23370a2 commit 3c2fdbc

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,24 @@ sealed trait EmbeddedKafkaSupport {
218218
serializer),
219219
new ProducerRecord[String, T](topic, message))
220220

221+
/**
222+
* Publishes synchronously a message to the running Kafka broker.
223+
*
224+
* @param topic the topic to which publish the message (it will be auto-created)
225+
* @param producerRecord the producerRecord of type [[T]] to publish
226+
* @param config an implicit [[EmbeddedKafkaConfig]]
227+
* @param serializer an implicit [[Serializer]] for the type [[T]]
228+
* @throws KafkaUnavailableException if unable to connect to Kafka
229+
*/
230+
@throws(classOf[KafkaUnavailableException])
231+
def publishToKafka[T](topic: String, producerRecord: ProducerRecord[String, T])(
232+
implicit config: EmbeddedKafkaConfig,
233+
serializer: Serializer[T]): Unit =
234+
publishToKafka(new KafkaProducer(baseProducerConfig.asJava,
235+
new StringSerializer(),
236+
serializer),
237+
producerRecord)
238+
221239
/**
222240
* Publishes synchronously a message to the running Kafka broker.
223241
*

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.apache.kafka.clients.producer.{
99
ProducerConfig,
1010
ProducerRecord
1111
}
12+
import org.apache.kafka.common.header.internals.RecordHeaders
1213
import org.apache.kafka.common.serialization.{
1314
ByteArraySerializer,
1415
StringDeserializer,
@@ -58,6 +59,33 @@ class EmbeddedKafkaMethodsSpec
5859
consumer.close()
5960
}
6061

62+
"publish synchronously a String message with a header to Kafka" in {
63+
implicit val serializer = new StringSerializer()
64+
implicit val deserializer = new StringDeserializer()
65+
val message = "hello world!"
66+
val topic = "publish_test_topic_with_header"
67+
val headerValue = "my_header_value"
68+
val headers = new RecordHeaders().add("my_header", headerValue.toCharArray.map(_.toByte))
69+
val producerRecord = new ProducerRecord[String, String](topic, null, "key", message, headers)
70+
71+
publishToKafka(topic, producerRecord)
72+
73+
val consumer = kafkaConsumer
74+
consumer.subscribe(List(topic).asJava)
75+
76+
val records = consumer.poll(consumerPollTimeout)
77+
78+
records.iterator().hasNext shouldBe true
79+
val record = records.iterator().next()
80+
81+
record.value() shouldBe message
82+
val myHeader = record.headers().lastHeader("my_header")
83+
val actualHeaderValue = new String(myHeader.value().map(_.toChar))
84+
actualHeaderValue shouldBe headerValue
85+
86+
consumer.close()
87+
}
88+
6189
"publish synchronously a String message with String key to Kafka" in {
6290
implicit val serializer = new StringSerializer()
6391
implicit val deserializer = new StringDeserializer()

0 commit comments

Comments
 (0)