Skip to content

Commit 0d1c71d

Browse files
committed
Method to publish a ProducerRecord
- Addresses issue #93 - #93
1 parent 23370a2 commit 0d1c71d

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,24 @@ sealed trait EmbeddedKafkaSupport {
218218
serializer),
219219
new ProducerRecord[String, T](topic, message))
220220

221+
/**
222+
* Publishes synchronously a message to the running Kafka broker.
223+
*
224+
* @param topic the topic to which publish the message (it will be auto-created)
225+
* @param producerRecord the producerRecord of type [[T]] to publish
226+
* @param config an implicit [[EmbeddedKafkaConfig]]
227+
* @param serializer an implicit [[Serializer]] for the type [[T]]
228+
* @throws KafkaUnavailableException if unable to connect to Kafka
229+
*/
230+
@throws(classOf[KafkaUnavailableException])
231+
def publishToKafka[T](topic: String, producerRecord: ProducerRecord[String, T])(
232+
implicit config: EmbeddedKafkaConfig,
233+
serializer: Serializer[T]): Unit =
234+
publishToKafka(new KafkaProducer(baseProducerConfig.asJava,
235+
new StringSerializer(),
236+
serializer),
237+
producerRecord)
238+
221239
/**
222240
* Publishes synchronously a message to the running Kafka broker.
223241
*

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.apache.kafka.clients.producer.{
99
ProducerConfig,
1010
ProducerRecord
1111
}
12+
import org.apache.kafka.common.header.internals.RecordHeaders
1213
import org.apache.kafka.common.serialization.{
1314
ByteArraySerializer,
1415
StringDeserializer,
@@ -58,6 +59,32 @@ class EmbeddedKafkaMethodsSpec
5859
consumer.close()
5960
}
6061

62+
"publish synchronously a String message with a header to Kafka" in {
63+
implicit val serializer = new StringSerializer()
64+
implicit val deserializer = new StringDeserializer()
65+
val message = "hello world!"
66+
val topic = "publish_test_topic"
67+
val headers = new RecordHeaders().add("my_header", "my_header_value".toCharArray.map(_.toByte))
68+
val producerRecord = new ProducerRecord[String, String](topic, null, "key", message, headers)
69+
70+
publishToKafka(topic, producerRecord)
71+
72+
val consumer = kafkaConsumer
73+
consumer.subscribe(List(topic).asJava)
74+
75+
val records = consumer.poll(consumerPollTimeout)
76+
77+
records.iterator().hasNext shouldBe true
78+
val record = records.iterator().next()
79+
80+
record.value() shouldBe message
81+
val myHeader = record.headers().lastHeader("my_header")
82+
val headerValue = new String(myHeader.value().map(_.toChar))
83+
headerValue shouldBe "my_header_value"
84+
85+
consumer.close()
86+
}
87+
6188
"publish synchronously a String message with String key to Kafka" in {
6289
implicit val serializer = new StringSerializer()
6390
implicit val deserializer = new StringDeserializer()

0 commit comments

Comments
 (0)