Skip to content

Commit 91a24b7

Browse files
committed
Make ReactiveSocket requests lazy
__Problem__ Today a client can not retry a request by simply adding a retry operator. eg: ```java Flowable.fromPublisher(socket.requestResponse(PayloadImpl.EMPTY)).retry() ``` does not work. Instead one has to use ```java Flowable.fromPublisher(Flowable.defer(() -> socket.requestResponse(PayloadImpl.EMPTY))) ``` The reason is that the implementation creates a `streamId` upon calling `requestResponse` and not on each subscription. __Modification__ Modify `ClientReactiveSocket` to create a new `streamId` per subscription. This will make sure that each subscription is unique and hence a server will not have an issue accepting such requests. __Result__ More idiomatic usage of function composition and retry/repeat kind of models.
1 parent b8fd58a commit 91a24b7

File tree

3 files changed

+107
-105
lines changed

3 files changed

+107
-105
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java

Lines changed: 82 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@
2727
import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber;
2828
import io.reactivesocket.reactivestreams.extensions.Px;
2929
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
30-
import io.reactivesocket.reactivestreams.extensions.internal.processors.ConnectableUnicastProcessor;
3130
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber;
3231
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
3332
import org.agrona.collections.Int2ObjectHashMap;
34-
import org.reactivestreams.Processor;
3533
import org.reactivestreams.Publisher;
3634
import org.reactivestreams.Subscriber;
3735
import org.reactivestreams.Subscription;
@@ -40,7 +38,7 @@
4038
import java.nio.charset.StandardCharsets;
4139
import java.util.function.Consumer;
4240

43-
import static io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers.doOnError;
41+
import static io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers.*;
4442

4543
/**
4644
* Client Side of a ReactiveSocket socket. Sends {@link Frame}s
@@ -75,46 +73,31 @@ public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> err
7573

7674
@Override
7775
public Publisher<Void> fireAndForget(Payload payload) {
78-
try {
76+
return Px.defer(() -> {
7977
final int streamId = nextStreamId();
8078
final Frame requestFrame = Frame.Request.from(streamId, FrameType.FIRE_AND_FORGET, payload, 0);
8179
return connection.sendOne(requestFrame);
82-
} catch (Throwable t) {
83-
return Px.error(t);
84-
}
80+
});
8581
}
8682

8783
@Override
8884
public Publisher<Payload> requestResponse(Payload payload) {
89-
final int streamId = nextStreamId();
90-
final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
91-
92-
return handleRequestResponse(Px.just(requestFrame), streamId, 1, false);
85+
return handleRequestResponse(payload);
9386
}
9487

9588
@Override
9689
public Publisher<Payload> requestStream(Payload payload) {
97-
final int streamId = nextStreamId();
98-
final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, 1);
99-
100-
return handleStreamResponse(Px.just(requestFrame), streamId);
90+
return handleStreamResponse(Px.just(payload), FrameType.REQUEST_STREAM);
10191
}
10292

10393
@Override
10494
public Publisher<Payload> requestSubscription(Payload payload) {
105-
final int streamId = nextStreamId();
106-
final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_SUBSCRIPTION, payload, 1);
107-
108-
return handleStreamResponse(Px.just(requestFrame), streamId);
95+
return handleStreamResponse(Px.just(payload), FrameType.REQUEST_SUBSCRIPTION);
10996
}
11097

11198
@Override
11299
public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
113-
final int streamId = nextStreamId();
114-
return handleStreamResponse(Px.from(payloads)
115-
.map(payload -> {
116-
return Frame.Request.from(streamId, FrameType.REQUEST_CHANNEL, payload, 1);
117-
}), streamId);
100+
return handleStreamResponse(Px.from(payloads), FrameType.REQUEST_CHANNEL);
118101
}
119102

120103
@Override
@@ -148,77 +131,53 @@ public ClientReactiveSocket start(Consumer<Lease> leaseConsumer) {
148131
return this;
149132
}
150133

151-
private Publisher<Payload> handleRequestResponse(final Publisher<Frame> payload, final int streamId,
152-
final int initialRequestN, final boolean sendRequestN) {
153-
ConnectableUnicastProcessor<Frame> sender = new ConnectableUnicastProcessor<>();
154-
155-
synchronized (this) {
156-
senders.put(streamId, sender);
157-
}
158-
159-
final Runnable cleanup = () -> {
134+
private Publisher<Payload> handleRequestResponse(final Payload payload) {
135+
return Px.create(subscriber -> {
136+
int streamId = nextStreamId();
137+
final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
160138
synchronized (this) {
161-
receivers.remove(streamId);
162-
senders.remove(streamId);
139+
@SuppressWarnings("rawtypes")
140+
Subscriber raw = subscriber;
141+
@SuppressWarnings("unchecked")
142+
Subscriber<Frame> fs = raw;
143+
receivers.put(streamId, fs);
163144
}
164-
};
165-
166-
return Px
167-
.<Payload>create(subscriber -> {
168-
@SuppressWarnings("rawtypes")
169-
Subscriber raw = subscriber;
170-
@SuppressWarnings("unchecked")
171-
Subscriber<Frame> fs = raw;
172-
synchronized (this) {
173-
receivers.put(streamId, fs);
174-
}
175-
176-
payload.subscribe(sender);
177-
178-
subscriber.onSubscribe(new Subscription() {
179-
180-
@Override
181-
public void request(long n) {
182-
if (sendRequestN) {
183-
sender.onNext(Frame.RequestN.from(streamId, n));
184-
}
185-
}
186-
187-
@Override
188-
public void cancel() {
189-
sender.onNext(Frame.Cancel.from(streamId));
190-
sender.cancel();
191-
}
192-
});
193-
194-
Px.from(connection.send(sender))
195-
.doOnError(th -> subscriber.onError(th))
196-
.subscribe(DefaultSubscriber.defaultInstance());
197-
198-
})
199-
.doOnRequest(subscription -> sender.start(initialRequestN))
200-
.doOnTerminate(cleanup);
145+
Px.concatEmpty(connection.sendOne(requestFrame), Px.never())
146+
.cast(Payload.class)
147+
.doOnCancel(() -> {
148+
if (connection.availability() > 0.0) {
149+
connection.sendOne(Frame.Cancel.from(streamId))
150+
.subscribe(DefaultSubscriber.defaultInstance());
151+
}
152+
})
153+
.subscribe(subscriber);
154+
});
201155
}
202156

203-
private Publisher<Payload> handleStreamResponse(Publisher<Frame> request, final int streamId) {
204-
RemoteSender sender = new RemoteSender(request, () -> senders.remove(streamId), streamId, 1);
205-
Publisher<Frame> src = s -> {
206-
CancellableSubscriber<Void> sendSub = doOnError(throwable -> {
207-
s.onError(throwable);
208-
});
209-
ValidatingSubscription<? super Frame> sub = ValidatingSubscription.create(s, () -> {
210-
sendSub.cancel();
211-
}, requestN -> {
212-
transportReceiveSubscription.request(requestN);
213-
});
214-
connection.send(sender).subscribe(sendSub);
215-
s.onSubscribe(sub);
216-
};
217-
218-
RemoteReceiver receiver = new RemoteReceiver(src, connection, streamId, () -> receivers.remove(streamId), true);
219-
senders.put(streamId, sender);
220-
receivers.put(streamId, receiver);
221-
return receiver;
157+
private Publisher<Payload> handleStreamResponse(Px<Payload> request, FrameType requestType) {
158+
return Px.defer(() -> {
159+
int streamId = nextStreamId();
160+
RemoteSender sender = new RemoteSender(request.map(payload -> Frame.Request.from(streamId, requestType,
161+
payload, 1)),
162+
removeSenderLambda(streamId), 1);
163+
Publisher<Frame> src = s -> {
164+
CancellableSubscriber<Void> sendSub = doOnError(throwable -> {
165+
s.onError(throwable);
166+
});
167+
ValidatingSubscription<? super Frame> sub = ValidatingSubscription.create(s, () -> {
168+
sendSub.cancel();
169+
}, requestN -> {
170+
transportReceiveSubscription.request(requestN);
171+
});
172+
connection.send(sender).subscribe(sendSub);
173+
s.onSubscribe(sub);
174+
};
175+
176+
RemoteReceiver receiver = new RemoteReceiver(src, connection, streamId, removeReceiverLambda(streamId),
177+
true);
178+
registerSenderReceiver(streamId, sender, receiver);
179+
return receiver;
180+
});
222181
}
223182

224183
private void startKeepAlive() {
@@ -291,10 +250,16 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
291250
switch (type) {
292251
case ERROR:
293252
receiver.onError(Exceptions.from(frame));
253+
synchronized (this) {
254+
receivers.remove(streamId);
255+
}
294256
break;
295257
case NEXT_COMPLETE:
296258
receiver.onNext(frame);
297259
receiver.onComplete();
260+
synchronized (this) {
261+
receivers.remove(streamId);
262+
}
298263
break;
299264
case CANCEL: {
300265
Subscription sender;
@@ -324,6 +289,9 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
324289
}
325290
case COMPLETE:
326291
receiver.onComplete();
292+
synchronized (this) {
293+
receivers.remove(streamId);
294+
}
327295
break;
328296
default:
329297
throw new IllegalStateException(
@@ -360,5 +328,28 @@ private static String getByteBufferAsString(ByteBuffer bb) {
360328
return new String(bytes, StandardCharsets.UTF_8);
361329
}
362330

331+
private Runnable removeReceiverLambda(int streamId) {
332+
return () -> {
333+
removeReceiver(streamId);
334+
};
335+
}
363336

337+
private synchronized void removeReceiver(int streamId) {
338+
receivers.remove(streamId);
339+
}
340+
341+
private Runnable removeSenderLambda(int streamId) {
342+
return () -> {
343+
removeSender(streamId);
344+
};
345+
}
346+
347+
private synchronized void removeSender(int streamId) {
348+
senders.remove(streamId);
349+
}
350+
351+
private synchronized void registerSenderReceiver(int streamId, Subscription sender, Subscriber<Frame> receiver) {
352+
senders.put(streamId, sender);
353+
receivers.put(streamId, receiver);
354+
}
364355
}

reactivesocket-core/src/test/java/io/reactivesocket/ClientReactiveSocketTest.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,9 @@
3030
import java.util.ArrayList;
3131
import java.util.List;
3232

33-
import static io.reactivesocket.FrameType.CANCEL;
34-
import static io.reactivesocket.FrameType.KEEPALIVE;
35-
import static io.reactivesocket.FrameType.NEXT_COMPLETE;
36-
import static io.reactivesocket.FrameType.REQUEST_RESPONSE;
33+
import static io.reactivesocket.FrameType.*;
3734
import static org.hamcrest.MatcherAssert.assertThat;
38-
import static org.hamcrest.Matchers.contains;
39-
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
40-
import static org.hamcrest.Matchers.hasSize;
41-
import static org.hamcrest.Matchers.instanceOf;
42-
import static org.hamcrest.Matchers.is;
35+
import static org.hamcrest.Matchers.*;
4336

4437
public class ClientReactiveSocketTest {
4538

@@ -64,7 +57,6 @@ public void testHandleSetupException() throws Throwable {
6457
rule.connection.addToReceivedBuffer(Frame.Error.from(0, new RejectedSetupException("boom")));
6558
}
6659

67-
6860
@Test(timeout = 2_000)
6961
public void testHandleApplicationException() throws Throwable {
7062
rule.connection.clearSendReceiveBuffers();
@@ -91,7 +83,7 @@ public void testHandleValidFrame() throws Throwable {
9183
responseSub.assertComplete();
9284
}
9385

94-
@Test(timeout = 2_000)
86+
@Test
9587
public void testRequestReplyWithCancel() throws Throwable {
9688
rule.connection.clearSendReceiveBuffers(); // clear setup frame
9789
Publisher<Payload> response = rule.socket.requestResponse(PayloadImpl.EMPTY);
@@ -102,6 +94,7 @@ public void testRequestReplyWithCancel() throws Throwable {
10294
responseSub.assertValueCount(0);
10395
responseSub.assertNotTerminated();
10496

97+
assertThat("Unexpected frame sent on the connection.", rule.connection.awaitSend().getType(), is(REQUEST_RESPONSE));
10598
assertThat("Unexpected frame sent on the connection.", rule.connection.awaitSend().getType(), is(CANCEL));
10699
}
107100

@@ -116,9 +109,27 @@ public void testRequestReplyErrorOnSend() throws Throwable {
116109
responseSub.assertError(RuntimeException.class);
117110
}
118111

112+
@Test
113+
public void testLazyRequestResponse() throws Exception {
114+
Publisher<Payload> response = rule.socket.requestResponse(PayloadImpl.EMPTY);
115+
int streamId = sendRequestResponse(response);
116+
rule.connection.clearSendReceiveBuffers();
117+
int streamId2 = sendRequestResponse(response);
118+
assertThat("Stream ID reused.", streamId2, not(equalTo(streamId)));
119+
}
120+
121+
public int sendRequestResponse(Publisher<Payload> response) {
122+
TestSubscriber<Payload> sub = TestSubscriber.create();
123+
response.subscribe(sub);
124+
int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE);
125+
rule.connection.addToReceivedBuffer(Frame.Response.from(streamId, RESPONSE, PayloadImpl.EMPTY));
126+
sub.assertValueCount(1).assertNoErrors();
127+
return streamId;
128+
}
129+
119130
public static class ClientSocketRule extends AbstractSocketRule<ClientReactiveSocket> {
120131

121-
private PublishProcessor<Long> keepAliveTicks = PublishProcessor.create();
132+
private final PublishProcessor<Long> keepAliveTicks = PublishProcessor.create();
122133

123134
@Override
124135
protected ClientReactiveSocket newReactiveSocket() {

reactivesocket-examples/src/main/java/io/reactivesocket/examples/transport/tcp/requestresponse/HelloWorldClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ public Publisher<Payload> requestResponse(Payload p) {
5555
.connect())
5656
.blockingFirst();
5757

58-
Flowable.fromPublisher(socket.requestResponse(PayloadImpl.EMPTY))
58+
Flowable.fromPublisher(socket.requestResponse(new PayloadImpl("Hello")))
5959
.map(payload -> payload.getData())
6060
.map(ByteBufferUtil::toUtf8String)
61-
.doOnNext(x -> System.out.println("===>>>> " + x))
61+
.doOnNext(System.out::println)
6262
.concatWith(Flowable.fromPublisher(socket.close()).cast(String.class))
6363
.blockingFirst();
6464
}

0 commit comments

Comments
 (0)