Skip to content

Commit 9b1b925

Browse files
committed
reduces maintenance complexity
this PR 1. removes set of unused classes 2. eliminate usage of custom publishers implementation if favor of better maintainability but sacrificing performance (1.1 will revise that) Signed-off-by: Oleh Dokuka <[email protected]>
1 parent d3dc85f commit 9b1b925

21 files changed

+122
-4688
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 122 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,18 @@
4545
import io.rsocket.frame.decoder.PayloadDecoder;
4646
import io.rsocket.internal.SynchronizedIntObjectHashMap;
4747
import io.rsocket.internal.UnboundedProcessor;
48-
import io.rsocket.internal.UnicastMonoEmpty;
49-
import io.rsocket.internal.UnicastMonoProcessor;
5048
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
5149
import io.rsocket.keepalive.KeepAliveHandler;
5250
import io.rsocket.keepalive.KeepAliveSupport;
5351
import io.rsocket.lease.RequesterLeaseHandler;
54-
import io.rsocket.util.MonoLifecycleHandler;
5552
import java.nio.channels.ClosedChannelException;
5653
import java.util.concurrent.CancellationException;
54+
import java.util.concurrent.atomic.AtomicBoolean;
5755
import java.util.concurrent.atomic.AtomicInteger;
5856
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5957
import java.util.function.Consumer;
6058
import java.util.function.LongConsumer;
6159
import java.util.function.Supplier;
62-
import javax.annotation.Nonnull;
6360
import javax.annotation.Nullable;
6461
import org.reactivestreams.Processor;
6562
import org.reactivestreams.Publisher;
@@ -210,15 +207,25 @@ private Mono<Void> handleFireAndForget(Payload payload) {
210207
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
211208
}
212209

210+
final AtomicBoolean once = new AtomicBoolean();
213211
final int streamId = streamIdSupplier.nextStreamId(receivers);
214212

215-
return UnicastMonoEmpty.newInstance(
213+
return Mono.defer(
216214
() -> {
217-
ByteBuf requestFrame =
218-
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
219-
allocator, streamId, payload);
215+
if (once.getAndSet(true)) {
216+
return Mono.error(
217+
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
218+
}
219+
220+
return Mono.<Void>empty()
221+
.doOnSubscribe(
222+
(__) -> {
223+
ByteBuf requestFrame =
224+
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
225+
allocator, streamId, payload);
220226

221-
sendProcessor.onNext(requestFrame);
227+
sendProcessor.onNext(requestFrame);
228+
});
222229
});
223230
}
224231

@@ -236,34 +243,37 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
236243

237244
int streamId = streamIdSupplier.nextStreamId(receivers);
238245
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
239-
240-
UnicastMonoProcessor<Payload> receiver =
241-
UnicastMonoProcessor.create(
242-
new MonoLifecycleHandler<Payload>() {
243-
@Override
244-
public void doOnSubscribe() {
245-
final ByteBuf requestFrame =
246-
RequestResponseFrameFlyweight.encodeReleasingPayload(
247-
allocator, streamId, payload);
248-
249-
sendProcessor.onNext(requestFrame);
250-
}
251-
252-
@Override
253-
public void doOnTerminal(
254-
@Nonnull SignalType signalType,
255-
@Nullable Payload element,
256-
@Nullable Throwable e) {
257-
if (signalType == SignalType.CANCEL) {
258-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
259-
}
260-
removeStreamReceiver(streamId);
261-
}
262-
});
246+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());
247+
final AtomicBoolean once = new AtomicBoolean();
263248

264249
receivers.put(streamId, receiver);
265250

266-
return receiver.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
251+
return Mono.defer(
252+
() -> {
253+
if (once.getAndSet(true)) {
254+
return Mono.error(
255+
new IllegalStateException("RequestResponseMono allows only a single subscriber"));
256+
}
257+
258+
return receiver
259+
.doOnSubscribe(
260+
(__) -> {
261+
ByteBuf requestFrame =
262+
RequestResponseFrameFlyweight.encodeReleasingPayload(
263+
allocator, streamId, payload);
264+
265+
sendProcessor.onNext(requestFrame);
266+
})
267+
.doFinally(
268+
signalType -> {
269+
if (signalType == SignalType.CANCEL) {
270+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
271+
}
272+
removeStreamReceiver(streamId);
273+
})
274+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
275+
.as(Mono::fromDirect);
276+
});
267277
}
268278

269279
private Flux<Payload> handleRequestStream(final Payload payload) {
@@ -283,65 +293,76 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
283293
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
284294
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
285295
final AtomicInteger wip = new AtomicInteger(0);
296+
final AtomicBoolean once = new AtomicBoolean();
286297

287298
receivers.put(streamId, receiver);
288299

289-
return receiver
290-
.doOnRequest(
291-
new LongConsumer() {
292-
293-
boolean firstRequest = true;
300+
return Flux.defer(
301+
() -> {
302+
if (once.getAndSet(true)) {
303+
return Flux.error(
304+
new IllegalStateException("RequestStreamFlux allows only a single subscriber"));
305+
}
294306

295-
@Override
296-
public void accept(long n) {
297-
if (firstRequest) {
298-
firstRequest = false;
299-
if (wip.getAndIncrement() != 0) {
300-
// no need to do anything.
301-
// stream was canceled and fist payload has already been discarded
302-
return;
303-
}
304-
int missed = 1;
305-
boolean firstHasBeenSent = false;
306-
for (; ; ) {
307-
if (!firstHasBeenSent) {
308-
sendProcessor.onNext(
309-
RequestStreamFrameFlyweight.encodeReleasingPayload(
310-
allocator, streamId, n, payload));
311-
firstHasBeenSent = true;
312-
} else {
313-
// if first frame was sent but we cycling again, it means that wip was
314-
// incremented at doOnCancel
315-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
316-
return;
307+
return receiver
308+
.doOnRequest(
309+
new LongConsumer() {
310+
311+
boolean firstRequest = true;
312+
313+
@Override
314+
public void accept(long n) {
315+
if (firstRequest) {
316+
firstRequest = false;
317+
if (wip.getAndIncrement() != 0) {
318+
// no need to do anything.
319+
// stream was canceled and fist payload has already been discarded
320+
return;
321+
}
322+
int missed = 1;
323+
boolean firstHasBeenSent = false;
324+
for (; ; ) {
325+
if (!firstHasBeenSent) {
326+
sendProcessor.onNext(
327+
RequestStreamFrameFlyweight.encodeReleasingPayload(
328+
allocator, streamId, n, payload));
329+
firstHasBeenSent = true;
330+
} else {
331+
// if first frame was sent but we cycling again, it means that wip was
332+
// incremented at doOnCancel
333+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
334+
return;
335+
}
336+
337+
missed = wip.addAndGet(-missed);
338+
if (missed == 0) {
339+
return;
340+
}
341+
}
342+
} else if (!receiver.isDisposed()) {
343+
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
344+
}
317345
}
346+
})
347+
.doFinally(
348+
s -> {
349+
if (s == SignalType.CANCEL) {
350+
if (wip.getAndIncrement() != 0) {
351+
return;
352+
}
318353

319-
missed = wip.addAndGet(-missed);
320-
if (missed == 0) {
321-
return;
354+
// check if we need to release payload
355+
// only applicable if the cancel appears earlier than actual request
356+
if (payload.refCnt() > 0) {
357+
payload.release();
358+
} else {
359+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
360+
}
322361
}
323-
}
324-
} else if (!receiver.isDisposed()) {
325-
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
326-
}
327-
}
328-
})
329-
.doOnCancel(
330-
() -> {
331-
if (wip.getAndIncrement() != 0) {
332-
return;
333-
}
334-
335-
// check if we need to release payload
336-
// only applicable if the cancel appears earlier than actual request
337-
if (payload.refCnt() > 0) {
338-
payload.release();
339-
} else {
340-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
341-
}
342-
})
343-
.doFinally(s -> removeStreamReceiver(streamId))
344-
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
362+
removeStreamReceiver(streamId);
363+
})
364+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
365+
});
345366
}
346367

347368
private Flux<Payload> handleChannel(Flux<Payload> request) {
@@ -522,12 +543,23 @@ private Mono<Void> handleMetadataPush(Payload payload) {
522543
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
523544
}
524545

525-
return UnicastMonoEmpty.newInstance(
546+
final AtomicBoolean once = new AtomicBoolean();
547+
548+
return Mono.defer(
526549
() -> {
527-
ByteBuf metadataPushFrame =
528-
MetadataPushFrameFlyweight.encodeReleasingPayload(allocator, payload);
550+
if (once.getAndSet(true)) {
551+
return Mono.error(
552+
new IllegalStateException("MetadataPushMono allows only a single subscriber"));
553+
}
554+
555+
return Mono.<Void>empty()
556+
.doOnSubscribe(
557+
(__) -> {
558+
ByteBuf metadataPushFrame =
559+
MetadataPushFrameFlyweight.encodeReleasingPayload(allocator, payload);
529560

530-
sendProcessor.onNextPrioritized(metadataPushFrame);
561+
sendProcessor.onNextPrioritized(metadataPushFrame);
562+
});
531563
});
532564
}
533565

@@ -544,10 +576,6 @@ private Throwable checkAvailable() {
544576
return null;
545577
}
546578

547-
private boolean contains(int streamId) {
548-
return receivers.containsKey(streamId);
549-
}
550-
551579
private void handleIncomingFrames(ByteBuf frame) {
552580
try {
553581
int streamId = FrameHeaderFlyweight.streamId(frame);

0 commit comments

Comments
 (0)