Skip to content

Commit 6a74390

Browse files
committed
provides ordered stream id issuing
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 12fd301 commit 6a74390

File tree

15 files changed

+340
-129
lines changed

15 files changed

+340
-129
lines changed

rsocket-core/src/main/java/io/rsocket/DuplexConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.reactivestreams.Subscriber;
2424
import reactor.core.publisher.Flux;
2525
import reactor.core.publisher.Mono;
26+
import reactor.core.scheduler.Scheduler;
2627

2728
/** Represents a connection with input/output that the protocol uses. */
2829
public interface DuplexConnection extends Availability, Closeable {
@@ -86,6 +87,14 @@ default Mono<Void> sendOne(ByteBuf frame) {
8687
*/
8788
ByteBufAllocator alloc();
8889

90+
/**
91+
* Returns associated to this connection {@link Scheduler} that will process all submitted tasks
92+
* in an ordered / serial fashion.
93+
*
94+
* @return events' ordered {@link Scheduler}
95+
*/
96+
Scheduler eventLoopScheduler();
97+
8998
@Override
9099
default double availability() {
91100
return isDisposed() ? 0.0 : 1.0;

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 125 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -208,22 +208,23 @@ private Mono<Void> handleFireAndForget(Payload payload) {
208208

209209
final AtomicBoolean once = new AtomicBoolean();
210210

211-
return Mono.defer(
212-
() -> {
213-
if (once.getAndSet(true)) {
214-
return Mono.error(
215-
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
216-
}
211+
return Mono.<Void>defer(
212+
() -> {
213+
if (once.getAndSet(true)) {
214+
return Mono.error(
215+
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
216+
}
217217

218-
final int streamId = streamIdSupplier.nextStreamId(receivers);
219-
final ByteBuf requestFrame =
220-
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
221-
allocator, streamId, payload);
218+
final int streamId = streamIdSupplier.nextStreamId(receivers);
219+
final ByteBuf requestFrame =
220+
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
221+
allocator, streamId, payload);
222222

223-
sendProcessor.onNext(requestFrame);
223+
sendProcessor.onNext(requestFrame);
224224

225-
return Mono.empty();
226-
});
225+
return Mono.empty();
226+
})
227+
.subscribeOn(connection.eventLoopScheduler());
227228
}
228229

229230
private Mono<Payload> handleRequestResponse(final Payload payload) {
@@ -284,6 +285,7 @@ public void hookOnTerminal(SignalType signalType) {
284285
receivers.remove(streamId, receiver);
285286
}
286287
}))
288+
.subscribeOn(connection.eventLoopScheduler())
287289
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
288290
});
289291
}
@@ -356,6 +358,7 @@ void hookOnTerminal(SignalType signalType) {
356358
receivers.remove(streamId);
357359
}
358360
}))
361+
.subscribeOn(connection.eventLoopScheduler(), false)
359362
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
360363
});
361364
}
@@ -392,120 +395,125 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo
392395

393396
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
394397

395-
return receiver.transform(
396-
Operators.<Payload, Payload>lift(
397-
(s, actual) ->
398-
new RequestOperator(actual) {
398+
return receiver
399+
.transform(
400+
Operators.<Payload, Payload>lift(
401+
(s, actual) ->
402+
new RequestOperator(actual) {
399403

400-
final BaseSubscriber<Payload> upstreamSubscriber =
401-
new BaseSubscriber<Payload>() {
404+
final BaseSubscriber<Payload> upstreamSubscriber =
405+
new BaseSubscriber<Payload>() {
402406

403-
boolean first = true;
407+
boolean first = true;
404408

405-
@Override
406-
protected void hookOnSubscribe(Subscription subscription) {
407-
// noops
408-
}
409+
@Override
410+
protected void hookOnSubscribe(Subscription subscription) {
411+
// noops
412+
}
409413

410-
@Override
411-
protected void hookOnNext(Payload payload) {
412-
if (first) {
413-
// need to skip first since we have already sent it
414-
// no need to release it since it was released earlier on the request
415-
// establishment
416-
// phase
417-
first = false;
418-
request(1);
419-
return;
420-
}
421-
if (!PayloadValidationUtils.isValid(mtu, payload)) {
422-
payload.release();
423-
cancel();
424-
final IllegalArgumentException t =
425-
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
426-
errorConsumer.accept(t);
427-
// no need to send any errors.
428-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
429-
receiver.onError(t);
430-
return;
431-
}
432-
final ByteBuf frame =
433-
PayloadFrameFlyweight.encodeNextReleasingPayload(
434-
allocator, streamId, payload);
435-
436-
sendProcessor.onNext(frame);
437-
}
414+
@Override
415+
protected void hookOnNext(Payload payload) {
416+
if (first) {
417+
// need to skip first since we have already sent it
418+
// no need to release it since it was released earlier on the
419+
// request
420+
// establishment
421+
// phase
422+
first = false;
423+
request(1);
424+
return;
425+
}
426+
if (!PayloadValidationUtils.isValid(mtu, payload)) {
427+
payload.release();
428+
cancel();
429+
final IllegalArgumentException t =
430+
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
431+
errorConsumer.accept(t);
432+
// no need to send any errors.
433+
sendProcessor.onNext(
434+
CancelFrameFlyweight.encode(allocator, streamId));
435+
receiver.onError(t);
436+
return;
437+
}
438+
final ByteBuf frame =
439+
PayloadFrameFlyweight.encodeNextReleasingPayload(
440+
allocator, streamId, payload);
441+
442+
sendProcessor.onNext(frame);
443+
}
444+
445+
@Override
446+
protected void hookOnComplete() {
447+
ByteBuf frame =
448+
PayloadFrameFlyweight.encodeComplete(allocator, streamId);
449+
sendProcessor.onNext(frame);
450+
}
451+
452+
@Override
453+
protected void hookOnError(Throwable t) {
454+
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
455+
sendProcessor.onNext(frame);
456+
receiver.onError(t);
457+
}
438458

439-
@Override
440-
protected void hookOnComplete() {
441-
ByteBuf frame = PayloadFrameFlyweight.encodeComplete(allocator, streamId);
442-
sendProcessor.onNext(frame);
459+
@Override
460+
protected void hookFinally(SignalType type) {
461+
senders.remove(streamId, this);
462+
}
463+
};
464+
465+
@Override
466+
void hookOnFirstRequest(long n) {
467+
final int streamId = streamIdSupplier.nextStreamId(receivers);
468+
this.streamId = streamId;
469+
470+
final ByteBuf frame =
471+
RequestChannelFrameFlyweight.encodeReleasingPayload(
472+
allocator, streamId, false, n, initialPayload);
473+
474+
senders.put(streamId, upstreamSubscriber);
475+
receivers.put(streamId, receiver);
476+
477+
inboundFlux
478+
.limitRate(Queues.SMALL_BUFFER_SIZE)
479+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
480+
.subscribe(upstreamSubscriber);
481+
482+
sendProcessor.onNext(frame);
483+
}
484+
485+
@Override
486+
void hookOnRemainingRequests(long n) {
487+
if (receiver.isDisposed()) {
488+
return;
443489
}
444490

445-
@Override
446-
protected void hookOnError(Throwable t) {
447-
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
448-
sendProcessor.onNext(frame);
449-
receiver.onError(t);
491+
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
492+
}
493+
494+
@Override
495+
void hookOnCancel() {
496+
senders.remove(streamId, upstreamSubscriber);
497+
if (receivers.remove(streamId, receiver)) {
498+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
450499
}
500+
}
451501

452-
@Override
453-
protected void hookFinally(SignalType type) {
454-
senders.remove(streamId, this);
502+
@Override
503+
void hookOnTerminal(SignalType signalType) {
504+
if (signalType == SignalType.ON_ERROR) {
505+
upstreamSubscriber.cancel();
455506
}
456-
};
457-
458-
@Override
459-
void hookOnFirstRequest(long n) {
460-
final int streamId = streamIdSupplier.nextStreamId(receivers);
461-
this.streamId = streamId;
462-
463-
final ByteBuf frame =
464-
RequestChannelFrameFlyweight.encodeReleasingPayload(
465-
allocator, streamId, false, n, initialPayload);
466-
467-
senders.put(streamId, upstreamSubscriber);
468-
receivers.put(streamId, receiver);
469-
470-
inboundFlux
471-
.limitRate(Queues.SMALL_BUFFER_SIZE)
472-
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
473-
.subscribe(upstreamSubscriber);
474-
475-
sendProcessor.onNext(frame);
476-
}
477-
478-
@Override
479-
void hookOnRemainingRequests(long n) {
480-
if (receiver.isDisposed()) {
481-
return;
482-
}
483-
484-
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
485-
}
486-
487-
@Override
488-
void hookOnCancel() {
489-
senders.remove(streamId, upstreamSubscriber);
490-
if (receivers.remove(streamId, receiver)) {
491-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
492-
}
493-
}
494-
495-
@Override
496-
void hookOnTerminal(SignalType signalType) {
497-
if (signalType == SignalType.ON_ERROR) {
498-
upstreamSubscriber.cancel();
499-
}
500-
receivers.remove(streamId, receiver);
501-
}
502-
503-
@Override
504-
public void cancel() {
505-
upstreamSubscriber.cancel();
506-
super.cancel();
507-
}
508-
}));
507+
receivers.remove(streamId, receiver);
508+
}
509+
510+
@Override
511+
public void cancel() {
512+
upstreamSubscriber.cancel();
513+
super.cancel();
514+
}
515+
}))
516+
.subscribeOn(connection.eventLoopScheduler(), false);
509517
}
510518

511519
private Mono<Void> handleMetadataPush(Payload payload) {

rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.reactivestreams.Publisher;
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
27+
import reactor.core.scheduler.Scheduler;
2728

2829
/**
2930
* A {@link DuplexConnection} implementation that reassembles {@link ByteBuf}s.
@@ -75,6 +76,11 @@ public Flux<ByteBuf> receive() {
7576
});
7677
}
7778

79+
@Override
80+
public Scheduler eventLoopScheduler() {
81+
return delegate.eventLoopScheduler();
82+
}
83+
7884
@Override
7985
public ByteBufAllocator alloc() {
8086
return delegate.alloc();

rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import reactor.core.publisher.Flux;
3131
import reactor.core.publisher.Mono;
3232
import reactor.core.publisher.MonoProcessor;
33+
import reactor.core.scheduler.Scheduler;
3334

3435
/**
3536
* {@link DuplexConnection#receive()} is a single stream on which the following type of frames
@@ -202,6 +203,11 @@ public Flux<ByteBuf> receive() {
202203
}));
203204
}
204205

206+
@Override
207+
public Scheduler eventLoopScheduler() {
208+
return source.eventLoopScheduler();
209+
}
210+
205211
@Override
206212
public ByteBufAllocator alloc() {
207213
return source.alloc();

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import reactor.core.Disposable;
3434
import reactor.core.Disposables;
3535
import reactor.core.publisher.*;
36+
import reactor.core.scheduler.Scheduler;
3637
import reactor.util.concurrent.Queues;
3738

3839
public class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
@@ -106,6 +107,11 @@ public ResumableDuplexConnection(
106107
reconnect(duplexConnection);
107108
}
108109

110+
@Override
111+
public Scheduler eventLoopScheduler() {
112+
return curConnection.eventLoopScheduler();
113+
}
114+
109115
@Override
110116
public ByteBufAllocator alloc() {
111117
return curConnection.alloc();

0 commit comments

Comments
 (0)