Skip to content

Commit a7570dc

Browse files
NiteshKantstevegury
authored andcommitted
Fix for requestChannel and requestStream (#186)
In the current implementation stream and channel methods are broken. Fixed stream and channel methods to work end to end. Also added examples for the same. They are now implemented using the existing `RemoteReceiver` and `RemoteSender` classes. One major problem with using `ConnectableUnicastProcessor` was that it does not discriminate between subscriber of request and sender of `requestN` and `cancel` frames. So, if the request source is completed, sending `requestN` and `cancel` frames result in them being rejected by the transport. This is one of the reasons `RemoteReceiver` and `RemoteSender` classes were created. __ request-response implementation is not changed in this change as that needs to be optimized for a single item request-response__ Channel and stream work now.
1 parent 60e9c28 commit a7570dc

File tree

11 files changed

+301
-96
lines changed

11 files changed

+301
-96
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java

Lines changed: 93 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
import io.reactivesocket.exceptions.CancelException;
2121
import io.reactivesocket.exceptions.Exceptions;
2222
import io.reactivesocket.internal.KnownErrorFilter;
23+
import io.reactivesocket.internal.RemoteReceiver;
24+
import io.reactivesocket.internal.RemoteSender;
2325
import io.reactivesocket.lease.Lease;
2426
import io.reactivesocket.lease.LeaseImpl;
2527
import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber;
2628
import io.reactivesocket.reactivestreams.extensions.Px;
29+
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
2730
import io.reactivesocket.reactivestreams.extensions.internal.processors.ConnectableUnicastProcessor;
2831
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber;
2932
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
@@ -50,8 +53,8 @@ public class ClientReactiveSocket implements ReactiveSocket {
5053
private final StreamIdSupplier streamIdSupplier;
5154
private final KeepAliveProvider keepAliveProvider;
5255

53-
private final Int2ObjectHashMap<Processor<Frame, Frame>> senders;
54-
private final Int2ObjectHashMap<Subscriber<? super Payload>> receivers;
56+
private final Int2ObjectHashMap<Subscription> senders;
57+
private final Int2ObjectHashMap<Subscriber<Frame>> receivers;
5558

5659
private volatile Subscription transportReceiveSubscription;
5760
private CancellableSubscriber<Void> keepAliveSendSub;
@@ -81,86 +84,37 @@ public Publisher<Void> fireAndForget(Payload payload) {
8184
}
8285
}
8386

87+
@Override
8488
public Publisher<Payload> requestResponse(Payload payload) {
8589
final int streamId = nextStreamId();
8690
final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
8791

88-
return doSendReceive(Px.just(requestFrame), streamId, 1, false);
92+
return handleRequestResponse(Px.just(requestFrame), streamId, 1, false);
8993
}
9094

9195
@Override
9296
public Publisher<Payload> requestStream(Payload payload) {
9397
final int streamId = nextStreamId();
9498
final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, 1);
9599

96-
return doSendReceive(Px.just(requestFrame), streamId, 1, true);
100+
return handleStreamResponse(Px.just(requestFrame), streamId);
97101
}
98102

99103
@Override
100104
public Publisher<Payload> requestSubscription(Payload payload) {
101105
final int streamId = nextStreamId();
102106
final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_SUBSCRIPTION, payload, 1);
103107

104-
return doSendReceive(Px.just(requestFrame), streamId, 1, true);
108+
return handleStreamResponse(Px.just(requestFrame), streamId);
105109
}
106110

107111
@Override
108112
public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
109113
final int streamId = nextStreamId();
110-
Px<Frame> frames = Px
111-
.from(payloads)
112-
.map(payload -> Frame.Request.from(streamId, FrameType.REQUEST_CHANNEL, payload, 1));
113-
return doSendReceive(frames, streamId, 1, true);
114-
}
115-
116-
private Publisher<Payload> doSendReceive(final Publisher<Frame> payload, final int streamId, final int initialRequestN, final boolean sendRequestN) {
117-
ConnectableUnicastProcessor<Frame> sender = new ConnectableUnicastProcessor<>();
118-
119-
synchronized (this) {
120-
senders.put(streamId, sender);
121-
}
122-
123-
final Runnable cleanup = () -> {
124-
synchronized (this) {
125-
receivers.remove(streamId);
126-
senders.remove(streamId);
127-
}
128-
};
129-
130-
return Px
131-
.<Payload>create(subscriber -> {
132-
synchronized (this) {
133-
receivers.put(streamId, subscriber);
134-
}
135-
136-
payload.subscribe(sender);
137-
138-
subscriber.onSubscribe(new Subscription() {
139-
140-
@Override
141-
public void request(long n) {
142-
if (sendRequestN) {
143-
sender.onNext(Frame.RequestN.from(streamId, n));
144-
}
145-
}
146-
147-
@Override
148-
public void cancel() {
149-
sender.onNext(Frame.Cancel.from(streamId));
150-
sender.cancel();
151-
}
152-
});
153-
154-
try {
155-
Px.from(connection.send(sender))
156-
.doOnError(th -> subscriber.onError(th))
157-
.subscribe(DefaultSubscriber.defaultInstance());
158-
} catch (Throwable t) {
159-
subscriber.onError(t);
160-
}
161-
})
162-
.doOnRequest(subscription -> sender.start(initialRequestN))
163-
.doOnTerminate(cleanup);
114+
return handleStreamResponse(Px.from(payloads)
115+
.map(payload -> {
116+
return Frame.Request.from(streamId, FrameType.REQUEST_CHANNEL, payload, 1);
117+
}), streamId);
164118
}
165119

166120
@Override
@@ -194,6 +148,79 @@ public ClientReactiveSocket start(Consumer<Lease> leaseConsumer) {
194148
return this;
195149
}
196150

151+
private Publisher<Payload> handleRequestResponse(final Publisher<Frame> payload, final int streamId,
152+
final int initialRequestN, final boolean sendRequestN) {
153+
ConnectableUnicastProcessor<Frame> sender = new ConnectableUnicastProcessor<>();
154+
155+
synchronized (this) {
156+
senders.put(streamId, sender);
157+
}
158+
159+
final Runnable cleanup = () -> {
160+
synchronized (this) {
161+
receivers.remove(streamId);
162+
senders.remove(streamId);
163+
}
164+
};
165+
166+
return Px
167+
.<Payload>create(subscriber -> {
168+
@SuppressWarnings("rawtypes")
169+
Subscriber raw = subscriber;
170+
@SuppressWarnings("unchecked")
171+
Subscriber<Frame> fs = raw;
172+
synchronized (this) {
173+
receivers.put(streamId, fs);
174+
}
175+
176+
payload.subscribe(sender);
177+
178+
subscriber.onSubscribe(new Subscription() {
179+
180+
@Override
181+
public void request(long n) {
182+
if (sendRequestN) {
183+
sender.onNext(Frame.RequestN.from(streamId, n));
184+
}
185+
}
186+
187+
@Override
188+
public void cancel() {
189+
sender.onNext(Frame.Cancel.from(streamId));
190+
sender.cancel();
191+
}
192+
});
193+
194+
Px.from(connection.send(sender))
195+
.doOnError(th -> subscriber.onError(th))
196+
.subscribe(DefaultSubscriber.defaultInstance());
197+
198+
})
199+
.doOnRequest(subscription -> sender.start(initialRequestN))
200+
.doOnTerminate(cleanup);
201+
}
202+
203+
private Publisher<Payload> handleStreamResponse(Publisher<Frame> request, final int streamId) {
204+
RemoteSender sender = new RemoteSender(request, () -> senders.remove(streamId), streamId, 1);
205+
Publisher<Frame> src = s -> {
206+
CancellableSubscriber<Void> sendSub = doOnError(throwable -> {
207+
s.onError(throwable);
208+
});
209+
ValidatingSubscription<? super Frame> sub = ValidatingSubscription.create(s, () -> {
210+
sendSub.cancel();
211+
}, requestN -> {
212+
transportReceiveSubscription.request(requestN);
213+
});
214+
connection.send(sender).subscribe(sendSub);
215+
s.onSubscribe(sub);
216+
};
217+
218+
RemoteReceiver receiver = new RemoteReceiver(src, connection, streamId, () -> receivers.remove(streamId), true);
219+
senders.put(streamId, sender);
220+
receivers.put(streamId, receiver);
221+
return receiver;
222+
}
223+
197224
private void startKeepAlive() {
198225
keepAliveSendSub = doOnError(errorConsumer);
199226
connection.send(Px.from(keepAliveProvider.ticks())
@@ -254,7 +281,7 @@ private void handleStreamZero(FrameType type, Frame frame) {
254281

255282
@SuppressWarnings("unchecked")
256283
private void handleFrame(int streamId, FrameType type, Frame frame) {
257-
Subscriber<? super Payload> receiver;
284+
Subscriber<Frame> receiver;
258285
synchronized (this) {
259286
receiver = receivers.get(streamId);
260287
}
@@ -270,13 +297,13 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
270297
receiver.onComplete();
271298
break;
272299
case CANCEL: {
273-
Processor sender;
274-
synchronized (ClientReactiveSocket.this) {
300+
Subscription sender;
301+
synchronized (this) {
275302
sender = senders.remove(streamId);
276303
receivers.remove(streamId);
277304
}
278305
if (sender != null) {
279-
((ConnectableUnicastProcessor) sender).cancel();
306+
sender.cancel();
280307
}
281308
receiver.onError(new CancelException("cancelling stream id " + streamId));
282309
break;
@@ -285,13 +312,13 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
285312
receiver.onNext(frame);
286313
break;
287314
case REQUEST_N: {
288-
Processor sender;
289-
synchronized (ClientReactiveSocket.this) {
315+
Subscription sender;
316+
synchronized (this) {
290317
sender = senders.get(streamId);
291318
}
292319
if (sender != null) {
293320
int n = Frame.RequestN.requestN(frame);
294-
((ConnectableUnicastProcessor) sender).requestMore(n);
321+
sender.request(n);
295322
}
296323
break;
297324
}

reactivesocket-core/src/main/java/io/reactivesocket/Frame.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
* This provides encoding, decoding and field accessors.
4040
*/
4141
public class Frame implements Payload {
42+
43+
private static final Logger logger = LoggerFactory.getLogger(Frame.class);
44+
4245
public static final ByteBuffer NULL_BYTEBUFFER = FrameHeaderFlyweight.NULL_BYTEBUFFER;
4346
public static final int DATA_MTU = 32 * 1024;
4447
public static final int METADATA_MTU = 32 * 1024;
@@ -55,11 +58,11 @@ public class Frame implements Payload {
5558
FramePool tmpPool;
5659

5760
try {
58-
System.out.println("Creating thread pooled named " + FRAME_POOLER_CLASS_NAME);
61+
logger.info("Creating thread pooled named " + FRAME_POOLER_CLASS_NAME);
5962
tmpPool = (FramePool)Class.forName(FRAME_POOLER_CLASS_NAME).newInstance();
6063
}
6164
catch (final Exception ex) {
62-
ex.printStackTrace();
65+
logger.error("Error initializing frame pool.", ex);
6366
tmpPool = new UnpooledFrame();
6467
}
6568

@@ -299,7 +302,7 @@ public static String dataMimeType(final Frame frame) {
299302
}
300303

301304
public static class Error {
302-
private static final Logger logger = LoggerFactory.getLogger(Error.class);
305+
private static final Logger errorLogger = LoggerFactory.getLogger(Error.class);
303306

304307
private Error() {}
305308

@@ -313,8 +316,8 @@ public static Frame from(
313316
final Frame frame = POOL.acquireFrame(
314317
ErrorFrameFlyweight.computeFrameLength(metadata.remaining(), data.remaining()));
315318

316-
if (logger.isDebugEnabled()) {
317-
logger.debug("an error occurred, creating error frame", throwable);
319+
if (errorLogger.isDebugEnabled()) {
320+
errorLogger.debug("an error occurred, creating error frame", throwable);
318321
}
319322

320323
frame.length = ErrorFrameFlyweight.encode(

reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,11 @@ private Publisher<Void> handleFrame(Frame frame) {
175175
case REQUEST_N:
176176
return handleRequestN(streamId, frame);
177177
case REQUEST_STREAM:
178-
return handleReceive(streamId, requestStream(frame));
178+
return doReceive(streamId, requestStream(frame));
179179
case FIRE_AND_FORGET:
180180
return handleFireAndForget(streamId, fireAndForget(frame));
181181
case REQUEST_SUBSCRIPTION:
182-
return handleReceive(streamId, requestSubscription(frame));
182+
return doReceive(streamId, requestSubscription(frame));
183183
case REQUEST_CHANNEL:
184184
return handleChannel(streamId, frame);
185185
case RESPONSE:
@@ -288,6 +288,14 @@ private Publisher<Void> handleReceive(int streamId, Publisher<Payload> response)
288288

289289
}
290290

291+
private Publisher<Void> doReceive(int streamId, Publisher<Payload> response) {
292+
Px<Frame> resp = Px.from(response)
293+
.map(payload -> Response.from(streamId, FrameType.RESPONSE, payload));
294+
RemoteSender sender = new RemoteSender(resp, () -> subscriptions.remove(streamId), streamId, 2);
295+
subscriptions.put(streamId, sender);
296+
return connection.send(sender);
297+
}
298+
291299
private Publisher<Void> handleChannel(int streamId, Frame firstFrame) {
292300
int initialRequestN = Request.initialRequestN(firstFrame);
293301
Frame firstAsNext = Request.from(streamId, FrameType.NEXT, firstFrame, initialRequestN);

reactivesocket-core/src/main/java/io/reactivesocket/internal/RemoteReceiver.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
*/
5454
public final class RemoteReceiver implements Processor<Frame, Payload> {
5555

56+
private final Publisher<Frame> transportSource;
5657
private final DuplexConnection connection;
5758
private final int streamId;
5859
private final Runnable cleanup;
@@ -64,10 +65,22 @@ public final class RemoteReceiver implements Processor<Frame, Payload> {
6465
private volatile boolean missedComplete;
6566
private volatile Throwable missedError;
6667

68+
public RemoteReceiver(Publisher<Frame> transportSource, DuplexConnection connection, int streamId,
69+
Runnable cleanup, boolean sendRequestN) {
70+
this.transportSource = transportSource;
71+
this.connection = connection;
72+
this.streamId = streamId;
73+
this.cleanup = cleanup;
74+
this.sendRequestN = sendRequestN;
75+
requestFrame = null;
76+
transportSubscription = null;
77+
}
78+
6779
public RemoteReceiver(DuplexConnection connection, int streamId, Runnable cleanup, Frame requestFrame,
6880
Subscription transportSubscription, boolean sendRequestN) {
6981
this.requestFrame = requestFrame;
7082
this.transportSubscription = transportSubscription;
83+
transportSource = null;
7184
this.connection = connection;
7285
this.streamId = streamId;
7386
this.cleanup = cleanup;
@@ -109,7 +122,9 @@ public void subscribe(Subscriber<? super Payload> s) {
109122
return;
110123
}
111124

112-
if (transportSubscription != null) {
125+
if (transportSource != null) {
126+
transportSource.subscribe(this);
127+
} else if (transportSubscription != null) {
113128
onSubscribe(transportSubscription);
114129
onNext(requestFrame);
115130
}

reactivesocket-examples/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,6 @@ dependencies {
4545
compile 'org.slf4j:slf4j-log4j12:1.7.21'
4646

4747
jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.15'
48+
49+
compile 'org.slf4j:slf4j-log4j12:1.7.21'
4850
}

0 commit comments

Comments
 (0)