Skip to content

Commit f2cfe20

Browse files
committed
adds example of enabling websocket frame aggregation
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent eecbd6d commit f2cfe20

File tree

1 file changed

+80
-0
lines changed

1 file changed

+80
-0
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2015-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.examples.transport.ws;
18+
19+
import io.rsocket.RSocket;
20+
import io.rsocket.SocketAcceptor;
21+
import io.rsocket.core.RSocketConnector;
22+
import io.rsocket.core.RSocketServer;
23+
import io.rsocket.frame.decoder.PayloadDecoder;
24+
import io.rsocket.transport.ServerTransport;
25+
import io.rsocket.transport.netty.WebsocketDuplexConnection;
26+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
27+
import io.rsocket.util.ByteBufPayload;
28+
import java.time.Duration;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
import reactor.core.publisher.Flux;
32+
import reactor.core.publisher.Mono;
33+
import reactor.netty.Connection;
34+
import reactor.netty.DisposableServer;
35+
import reactor.netty.http.server.HttpServer;
36+
37+
public class WebSocketAggregationSample {
38+
39+
private static final Logger logger = LoggerFactory.getLogger(WebSocketAggregationSample.class);
40+
41+
public static void main(String[] args) {
42+
43+
ServerTransport.ConnectionAcceptor connectionAcceptor =
44+
RSocketServer.create(SocketAcceptor.forRequestResponse(Mono::just))
45+
.payloadDecoder(PayloadDecoder.ZERO_COPY)
46+
.asConnectionAcceptor();
47+
48+
DisposableServer server =
49+
HttpServer.create()
50+
.host("localhost")
51+
.port(0)
52+
.handle(
53+
(req, res) ->
54+
res.sendWebsocket(
55+
(in, out) ->
56+
connectionAcceptor
57+
.apply(
58+
new WebsocketDuplexConnection(
59+
(Connection) in.aggregateFrames()))
60+
.then(out.neverComplete())))
61+
.bindNow();
62+
63+
WebsocketClientTransport transport =
64+
WebsocketClientTransport.create(server.host(), server.port());
65+
66+
RSocket clientRSocket =
67+
RSocketConnector.create()
68+
.keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
69+
.payloadDecoder(PayloadDecoder.ZERO_COPY)
70+
.connect(transport)
71+
.block();
72+
73+
Flux.range(1, 100)
74+
.concatMap(i -> clientRSocket.requestResponse(ByteBufPayload.create("Hello " + i)))
75+
.doOnNext(payload -> logger.debug("Processed " + payload.getDataUtf8()))
76+
.blockLast();
77+
clientRSocket.dispose();
78+
server.dispose();
79+
}
80+
}

0 commit comments

Comments
 (0)