Skip to content

Commit 75a6a99

Browse files
committed
refactors DuplexConnection API and reworks resumability impl
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 740785f commit 75a6a99

File tree

43 files changed

+1434
-1607
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1434
-1607
lines changed

rsocket-core/src/main/java/io/rsocket/DuplexConnection.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,21 @@ public interface DuplexConnection extends Availability, Closeable {
2828

2929
/**
3030
* Delivers the given frame to the underlying transport connection. This method is non-blocking
31-
* and can be safely executed from multiple threads.
31+
* and can be safely executed from multiple threads. This method does not provide any flow-control
32+
* mechanism.
3233
*
3334
* @param streamId to which the given frame relates
3435
* @param frame with the encoded content
35-
* @param prioritize whether the given frame should be sent with priority to others
3636
*/
37-
void sendFrame(int streamId, ByteBuf frame, boolean prioritize);
37+
void sendFrame(int streamId, ByteBuf frame);
38+
39+
/**
40+
* Sends terminal frame to the underlying connection and dispose the connection once the frame is
41+
* delivered
42+
*
43+
* @param errorFrame with the encoded error
44+
*/
45+
void sendErrorAndClose(RSocketErrorException errorException);
3846

3947
/**
4048
* Returns a stream of all {@code Frame}s received on this connection.
@@ -61,12 +69,6 @@ public interface DuplexConnection extends Availability, Closeable {
6169
*/
6270
Flux<ByteBuf> receive();
6371

64-
/**
65-
* @param errorFrame
66-
* @return
67-
*/
68-
void terminate(ByteBuf frame, RSocketErrorException terminalError);
69-
7072
/**
7173
* Returns the assigned {@link ByteBufAllocator}.
7274
*

rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java

Lines changed: 14 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,11 @@
2222
import io.rsocket.DuplexConnection;
2323
import io.rsocket.RSocketErrorException;
2424
import io.rsocket.frame.FrameHeaderCodec;
25-
import io.rsocket.frame.FrameUtil;
2625
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
2726
import io.rsocket.plugins.InitializingInterceptorRegistry;
2827
import java.net.SocketAddress;
2928
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3029
import org.reactivestreams.Subscription;
31-
import org.slf4j.Logger;
32-
import org.slf4j.LoggerFactory;
3330
import reactor.core.CoreSubscriber;
3431
import reactor.core.publisher.Flux;
3532
import reactor.core.publisher.Mono;
@@ -50,67 +47,40 @@
5047
*/
5148
class ClientServerInputMultiplexer implements CoreSubscriber<ByteBuf>, Closeable {
5249

53-
private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
54-
private static final InitializingInterceptorRegistry emptyInterceptorRegistry =
55-
new InitializingInterceptorRegistry();
56-
57-
private final InternalDuplexConnection setupReceiver;
5850
private final InternalDuplexConnection serverReceiver;
5951
private final InternalDuplexConnection clientReceiver;
60-
private final DuplexConnection setupConnection;
6152
private final DuplexConnection serverConnection;
6253
private final DuplexConnection clientConnection;
6354
private final DuplexConnection source;
6455
private final boolean isClient;
6556

6657
private Subscription s;
67-
private boolean setupReceived;
6858

6959
private Throwable t;
7060

7161
private volatile int state;
7262
private static final AtomicIntegerFieldUpdater<ClientServerInputMultiplexer> STATE =
7363
AtomicIntegerFieldUpdater.newUpdater(ClientServerInputMultiplexer.class, "state");
7464

75-
public ClientServerInputMultiplexer(DuplexConnection source) {
76-
this(source, emptyInterceptorRegistry, false);
77-
}
78-
7965
public ClientServerInputMultiplexer(
8066
DuplexConnection source, InitializingInterceptorRegistry registry, boolean isClient) {
8167
this.source = source;
8268
this.isClient = isClient;
83-
source = registry.initConnection(Type.SOURCE, source);
8469

85-
if (!isClient) {
86-
setupReceiver = new InternalDuplexConnection(this, source);
87-
setupConnection = registry.initConnection(Type.SETUP, setupReceiver);
88-
} else {
89-
setupReceiver = null;
90-
setupConnection = null;
91-
}
92-
serverReceiver = new InternalDuplexConnection(this, source);
93-
clientReceiver = new InternalDuplexConnection(this, source);
94-
serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
95-
clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
70+
this.serverReceiver = new InternalDuplexConnection(this, source);
71+
this.clientReceiver = new InternalDuplexConnection(this, source);
72+
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
73+
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
9674
}
9775

98-
DuplexConnection asClientServerConnection() {
99-
return source;
100-
}
101-
102-
DuplexConnection asServerConnection() {
76+
DuplexConnection asResponderConnection() {
10377
return serverConnection;
10478
}
10579

106-
DuplexConnection asClientConnection() {
80+
DuplexConnection asRequesterConnection() {
10781
return clientConnection;
10882
}
10983

110-
DuplexConnection asSetupConnection() {
111-
return setupConnection;
112-
}
113-
11484
@Override
11585
public void dispose() {
11686
source.dispose();
@@ -130,12 +100,7 @@ public Mono<Void> onClose() {
130100
public void onSubscribe(Subscription s) {
131101
if (Operators.validate(this.s, s)) {
132102
this.s = s;
133-
if (isClient) {
134-
s.request(Long.MAX_VALUE);
135-
} else {
136-
// request first SetupFrame
137-
s.request(1);
138-
}
103+
s.request(Long.MAX_VALUE);
139104
}
140105
}
141106

@@ -145,12 +110,6 @@ public void onNext(ByteBuf frame) {
145110
final Type type;
146111
if (streamId == 0) {
147112
switch (FrameHeaderCodec.frameType(frame)) {
148-
case SETUP:
149-
case RESUME:
150-
case RESUME_OK:
151-
type = Type.SETUP;
152-
setupReceived = true;
153-
break;
154113
case LEASE:
155114
case KEEPALIVE:
156115
case ERROR:
@@ -164,19 +123,8 @@ public void onNext(ByteBuf frame) {
164123
} else {
165124
type = Type.CLIENT;
166125
}
167-
if (!isClient && type != Type.SETUP && !setupReceived) {
168-
final IllegalStateException error =
169-
new IllegalStateException("SETUP or LEASE frame must be received before any others.");
170-
this.s.cancel();
171-
onError(error);
172-
}
173126

174127
switch (type) {
175-
case SETUP:
176-
final InternalDuplexConnection setupReceiver = this.setupReceiver;
177-
setupReceiver.onNext(frame);
178-
setupReceiver.onComplete();
179-
break;
180128
case CLIENT:
181129
clientReceiver.onNext(frame);
182130
break;
@@ -193,16 +141,6 @@ public void onComplete() {
193141
return;
194142
}
195143

196-
if (!isClient) {
197-
if (!setupReceived) {
198-
setupReceiver.onComplete();
199-
}
200-
201-
if (previousState == 1) {
202-
return;
203-
}
204-
}
205-
206144
if (clientReceiver.isSubscribed()) {
207145
clientReceiver.onComplete();
208146
}
@@ -220,16 +158,6 @@ public void onError(Throwable t) {
220158
return;
221159
}
222160

223-
if (!isClient) {
224-
if (!setupReceived) {
225-
setupReceiver.onError(t);
226-
}
227-
228-
if (previousState == 1) {
229-
return;
230-
}
231-
}
232-
233161
if (clientReceiver.isSubscribed()) {
234162
clientReceiver.onError(t);
235163
}
@@ -244,17 +172,8 @@ boolean notifyRequested() {
244172
return false;
245173
}
246174

247-
if (isClient) {
248-
if (currentState == 2) {
249-
source.receive().subscribe(this);
250-
}
251-
} else {
252-
if (currentState == 1) {
253-
source.receive().subscribe(this);
254-
} else if (currentState == 3) {
255-
// means setup was consumed and we got request from client and server multiplexers
256-
s.request(Long.MAX_VALUE);
257-
}
175+
if (currentState == 2) {
176+
source.receive().subscribe(this);
258177
}
259178

260179
return true;
@@ -280,7 +199,6 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
280199
implements Subscription, DuplexConnection {
281200
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
282201
private final DuplexConnection source;
283-
private final boolean debugEnabled;
284202

285203
private volatile int state;
286204
static final AtomicIntegerFieldUpdater<InternalDuplexConnection> STATE =
@@ -292,7 +210,6 @@ public InternalDuplexConnection(
292210
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
293211
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
294212
this.source = source;
295-
this.debugEnabled = LOGGER.isDebugEnabled();
296213
}
297214

298215
@Override
@@ -340,30 +257,18 @@ void onError(Throwable t) {
340257
}
341258

342259
@Override
343-
public void sendFrame(int streamId, ByteBuf frame, boolean prioritize) {
344-
if (debugEnabled) {
345-
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
346-
}
347-
348-
source.sendFrame(streamId, frame, prioritize);
260+
public void sendFrame(int streamId, ByteBuf frame) {
261+
source.sendFrame(streamId, frame);
349262
}
350263

351264
@Override
352-
public void terminate(ByteBuf frame, RSocketErrorException terminalError) {
353-
if (debugEnabled) {
354-
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
355-
}
356-
357-
source.terminate(frame, terminalError);
265+
public void sendErrorAndClose(RSocketErrorException e) {
266+
source.sendErrorAndClose(e);
358267
}
359268

360269
@Override
361270
public Flux<ByteBuf> receive() {
362-
if (debugEnabled) {
363-
return this.doOnNext(frame -> LOGGER.debug("receiving -> " + FrameUtil.toString(frame)));
364-
} else {
365-
return this;
366-
}
271+
return this;
367272
}
368273

369274
@Override
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.rsocket.core;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import io.rsocket.DuplexConnection;
6+
import reactor.core.publisher.Mono;
7+
import reactor.util.function.Tuple2;
8+
import reactor.util.function.Tuples;
9+
10+
abstract class ClientSetup {
11+
abstract Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection);
12+
}
13+
14+
class DefaultClientSetup extends ClientSetup {
15+
16+
@Override
17+
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
18+
return Mono.create(
19+
sink -> sink.onRequest(__ -> sink.success(Tuples.of(Unpooled.EMPTY_BUFFER, connection))));
20+
}
21+
}
22+
23+
class ResumableClientSetup extends ClientSetup {
24+
25+
@Override
26+
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
27+
return Mono.create(
28+
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)));
29+
}
30+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.rsocket.core;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.ByteBufAllocator;
5+
import io.rsocket.DuplexConnection;
6+
import io.rsocket.RSocketErrorException;
7+
import io.rsocket.frame.FrameUtil;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import reactor.core.publisher.Flux;
11+
import reactor.core.publisher.Mono;
12+
13+
class LoggingDuplexConnection implements DuplexConnection {
14+
15+
private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
16+
17+
final DuplexConnection source;
18+
19+
LoggingDuplexConnection(DuplexConnection source) {
20+
this.source = source;
21+
}
22+
23+
@Override
24+
public void dispose() {
25+
source.dispose();
26+
}
27+
28+
@Override
29+
public Mono<Void> onClose() {
30+
return source.onClose();
31+
}
32+
33+
@Override
34+
public void sendFrame(int streamId, ByteBuf frame) {
35+
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
36+
37+
source.sendFrame(streamId, frame);
38+
}
39+
40+
@Override
41+
public void sendErrorAndClose(RSocketErrorException e) {
42+
LOGGER.debug("sending -> " + e.getClass().getSimpleName() + ": " + e.getMessage());
43+
44+
source.sendErrorAndClose(e);
45+
}
46+
47+
@Override
48+
public Flux<ByteBuf> receive() {
49+
return source
50+
.receive()
51+
.doOnNext(frame -> LOGGER.debug("receiving -> " + FrameUtil.toString(frame)));
52+
}
53+
54+
@Override
55+
public ByteBufAllocator alloc() {
56+
return source.alloc();
57+
}
58+
59+
static DuplexConnection wrapIfEnabled(DuplexConnection source) {
60+
if (LOGGER.isDebugEnabled()) {
61+
return new LoggingDuplexConnection(source);
62+
}
63+
64+
return source;
65+
}
66+
}

rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
109109

110110
final ByteBuf requestFrame =
111111
MetadataPushFrameCodec.encode(this.allocator, metadataRetainedSlice);
112-
this.connection.sendFrame(0, requestFrame, true);
112+
this.connection.sendFrame(0, requestFrame);
113113

114114
Operators.complete(actual);
115115
}
@@ -166,7 +166,7 @@ public Void block() {
166166

167167
final ByteBuf requestFrame =
168168
MetadataPushFrameCodec.encode(this.allocator, metadataRetainedSlice);
169-
this.connection.sendFrame(0, requestFrame, true);
169+
this.connection.sendFrame(0, requestFrame);
170170

171171
return null;
172172
}

0 commit comments

Comments
 (0)