Skip to content

Commit 1a57e9a

Browse files
authored
Add RSocket to RSocketClient adapter (#891)
1 parent d29c16c commit 1a57e9a

File tree

2 files changed

+94
-0
lines changed

2 files changed

+94
-0
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.rsocket;
217

318
import org.reactivestreams.Publisher;
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import io.rsocket.Payload;
19+
import io.rsocket.RSocket;
20+
import io.rsocket.RSocketClient;
21+
import org.reactivestreams.Publisher;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.Mono;
24+
25+
/**
26+
* Simple adapter from {@link RSocket} to {@link RSocketClient}. This is useful in code that needs
27+
* to deal with both in the same way. When connecting to a server, typically {@link RSocketClient}
28+
* is expected to be used, but in a responder (client or server), it is necessary to interact with
29+
* {@link RSocket} to make requests to the remote end.
30+
*
31+
* @since 1.1
32+
*/
33+
public class RSocketClientAdapter implements RSocketClient {
34+
35+
private final RSocket rsocket;
36+
37+
public RSocketClientAdapter(RSocket rsocket) {
38+
this.rsocket = rsocket;
39+
}
40+
41+
public RSocket rsocket() {
42+
return rsocket;
43+
}
44+
45+
@Override
46+
public Mono<RSocket> source() {
47+
return Mono.just(rsocket);
48+
}
49+
50+
@Override
51+
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
52+
return payloadMono.flatMap(rsocket::fireAndForget);
53+
}
54+
55+
@Override
56+
public Mono<Payload> requestResponse(Mono<Payload> payloadMono) {
57+
return payloadMono.flatMap(rsocket::requestResponse);
58+
}
59+
60+
@Override
61+
public Flux<Payload> requestStream(Mono<Payload> payloadMono) {
62+
return payloadMono.flatMapMany(rsocket::requestStream);
63+
}
64+
65+
@Override
66+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
67+
return rsocket.requestChannel(payloads);
68+
}
69+
70+
@Override
71+
public Mono<Void> metadataPush(Mono<Payload> payloadMono) {
72+
return payloadMono.flatMap(rsocket::metadataPush);
73+
}
74+
75+
@Override
76+
public void dispose() {
77+
rsocket.dispose();
78+
}
79+
}

0 commit comments

Comments
 (0)