Skip to content

Commit 3e25e52

Browse files
feat: add kafka client
1 parent 7985e36 commit 3e25e52

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright (c) 2023. Smart Operating Block
3+
*
4+
* Use of this source code is governed by an MIT-style
5+
* license that can be found in the LICENSE file or at
6+
* https://opensource.org/licenses/MIT.
7+
*/
8+
9+
package infrastructure.events;
10+
11+
import org.apache.kafka.clients.consumer.KafkaConsumer;
12+
13+
import java.time.Duration;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.Objects;
17+
import java.util.logging.Logger;
18+
19+
/**
20+
* This class manage the Kafka client needed to consume events.
21+
*/
22+
public class KafkaClient {
23+
private static final String BOOSTRAP_SERVER_URL_VARIABLE = "BOOTSTRAP_SERVER_URL";
24+
private static final String SCHEMA_REGISTRY_URL_VARIABLE = "SCHEMA_REGISTRY_URL";
25+
private static final String ROOM_EVENT_TOPIC = "room-events";
26+
private static final String MEDICAL_TECHNOLOGY_EVENT_TOPIC = "process-events";
27+
private static final long POLLING_TIME = 100L;
28+
29+
private static KafkaClient instance;
30+
31+
private final KafkaConsumer<String, String> kafkaConsumer;
32+
33+
/**
34+
* Obtain the current instance of the Kafka Client.
35+
* @return the kafka client.
36+
*/
37+
public static synchronized KafkaClient getInstance() {
38+
if (instance == null) {
39+
instance = new KafkaClient();
40+
}
41+
return instance;
42+
}
43+
44+
/**
45+
* Default constructor.
46+
*/
47+
protected KafkaClient() {
48+
Objects.requireNonNull(System.getenv(BOOSTRAP_SERVER_URL_VARIABLE), "Kafka bootstrap server url required");
49+
Objects.requireNonNull(System.getenv(SCHEMA_REGISTRY_URL_VARIABLE), "Kafka schema registry url required");
50+
this.kafkaConsumer = new KafkaConsumer<>(
51+
loadConfiguration(
52+
System.getenv(BOOSTRAP_SERVER_URL_VARIABLE),
53+
System.getenv(SCHEMA_REGISTRY_URL_VARIABLE)
54+
)
55+
);
56+
this.kafkaConsumer.subscribe(List.of(ROOM_EVENT_TOPIC, MEDICAL_TECHNOLOGY_EVENT_TOPIC));
57+
}
58+
59+
/**
60+
* Polling cycle to obtain all the events.
61+
*/
62+
public void poll() {
63+
while (true) {
64+
this.kafkaConsumer.poll(Duration.ofMillis(POLLING_TIME)).forEach(event -> {
65+
Logger.getLogger(KafkaClient.class.getName()).fine(event.toString());
66+
});
67+
}
68+
}
69+
70+
private Map<String, Object> loadConfiguration(final String boostrapServerUrl, final String schemaRegistryUrl) {
71+
return Map.of(
72+
"bootstrap.servers", boostrapServerUrl,
73+
"schema.registry.url", schemaRegistryUrl,
74+
"group.id", "automation-management-consumer",
75+
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
76+
"value.deserializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer"
77+
);
78+
}
79+
}

0 commit comments

Comments
 (0)