Skip to content

Commit bcec1a2

Browse files
feat: create kafka producer
1 parent ca50c2f commit bcec1a2

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2023. Smart Operating Block
3+
*
4+
* Use of this source code is governed by an MIT-style
5+
* license that can be found in the LICENSE file or at
6+
* https://opensource.org/licenses/MIT.
7+
*/
8+
9+
package infrastructure.kafka
10+
11+
import application.presenter.EventPublisher
12+
import entities.events.Event
13+
import io.confluent.kafka.serializers.KafkaJsonSerializer
14+
import org.apache.kafka.clients.producer.KafkaProducer
15+
import org.apache.kafka.clients.producer.ProducerRecord
16+
17+
/**
18+
* The publisher of events to a Kafka Event Broker.
19+
*/
20+
class KafkaPublisher : EventPublisher<Event<Any>> {
21+
22+
init {
23+
listOf(System.getenv("BOOTSTRAP_SERVER_URL"), System.getenv("SCHEMA_REGISTRY_URL")).forEach {
24+
requireNotNull(it) {
25+
println("""
26+
Invalid environment variable!
27+
Check the documentation here: https://github.com/SmartOperatingBlock/bootstrap")
28+
""".trimIndent())
29+
}
30+
}
31+
}
32+
33+
private val producerProps = mapOf(
34+
"bootstrap.servers" to System.getenv("BOOTSTRAP_SERVER_URL"),
35+
"schema.registry.url" to System.getenv("SCHEMA_REGISTRY_URL"),
36+
"key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
37+
"value.serializer" to KafkaJsonSerializer::class.java
38+
)
39+
40+
private val eventToTopic: Map<String, String> = mapOf(
41+
"ROOM_EVENT" to "room-events",
42+
"PROCESS_EVENT" to "process-events",
43+
"TRACKING_EVENT" to "tracking-events"
44+
)
45+
46+
private val producer: KafkaProducer<String, Event<Any>> = KafkaProducer(producerProps)
47+
48+
override fun publishEvent(event: Event<Any>) {
49+
val record = ProducerRecord(eventToTopic[event.key], event.key, event)
50+
producer.send(record)
51+
}
52+
}

0 commit comments

Comments
 (0)