8
8
9
9
package infrastructure .events ;
10
10
11
+ import application .controller .manager .EventManager ;
12
+ import application .presenter .event .model .Event ;
13
+ import application .presenter .event .serialization .EventDeserializer ;
14
+ import application .presenter .event .serialization .EventDeserializerImpl ;
11
15
import org .apache .kafka .clients .consumer .KafkaConsumer ;
12
16
13
17
import java .time .Duration ;
14
18
import java .util .List ;
15
19
import java .util .Map ;
16
20
import java .util .Objects ;
21
+ import java .util .function .Consumer ;
17
22
import java .util .logging .Logger ;
18
23
19
24
/**
20
25
* This class manage the Kafka client needed to consume events.
21
26
*/
22
- public class KafkaClient {
27
+ public class KafkaClient implements EventManager {
23
28
private static final String BOOSTRAP_SERVER_URL_VARIABLE = "BOOTSTRAP_SERVER_URL" ;
24
29
private static final String SCHEMA_REGISTRY_URL_VARIABLE = "SCHEMA_REGISTRY_URL" ;
25
30
private static final String ROOM_EVENT_TOPIC = "room-events" ;
@@ -29,6 +34,7 @@ public class KafkaClient {
29
34
private static KafkaClient instance ;
30
35
31
36
private final KafkaConsumer <String , String > kafkaConsumer ;
37
+ private final EventDeserializer eventDeserializer ;
32
38
33
39
/**
34
40
* Obtain the current instance of the Kafka Client.
@@ -54,15 +60,19 @@ protected KafkaClient() {
54
60
)
55
61
);
56
62
this .kafkaConsumer .subscribe (List .of (ROOM_EVENT_TOPIC , MEDICAL_TECHNOLOGY_EVENT_TOPIC ));
63
+ this .eventDeserializer = new EventDeserializerImpl ();
57
64
}
58
65
59
66
/**
60
67
* Polling cycle to obtain all the events.
61
68
*/
62
- public void poll () {
69
+ @ Override
70
+ public void poll (final Consumer <Event <?>> eventConsumer ) {
63
71
while (true ) {
64
72
this .kafkaConsumer .poll (Duration .ofMillis (POLLING_TIME )).forEach (event -> {
73
+ // log the event
65
74
Logger .getLogger (KafkaClient .class .getName ()).fine (event .toString ());
75
+ eventConsumer .accept (this .eventDeserializer .fromString (event .key (), event .value ()));
66
76
});
67
77
}
68
78
}
0 commit comments