|
17 | 17 | package io.rsocket.examples.transport.ws;
|
18 | 18 |
|
19 | 19 | import io.netty.handler.codec.http.HttpResponseStatus;
|
20 |
| -import io.rsocket.DuplexConnection; |
21 |
| -import io.rsocket.Payload; |
22 | 20 | import io.rsocket.RSocket;
|
23 | 21 | import io.rsocket.SocketAcceptor;
|
24 | 22 | import io.rsocket.core.RSocketConnector;
|
25 | 23 | import io.rsocket.core.RSocketServer;
|
26 |
| -import io.rsocket.fragmentation.ReassemblyDuplexConnection; |
27 | 24 | import io.rsocket.frame.decoder.PayloadDecoder;
|
28 | 25 | import io.rsocket.transport.ServerTransport;
|
29 | 26 | import io.rsocket.transport.netty.WebsocketDuplexConnection;
|
30 | 27 | import io.rsocket.transport.netty.client.WebsocketClientTransport;
|
31 | 28 | import io.rsocket.util.ByteBufPayload;
|
32 | 29 | import java.time.Duration;
|
33 |
| -import java.util.HashMap; |
34 |
| -import org.reactivestreams.Publisher; |
| 30 | +import java.util.Collections; |
| 31 | +import org.slf4j.Logger; |
| 32 | +import org.slf4j.LoggerFactory; |
35 | 33 | import reactor.core.publisher.Flux;
|
36 | 34 | import reactor.core.publisher.Mono;
|
37 |
| -import reactor.core.scheduler.Schedulers; |
38 | 35 | import reactor.netty.Connection;
|
39 | 36 | import reactor.netty.DisposableServer;
|
40 | 37 | import reactor.netty.http.server.HttpServer;
|
41 | 38 |
|
42 | 39 | public class WebSocketHeadersSample {
|
43 |
| - static final Payload payload1 = ByteBufPayload.create("Hello "); |
| 40 | + |
| 41 | + private static final Logger logger = LoggerFactory.getLogger(WebSocketHeadersSample.class); |
44 | 42 |
|
45 | 43 | public static void main(String[] args) {
|
46 | 44 |
|
47 |
| - ServerTransport.ConnectionAcceptor acceptor = |
48 |
| - RSocketServer.create(SocketAcceptor.with(new ServerRSocket())) |
| 45 | + ServerTransport.ConnectionAcceptor connectionAcceptor = |
| 46 | + RSocketServer.create(SocketAcceptor.forRequestResponse(Mono::just)) |
49 | 47 | .payloadDecoder(PayloadDecoder.ZERO_COPY)
|
50 | 48 | .asConnectionAcceptor();
|
51 | 49 |
|
52 |
| - DisposableServer disposableServer = |
| 50 | + DisposableServer server = |
53 | 51 | HttpServer.create()
|
54 | 52 | .host("localhost")
|
55 | 53 | .port(0)
|
56 | 54 | .route(
|
57 | 55 | routes ->
|
58 |
| - routes.ws( |
| 56 | + routes.get( |
59 | 57 | "/",
|
60 |
| - (in, out) -> { |
61 |
| - if (in.headers().containsValue("Authorization", "test", true)) { |
62 |
| - DuplexConnection connection = |
63 |
| - new ReassemblyDuplexConnection( |
64 |
| - new WebsocketDuplexConnection((Connection) in)); |
65 |
| - return acceptor.apply(connection).then(out.neverComplete()); |
| 58 | + (req, res) -> { |
| 59 | + if (req.requestHeaders().containsValue("Authorization", "test", true)) { |
| 60 | + return res.sendWebsocket( |
| 61 | + (in, out) -> |
| 62 | + connectionAcceptor |
| 63 | + .apply(new WebsocketDuplexConnection((Connection) in)) |
| 64 | + .then(out.neverComplete())); |
66 | 65 | }
|
67 |
| - |
68 |
| - return out.sendClose( |
69 |
| - HttpResponseStatus.UNAUTHORIZED.code(), |
70 |
| - HttpResponseStatus.UNAUTHORIZED.reasonPhrase()); |
| 66 | + res.status(HttpResponseStatus.UNAUTHORIZED); |
| 67 | + return res.send(); |
71 | 68 | }))
|
72 | 69 | .bindNow();
|
73 | 70 |
|
| 71 | + logger.debug( |
| 72 | + "\n\nStart of Authorized WebSocket Connection\n----------------------------------\n"); |
| 73 | + |
74 | 74 | WebsocketClientTransport clientTransport =
|
75 |
| - WebsocketClientTransport.create(disposableServer.host(), disposableServer.port()); |
| 75 | + WebsocketClientTransport.create(server.host(), server.port()); |
76 | 76 |
|
77 |
| - clientTransport.setTransportHeaders( |
78 |
| - () -> { |
79 |
| - HashMap<String, String> map = new HashMap<>(); |
80 |
| - map.put("Authorization", "test"); |
81 |
| - return map; |
82 |
| - }); |
| 77 | + clientTransport.setTransportHeaders(() -> Collections.singletonMap("Authorization", "test")); |
83 | 78 |
|
84 |
| - RSocket socket = |
| 79 | + RSocket clientRSocket = |
85 | 80 | RSocketConnector.create()
|
86 | 81 | .keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
|
87 | 82 | .payloadDecoder(PayloadDecoder.ZERO_COPY)
|
88 | 83 | .connect(clientTransport)
|
89 | 84 | .block();
|
90 | 85 |
|
91 |
| - Flux.range(0, 100) |
92 |
| - .concatMap(i -> socket.fireAndForget(payload1.retain())) |
93 |
| - // .doOnNext(p -> { |
94 |
| - //// System.out.println(p.getDataUtf8()); |
95 |
| - // p.release(); |
96 |
| - // }) |
| 86 | + Flux.range(1, 100) |
| 87 | + .concatMap(i -> clientRSocket.requestResponse(ByteBufPayload.create("Hello " + i))) |
| 88 | + .doOnNext(payload -> logger.debug("Processed " + payload.getDataUtf8())) |
97 | 89 | .blockLast();
|
98 |
| - socket.dispose(); |
99 |
| - |
100 |
| - WebsocketClientTransport clientTransport2 = |
101 |
| - WebsocketClientTransport.create(disposableServer.host(), disposableServer.port()); |
102 |
| - |
103 |
| - RSocket rSocket = |
104 |
| - RSocketConnector.create() |
105 |
| - .keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10)) |
106 |
| - .payloadDecoder(PayloadDecoder.ZERO_COPY) |
107 |
| - .connect(clientTransport2) |
108 |
| - .block(); |
109 |
| - |
110 |
| - // expect error here because of closed channel |
111 |
| - rSocket.requestResponse(payload1).block(); |
112 |
| - } |
113 |
| - |
114 |
| - private static class ServerRSocket implements RSocket { |
115 |
| - |
116 |
| - @Override |
117 |
| - public Mono<Void> fireAndForget(Payload payload) { |
118 |
| - // System.out.println(payload.getDataUtf8()); |
119 |
| - payload.release(); |
120 |
| - return Mono.empty(); |
121 |
| - } |
| 90 | + clientRSocket.dispose(); |
122 | 91 |
|
123 |
| - @Override |
124 |
| - public Mono<Payload> requestResponse(Payload payload) { |
125 |
| - return Mono.just(payload); |
126 |
| - } |
| 92 | + logger.debug( |
| 93 | + "\n\nStart of Unauthorized WebSocket Upgrade\n----------------------------------\n"); |
127 | 94 |
|
128 |
| - @Override |
129 |
| - public Flux<Payload> requestChannel(Publisher<Payload> payloads) { |
130 |
| - return Flux.from(payloads).subscribeOn(Schedulers.single()); |
131 |
| - } |
| 95 | + RSocketConnector.create() |
| 96 | + .keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10)) |
| 97 | + .payloadDecoder(PayloadDecoder.ZERO_COPY) |
| 98 | + .connect(WebsocketClientTransport.create(server.host(), server.port())) |
| 99 | + .block(); |
132 | 100 | }
|
133 | 101 | }
|
0 commit comments