Skip to content

Commit 740785f

Browse files
committed
provides enhanced DuplexConnection api
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 4606f25 commit 740785f

File tree

47 files changed

+1613
-1575
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1613
-1575
lines changed

rsocket-core/src/main/java/io/rsocket/DuplexConnection.java

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,40 +20,21 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import java.net.SocketAddress;
2222
import java.nio.channels.ClosedChannelException;
23-
import org.reactivestreams.Publisher;
2423
import org.reactivestreams.Subscriber;
2524
import reactor.core.publisher.Flux;
26-
import reactor.core.publisher.Mono;
2725

2826
/** Represents a connection with input/output that the protocol uses. */
2927
public interface DuplexConnection extends Availability, Closeable {
3028

3129
/**
32-
* Sends the source of Frames on this connection and returns the {@code Publisher} representing
33-
* the result of this send.
30+
* Delivers the given frame to the underlying transport connection. This method is non-blocking
31+
* and can be safely executed from multiple threads.
3432
*
35-
* <p><strong>Flow control</strong>
36-
*
37-
* <p>The passed {@code Publisher} must
38-
*
39-
* @param frames Stream of {@code Frame}s to send on the connection.
40-
* @return {@code Publisher} that completes when all the frames are written on the connection
41-
* successfully and errors when it fails.
42-
* @throws NullPointerException if {@code frames} is {@code null}
43-
*/
44-
Mono<Void> send(Publisher<ByteBuf> frames);
45-
46-
/**
47-
* Sends a single {@code Frame} on this connection and returns the {@code Publisher} representing
48-
* the result of this send.
49-
*
50-
* @param frame {@code Frame} to send.
51-
* @return {@code Publisher} that completes when the frame is written on the connection
52-
* successfully and errors when it fails.
33+
* @param streamId to which the given frame relates
34+
* @param frame with the encoded content
35+
* @param prioritize whether the given frame should be sent with priority to others
5336
*/
54-
default Mono<Void> sendOne(ByteBuf frame) {
55-
return send(Mono.just(frame));
56-
}
37+
void sendFrame(int streamId, ByteBuf frame, boolean prioritize);
5738

5839
/**
5940
* Returns a stream of all {@code Frame}s received on this connection.
@@ -80,6 +61,12 @@ default Mono<Void> sendOne(ByteBuf frame) {
8061
*/
8162
Flux<ByteBuf> receive();
8263

64+
/**
65+
* @param errorFrame
66+
* @return
67+
*/
68+
void terminate(ByteBuf frame, RSocketErrorException terminalError);
69+
8370
/**
8471
* Returns the assigned {@link ByteBufAllocator}.
8572
*

rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.Closeable;
2222
import io.rsocket.DuplexConnection;
23+
import io.rsocket.RSocketErrorException;
2324
import io.rsocket.frame.FrameHeaderCodec;
2425
import io.rsocket.frame.FrameUtil;
2526
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
2627
import io.rsocket.plugins.InitializingInterceptorRegistry;
2728
import java.net.SocketAddress;
2829
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
29-
import org.reactivestreams.Publisher;
3030
import org.reactivestreams.Subscription;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
@@ -95,19 +95,19 @@ public ClientServerInputMultiplexer(
9595
clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
9696
}
9797

98-
public DuplexConnection asClientServerConnection() {
98+
DuplexConnection asClientServerConnection() {
9999
return source;
100100
}
101101

102-
public DuplexConnection asServerConnection() {
102+
DuplexConnection asServerConnection() {
103103
return serverConnection;
104104
}
105105

106-
public DuplexConnection asClientConnection() {
106+
DuplexConnection asClientConnection() {
107107
return clientConnection;
108108
}
109109

110-
public DuplexConnection asSetupConnection() {
110+
DuplexConnection asSetupConnection() {
111111
return setupConnection;
112112
}
113113

@@ -340,23 +340,21 @@ void onError(Throwable t) {
340340
}
341341

342342
@Override
343-
public Mono<Void> send(Publisher<ByteBuf> frame) {
343+
public void sendFrame(int streamId, ByteBuf frame, boolean prioritize) {
344344
if (debugEnabled) {
345-
return Flux.from(frame)
346-
.doOnNext(f -> LOGGER.debug("sending -> " + FrameUtil.toString(f)))
347-
.as(source::send);
345+
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
348346
}
349347

350-
return source.send(frame);
348+
source.sendFrame(streamId, frame, prioritize);
351349
}
352350

353351
@Override
354-
public Mono<Void> sendOne(ByteBuf frame) {
352+
public void terminate(ByteBuf frame, RSocketErrorException terminalError) {
355353
if (debugEnabled) {
356354
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
357355
}
358356

359-
return source.sendOne(frame);
357+
source.terminate(frame, terminalError);
360358
}
361359

362360
@Override

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@
2020
import static io.rsocket.core.SendUtils.sendReleasingPayload;
2121
import static io.rsocket.core.StateUtils.*;
2222

23-
import io.netty.buffer.ByteBuf;
2423
import io.netty.buffer.ByteBufAllocator;
2524
import io.netty.util.IllegalReferenceCountException;
25+
import io.rsocket.DuplexConnection;
2626
import io.rsocket.Payload;
2727
import io.rsocket.frame.FrameType;
28-
import io.rsocket.internal.UnboundedProcessor;
2928
import java.time.Duration;
3029
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3130
import org.reactivestreams.Subscription;
@@ -50,15 +49,15 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
5049
final int mtu;
5150
final int maxFrameLength;
5251
final RequesterResponderSupport requesterResponderSupport;
53-
final UnboundedProcessor<ByteBuf> sendProcessor;
52+
final DuplexConnection connection;
5453

5554
FireAndForgetRequesterMono(Payload payload, RequesterResponderSupport requesterResponderSupport) {
5655
this.allocator = requesterResponderSupport.getAllocator();
5756
this.payload = payload;
5857
this.mtu = requesterResponderSupport.getMtu();
5958
this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
6059
this.requesterResponderSupport = requesterResponderSupport;
61-
this.sendProcessor = requesterResponderSupport.getSendProcessor();
60+
this.connection = requesterResponderSupport.getDuplexConnection();
6261
}
6362

6463
@Override
@@ -106,7 +105,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
106105
}
107106

108107
sendReleasingPayload(
109-
streamId, FrameType.REQUEST_FNF, mtu, p, this.sendProcessor, this.allocator, true);
108+
streamId, FrameType.REQUEST_FNF, mtu, p, this.connection, this.allocator, true);
110109
} catch (Throwable e) {
111110
lazyTerminate(STATE, this);
112111
actual.onError(e);
@@ -169,7 +168,7 @@ public Void block() {
169168
FrameType.REQUEST_FNF,
170169
this.mtu,
171170
this.payload,
172-
this.sendProcessor,
171+
this.connection,
173172
this.allocator,
174173
true);
175174
} catch (Throwable e) {

rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import io.netty.buffer.ByteBuf;
2323
import io.netty.buffer.ByteBufAllocator;
2424
import io.netty.util.IllegalReferenceCountException;
25+
import io.rsocket.DuplexConnection;
2526
import io.rsocket.Payload;
2627
import io.rsocket.frame.MetadataPushFrameCodec;
27-
import io.rsocket.internal.UnboundedProcessor;
2828
import java.time.Duration;
2929
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3030
import reactor.core.CoreSubscriber;
@@ -43,13 +43,13 @@ final class MetadataPushRequesterMono extends Mono<Void> implements Scannable {
4343
final ByteBufAllocator allocator;
4444
final Payload payload;
4545
final int maxFrameLength;
46-
final UnboundedProcessor<ByteBuf> sendProcessor;
46+
final DuplexConnection connection;
4747

4848
MetadataPushRequesterMono(Payload payload, RequesterResponderSupport requesterResponderSupport) {
4949
this.allocator = requesterResponderSupport.getAllocator();
5050
this.payload = payload;
5151
this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
52-
this.sendProcessor = requesterResponderSupport.getSendProcessor();
52+
this.connection = requesterResponderSupport.getDuplexConnection();
5353
}
5454

5555
@Override
@@ -109,7 +109,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
109109

110110
final ByteBuf requestFrame =
111111
MetadataPushFrameCodec.encode(this.allocator, metadataRetainedSlice);
112-
this.sendProcessor.onNext(requestFrame);
112+
this.connection.sendFrame(0, requestFrame, true);
113113

114114
Operators.complete(actual);
115115
}
@@ -166,7 +166,7 @@ public Void block() {
166166

167167
final ByteBuf requestFrame =
168168
MetadataPushFrameCodec.encode(this.allocator, metadataRetainedSlice);
169-
this.sendProcessor.onNext(requestFrame);
169+
this.connection.sendFrame(0, requestFrame, true);
170170

171171
return null;
172172
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -551,8 +551,10 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
551551
.resumeToken(resumeToken);
552552
keepAliveHandler =
553553
new KeepAliveHandler.ResumableKeepAliveHandler(
554-
session.resumableConnection());
555-
wrappedConnection = session.resumableConnection();
554+
/*session.resumableConnection()*/ null);
555+
// wrappedConnection =
556+
// session.resumableConnection();
557+
wrappedConnection = null;
556558
} else {
557559
resumeToken = Unpooled.EMPTY_BUFFER;
558560
keepAliveHandler =
@@ -608,7 +610,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
608610
return interceptors
609611
.initSocketAcceptor(acceptor)
610612
.accept(setup, wrappedRSocketRequester)
611-
.flatMap(
613+
.map(
612614
rSocketHandler -> {
613615
RSocket wrappedRSocketHandler =
614616
interceptors.initResponder(rSocketHandler);
@@ -632,9 +634,9 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
632634
maxFrameLength,
633635
maxInboundPayloadSize);
634636

635-
return wrappedConnection
636-
.sendOne(setupFrame.retain())
637-
.thenReturn(wrappedRSocketRequester);
637+
wrappedConnection.sendFrame(0, setupFrame.retain(), false);
638+
639+
return wrappedRSocketRequester;
638640
})
639641
.doFinally(signalType -> setup.release());
640642
});

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.rsocket.frame.FrameType;
3030
import io.rsocket.frame.RequestNFrameCodec;
3131
import io.rsocket.frame.decoder.PayloadDecoder;
32-
import io.rsocket.internal.UnboundedProcessor;
3332
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
3433
import io.rsocket.keepalive.KeepAliveHandler;
3534
import io.rsocket.keepalive.KeepAliveSupport;
@@ -62,7 +61,6 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
6261
AtomicReferenceFieldUpdater.newUpdater(
6362
RSocketRequester.class, Throwable.class, "terminationError");
6463

65-
private final DuplexConnection connection;
6664
private final RequesterLeaseHandler leaseHandler;
6765
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
6866
private final MonoProcessor<Void> onClose;
@@ -78,23 +76,13 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
7876
int keepAliveAckTimeout,
7977
@Nullable KeepAliveHandler keepAliveHandler,
8078
RequesterLeaseHandler leaseHandler) {
81-
super(
82-
mtu,
83-
maxFrameLength,
84-
maxInboundPayloadSize,
85-
payloadDecoder,
86-
connection.alloc(),
87-
streamIdSupplier);
88-
89-
this.connection = connection;
79+
super(mtu, maxFrameLength, maxInboundPayloadSize, payloadDecoder, connection, streamIdSupplier);
80+
9081
this.leaseHandler = leaseHandler;
9182
this.onClose = MonoProcessor.create();
9283

93-
UnboundedProcessor<ByteBuf> sendProcessor = super.getSendProcessor();
94-
9584
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
9685
connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown);
97-
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);
9886

9987
connection.receive().subscribe(this::handleIncomingFrames, e -> {});
10088

@@ -103,7 +91,9 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
10391
new ClientKeepAliveSupport(this.getAllocator(), keepAliveTickPeriod, keepAliveAckTimeout);
10492
this.keepAliveFramesAcceptor =
10593
keepAliveHandler.start(
106-
keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive);
94+
keepAliveSupport,
95+
(keepAliveFrame) -> connection.sendFrame(0, keepAliveFrame, true),
96+
this::tryTerminateOnKeepAlive);
10797
} else {
10898
keepAliveFramesAcceptor = null;
10999
}
@@ -177,7 +167,7 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
177167

178168
@Override
179169
public double availability() {
180-
return Math.min(connection.availability(), leaseHandler.availability());
170+
return Math.min(getDuplexConnection().availability(), leaseHandler.availability());
181171
}
182172

183173
@Override
@@ -206,13 +196,10 @@ private void handleIncomingFrames(ByteBuf frame) {
206196
}
207197
} catch (Throwable t) {
208198
LOGGER.error("Unexpected error during frame handling", t);
209-
super.getSendProcessor()
210-
.onNext(
211-
ErrorFrameCodec.encode(
212-
super.getAllocator(),
213-
0,
214-
new ConnectionErrorException("Unexpected error during frame handling", t)));
215-
this.tryTerminateOnConnectionError(t);
199+
final ConnectionErrorException error =
200+
new ConnectionErrorException("Unexpected error during frame handling", t);
201+
getDuplexConnection()
202+
.terminate(ErrorFrameCodec.encode(super.getAllocator(), 0, error), error);
216203
}
217204
}
218205

@@ -332,7 +319,7 @@ private void terminate(Throwable e) {
332319
if (keepAliveFramesAcceptor != null) {
333320
keepAliveFramesAcceptor.dispose();
334321
}
335-
connection.dispose();
322+
getDuplexConnection().dispose();
336323
leaseHandler.dispose();
337324

338325
synchronized (this) {
@@ -347,15 +334,10 @@ private void terminate(Throwable e) {
347334
});
348335
}
349336

350-
this.getSendProcessor().dispose();
351337
if (e == CLOSED_CHANNEL_EXCEPTION) {
352338
onClose.onComplete();
353339
} else {
354340
onClose.onError(e);
355341
}
356342
}
357-
358-
private void handleSendProcessorError(Throwable t) {
359-
connection.dispose();
360-
}
361343
}

0 commit comments

Comments
 (0)