Skip to content

Commit c0999ab

Browse files
committed
provides server-to-client communication sample
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 8ce390a commit c0999ab

File tree

1 file changed

+237
-0
lines changed

1 file changed

+237
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.examples.transport.tcp.duplex;
17+
18+
import io.rsocket.ConnectionSetupPayload;
19+
import io.rsocket.Payload;
20+
import io.rsocket.RSocket;
21+
import io.rsocket.SocketAcceptor;
22+
import io.rsocket.core.RSocketConnector;
23+
import io.rsocket.core.RSocketServer;
24+
import io.rsocket.transport.netty.client.TcpClientTransport;
25+
import io.rsocket.transport.netty.server.TcpServerTransport;
26+
import io.rsocket.util.DefaultPayload;
27+
import java.time.Duration;
28+
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ConcurrentMap;
31+
import java.util.concurrent.LinkedBlockingQueue;
32+
import java.util.concurrent.ThreadLocalRandom;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
import reactor.core.publisher.BaseSubscriber;
36+
import reactor.core.publisher.Flux;
37+
import reactor.core.publisher.Mono;
38+
import reactor.core.publisher.UnicastProcessor;
39+
import reactor.util.concurrent.Queues;
40+
41+
/**
42+
* An example of long-running tasks processing (a.k.a Kafka style) where a client submits tasks over
43+
* request `FireAndForget` and then receives results over the same method but on it is own side.
44+
*
45+
* <p>This example shows a case when the client may disappear, however, another a client can connect
46+
* again and receive undelivered completed tasks remaining for the previous one.
47+
*/
48+
public class TaskProcessingWithServerSideNotificationsExample {
49+
50+
public static void main(String[] args) throws InterruptedException {
51+
UnicastProcessor<Task> tasksProcessor =
52+
UnicastProcessor.create(Queues.<Task>unboundedMultiproducer().get());
53+
ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap = new ConcurrentHashMap<>();
54+
ConcurrentMap<String, RSocket> idToRSocketMap = new ConcurrentHashMap<>();
55+
BackgroundWorker backgroundWorker =
56+
new BackgroundWorker(tasksProcessor, idToCompletedTasksMap, idToRSocketMap);
57+
58+
RSocketServer.create(new TasksAcceptor(tasksProcessor, idToCompletedTasksMap, idToRSocketMap))
59+
.bindNow(TcpServerTransport.create(9991));
60+
61+
Logger logger = LoggerFactory.getLogger("RSocket.Client.ID[Test]");
62+
63+
Mono<RSocket> rSocketMono =
64+
RSocketConnector.create()
65+
.setupPayload(DefaultPayload.create("Test"))
66+
.acceptor(
67+
SocketAcceptor.forFireAndForget(
68+
p -> {
69+
logger.info("Received Processed Task[{}]", p.getDataUtf8());
70+
p.release();
71+
return Mono.empty();
72+
}))
73+
.connect(TcpClientTransport.create(9991));
74+
75+
RSocket rSocketRequester1 = rSocketMono.block();
76+
77+
for (int i = 0; i < 10; i++) {
78+
rSocketRequester1.fireAndForget(DefaultPayload.create("task" + i)).block();
79+
}
80+
81+
Thread.sleep(4000);
82+
83+
rSocketRequester1.dispose();
84+
logger.info("Disposed");
85+
86+
Thread.sleep(4000);
87+
88+
RSocket rSocketRequester2 = rSocketMono.block();
89+
90+
logger.info("Reconnected");
91+
92+
Thread.sleep(10000);
93+
}
94+
95+
static class BackgroundWorker extends BaseSubscriber<Task> {
96+
final ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap;
97+
final ConcurrentMap<String, RSocket> idToRSocketMap;
98+
99+
BackgroundWorker(
100+
Flux<Task> taskProducer,
101+
ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap,
102+
ConcurrentMap<String, RSocket> idToRSocketMap) {
103+
104+
this.idToCompletedTasksMap = idToCompletedTasksMap;
105+
this.idToRSocketMap = idToRSocketMap;
106+
107+
// mimic a long running task processing
108+
taskProducer
109+
.concatMap(
110+
t ->
111+
Mono.delay(Duration.ofMillis(ThreadLocalRandom.current().nextInt(200, 2000)))
112+
.thenReturn(t))
113+
.subscribe(this);
114+
}
115+
116+
@Override
117+
protected void hookOnNext(Task task) {
118+
BlockingQueue<Task> completedTasksQueue =
119+
idToCompletedTasksMap.computeIfAbsent(task.id, __ -> new LinkedBlockingQueue<>());
120+
121+
completedTasksQueue.offer(task);
122+
RSocket rSocket = idToRSocketMap.get(task.id);
123+
if (rSocket != null) {
124+
rSocket
125+
.fireAndForget(DefaultPayload.create(task.content))
126+
.subscribe(null, e -> {}, () -> completedTasksQueue.remove(task));
127+
}
128+
}
129+
}
130+
131+
static class TasksAcceptor implements SocketAcceptor {
132+
133+
static final Logger logger = LoggerFactory.getLogger(TasksAcceptor.class);
134+
135+
final UnicastProcessor<Task> tasksToProcess;
136+
final ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap;
137+
final ConcurrentMap<String, RSocket> idToRSocketMap;
138+
139+
TasksAcceptor(
140+
UnicastProcessor<Task> tasksToProcess,
141+
ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap,
142+
ConcurrentMap<String, RSocket> idToRSocketMap) {
143+
this.tasksToProcess = tasksToProcess;
144+
this.idToCompletedTasksMap = idToCompletedTasksMap;
145+
this.idToRSocketMap = idToRSocketMap;
146+
}
147+
148+
@Override
149+
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
150+
String id = setup.getDataUtf8();
151+
logger.info("Accepting a new client connection with ID {}", id);
152+
// sendingRSocket represents here an RSocket requester to a remote peer
153+
154+
if (this.idToRSocketMap.compute(
155+
id, (__, old) -> old == null || old.isDisposed() ? sendingSocket : old)
156+
== sendingSocket) {
157+
return Mono.<RSocket>just(
158+
new RSocketTaskHandler(idToRSocketMap, tasksToProcess, id, sendingSocket))
159+
.doOnSuccess(__ -> checkTasksToDeliver(sendingSocket, id));
160+
}
161+
162+
return Mono.error(
163+
new IllegalStateException("There is already a client connected with the same ID"));
164+
}
165+
166+
private void checkTasksToDeliver(RSocket sendingSocket, String id) {
167+
logger.info("Accepted a new client connection with ID {}. Checking for remaining tasks", id);
168+
BlockingQueue<Task> tasksToDeliver = this.idToCompletedTasksMap.get(id);
169+
170+
if (tasksToDeliver == null || tasksToDeliver.isEmpty()) {
171+
// means nothing yet to send
172+
return;
173+
}
174+
175+
logger.info("Found remaining tasks to deliver for client {}", id);
176+
177+
for (; ; ) {
178+
Task task = tasksToDeliver.poll();
179+
180+
if (task == null) {
181+
return;
182+
}
183+
184+
sendingSocket
185+
.fireAndForget(DefaultPayload.create(task.content))
186+
.subscribe(
187+
null,
188+
e -> {
189+
// offers back a task if it has not been delivered
190+
tasksToDeliver.offer(task);
191+
});
192+
}
193+
}
194+
195+
private static class RSocketTaskHandler implements RSocket {
196+
197+
private final String id;
198+
private final RSocket sendingSocket;
199+
private ConcurrentMap<String, RSocket> idToRSocketMap;
200+
private UnicastProcessor<Task> tasksToProcess;
201+
202+
public RSocketTaskHandler(
203+
ConcurrentMap<String, RSocket> idToRSocketMap,
204+
UnicastProcessor<Task> tasksToProcess,
205+
String id,
206+
RSocket sendingSocket) {
207+
this.id = id;
208+
this.sendingSocket = sendingSocket;
209+
this.idToRSocketMap = idToRSocketMap;
210+
this.tasksToProcess = tasksToProcess;
211+
}
212+
213+
@Override
214+
public Mono<Void> fireAndForget(Payload payload) {
215+
logger.info("Received a Task[{}] from Client.ID[{}]", payload.getDataUtf8(), id);
216+
tasksToProcess.onNext(new Task(id, payload.getDataUtf8()));
217+
payload.release();
218+
return Mono.empty();
219+
}
220+
221+
@Override
222+
public void dispose() {
223+
idToRSocketMap.remove(id, sendingSocket);
224+
}
225+
}
226+
}
227+
228+
static class Task {
229+
final String id;
230+
final String content;
231+
232+
Task(String id, String content) {
233+
this.id = id;
234+
this.content = content;
235+
}
236+
}
237+
}

0 commit comments

Comments
 (0)