File tree Expand file tree Collapse file tree 1 file changed +41
-0
lines changed
src/main/kotlin/infrastructure/signalr Expand file tree Collapse file tree 1 file changed +41
-0
lines changed Original file line number Diff line number Diff line change
1
+ /*
2
+ * Copyright (c) 2023. Smart Operating Block
3
+ *
4
+ * Use of this source code is governed by an MIT-style
5
+ * license that can be found in the LICENSE file or at
6
+ * https://opensource.org/licenses/MIT.
7
+ */
8
+
9
+ package infrastructure.signalr
10
+
11
+ import application.presenter.EventConsumer
12
+ import application.presenter.EventParser
13
+ import com.microsoft.signalr.HubConnectionBuilder
14
+ import entities.events.Event
15
+ import infrastructure.digitaltwins.DTEventParser
16
+ import io.reactivex.rxjava3.core.FlowableEmitter
17
+
18
+ /* *
19
+ * The consumer of events from a SignalR topic.
20
+ */
21
+ class SignalRClient : EventConsumer <String > {
22
+
23
+ init {
24
+ requireNotNull(System .getenv(" SIGNALR_CONNECTION_STRING" )) {
25
+ println (" Invalid connection String: please provide a valid connection String!" )
26
+ }
27
+ }
28
+
29
+ private val connection = HubConnectionBuilder .create(System .getenv(" SIGNALR_CONNECTION_STRING" )).build()
30
+ private val eventParser: EventParser <String > = DTEventParser ()
31
+
32
+ override fun start (emitter : FlowableEmitter <Event <Any >>) {
33
+ connection.on(" newMessage" , { event ->
34
+ emitter.onNext(consumeEvent(event))
35
+ }, String ::class .java)
36
+ connection.start()
37
+ }
38
+
39
+ override fun consumeEvent (inputEvent : String ): Event <Any > = eventParser.parseEvent(inputEvent)
40
+
41
+ }
You can’t perform that action at this time.
0 commit comments