Skip to content

Commit 268e1ae

Browse files
committed
more fixes
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent ea2e5f9 commit 268e1ae

File tree

12 files changed

+70
-2486
lines changed

12 files changed

+70
-2486
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import io.rsocket.frame.RequestResponseFrameFlyweight;
3838
import io.rsocket.frame.RequestStreamFrameFlyweight;
3939
import io.rsocket.frame.decoder.PayloadDecoder;
40-
import io.rsocket.internal.FluxSwitchOnFirst;
41-
import io.rsocket.internal.RateLimitableRequestSubscriber;
4240
import io.rsocket.internal.SynchronizedIntObjectHashMap;
4341
import io.rsocket.internal.UnboundedProcessor;
4442
import io.rsocket.internal.UnicastMonoEmpty;
@@ -60,6 +58,7 @@
6058
import org.reactivestreams.Publisher;
6159
import org.reactivestreams.Subscriber;
6260
import org.reactivestreams.Subscription;
61+
import reactor.core.publisher.BaseSubscriber;
6362
import reactor.core.publisher.Flux;
6463
import reactor.core.publisher.Mono;
6564
import reactor.core.publisher.SignalType;
@@ -83,7 +82,7 @@ class RSocketRequester implements RSocket {
8382
private final PayloadDecoder payloadDecoder;
8483
private final Consumer<Throwable> errorConsumer;
8584
private final StreamIdSupplier streamIdSupplier;
86-
private final IntObjectMap<RateLimitableRequestSubscriber> senders;
85+
private final IntObjectMap<Subscription> senders;
8786
private final IntObjectMap<Processor<Payload, Payload>> receivers;
8887
private final UnboundedProcessor<ByteBuf> sendProcessor;
8988
private final RequesterLeaseHandler leaseHandler;
@@ -262,7 +261,6 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
262261
receivers.put(streamId, receiver);
263262

264263
return receiver
265-
.log()
266264
.doOnRequest(
267265
new LongConsumer() {
268266

@@ -312,29 +310,31 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
312310
return Flux.error(err);
313311
}
314312

315-
return request.transform(
316-
f ->
317-
new FluxSwitchOnFirst<>(
318-
f,
319-
(s, flux) -> {
320-
Payload payload = s.get();
321-
if (payload != null) {
322-
return handleChannel(flux, payload);
323-
} else {
324-
return flux;
325-
}
326-
},
327-
false));
313+
return request.switchOnFirst(
314+
(s, flux) -> {
315+
Payload payload = s.get();
316+
if (payload != null) {
317+
return handleChannel(payload, flux.skip(1));
318+
} else {
319+
return flux;
320+
}
321+
},
322+
false);
328323
}
329324

330-
private Flux<? extends Payload> handleChannel(Flux<Payload> inboundFlux, Payload initialPayload) {
325+
private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Payload> inboundFlux) {
331326
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
332327
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
333328
final int streamId = streamIdSupplier.nextStreamId(receivers);
334329

335330
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
336-
final RateLimitableRequestSubscriber<Payload> upstreamSubscriber =
337-
new RateLimitableRequestSubscriber<Payload>(Queues.SMALL_BUFFER_SIZE) {
331+
final BaseSubscriber<Payload> upstreamSubscriber =
332+
new BaseSubscriber<Payload>() {
333+
334+
@Override
335+
protected void hookOnSubscribe(Subscription subscription) {
336+
// noops
337+
}
338338

339339
@Override
340340
protected void hookOnNext(Payload payload) {
@@ -355,7 +355,7 @@ protected void hookOnComplete() {
355355
protected void hookOnError(Throwable t) {
356356
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
357357
sendProcessor.onNext(frame);
358-
receiver.dispose();
358+
receiver.onError(t);
359359
}
360360

361361
@Override
@@ -377,21 +377,20 @@ public void accept(long n) {
377377
senders.put(streamId, upstreamSubscriber);
378378
receivers.put(streamId, receiver);
379379

380-
inboundFlux.subscribe(upstreamSubscriber);
381-
382-
ByteBuf frame =
383-
RequestChannelFrameFlyweight.encode(
384-
allocator,
385-
streamId,
386-
false,
387-
false,
388-
n,
389-
initialPayload.sliceMetadata().retain(),
390-
initialPayload.sliceData().retain());
391-
392-
sendProcessor.onNext(frame);
393-
380+
inboundFlux.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(upstreamSubscriber);
394381
if (!payloadReleasedFlag.getAndSet(true)) {
382+
ByteBuf frame =
383+
RequestChannelFrameFlyweight.encode(
384+
allocator,
385+
streamId,
386+
false,
387+
false,
388+
n,
389+
initialPayload.sliceMetadata().retain(),
390+
initialPayload.sliceData().retain());
391+
392+
sendProcessor.onNext(frame);
393+
395394
initialPayload.release();
396395
}
397396
} else {
@@ -405,16 +404,15 @@ public void accept(long n) {
405404
upstreamSubscriber.cancel();
406405
}
407406
})
407+
.doOnComplete(() -> receivers.remove(streamId, receiver))
408408
.doOnCancel(
409409
() -> {
410410
if (!payloadReleasedFlag.getAndSet(true)) {
411411
initialPayload.release();
412412
}
413-
if (contains(streamId)) {
413+
if (receivers.remove(streamId, receiver)) {
414414
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
415-
if (receivers.remove(streamId, receiver)) {
416-
upstreamSubscriber.cancel();
417-
}
415+
upstreamSubscriber.cancel();
418416
}
419417
});
420418
}

rsocket-core/src/main/java/io/rsocket/RSocketResponder.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.rsocket.exceptions.ApplicationErrorException;
2424
import io.rsocket.frame.*;
2525
import io.rsocket.frame.decoder.PayloadDecoder;
26-
import io.rsocket.internal.RateLimitableRequestSubscriber;
2726
import io.rsocket.internal.SynchronizedIntObjectHashMap;
2827
import io.rsocket.internal.UnboundedProcessor;
2928
import io.rsocket.lease.ResponderLeaseHandler;
@@ -358,16 +357,10 @@ protected void hookFinally(SignalType type) {
358357
}
359358

360359
private void handleRequestResponse(int streamId, Mono<Payload> response) {
361-
response.subscribe(
360+
final BaseSubscriber<Payload> subscriber =
362361
new BaseSubscriber<Payload>() {
363362
private boolean isEmpty = true;
364363

365-
@Override
366-
protected void hookOnSubscribe(Subscription subscription) {
367-
sendingSubscriptions.put(streamId, subscription);
368-
subscription.request(Long.MAX_VALUE);
369-
}
370-
371364
@Override
372365
protected void hookOnNext(Payload payload) {
373366
if (isEmpty) {
@@ -401,18 +394,20 @@ protected void hookOnComplete() {
401394

402395
@Override
403396
protected void hookFinally(SignalType type) {
404-
sendingSubscriptions.remove(streamId);
397+
sendingSubscriptions.remove(streamId, this);
405398
}
406-
});
399+
};
400+
401+
sendingSubscriptions.put(streamId, subscriber);
402+
response.subscribe(subscriber);
407403
}
408404

409405
private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
410-
response.subscribe(
411-
new RateLimitableRequestSubscriber<Payload>(Queues.SMALL_BUFFER_SIZE) {
406+
final BaseSubscriber<Payload> subscriber =
407+
new BaseSubscriber<Payload>() {
412408

413409
@Override
414410
protected void hookOnSubscribe(Subscription s) {
415-
sendingSubscriptions.put(streamId, s);
416411
s.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
417412
}
418413

@@ -445,7 +440,10 @@ protected void hookOnError(Throwable throwable) {
445440
protected void hookFinally(SignalType type) {
446441
sendingSubscriptions.remove(streamId);
447442
}
448-
});
443+
};
444+
445+
sendingSubscriptions.put(streamId, subscriber);
446+
response.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(subscriber);
449447
}
450448

451449
private void handleChannel(int streamId, Payload payload, int initialRequestN) {
@@ -466,7 +464,7 @@ public void accept(long l) {
466464
long n;
467465
if (first) {
468466
first = false;
469-
n = l - 1;
467+
n = l - 1L;
470468
} else {
471469
n = l;
472470
}

rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ public static String toString(ByteBuf frame) {
2222
.append(Integer.toBinaryString(FrameHeaderFlyweight.flags(frame)))
2323
.append(" Length: " + frame.readableBytes());
2424

25+
if (frameType.hasInitialRequestN()) {
26+
payload
27+
.append(" InitialRequestN: ")
28+
.append(RequestStreamFrameFlyweight.initialRequestN(frame));
29+
}
30+
31+
if (frameType == FrameType.REQUEST_N) {
32+
payload.append(" RequestN: ").append(RequestNFrameFlyweight.requestN(frame));
33+
}
34+
2535
if (FrameHeaderFlyweight.hasMetadata(frame)) {
2636
payload.append("\nMetadata:\n");
2737

0 commit comments

Comments
 (0)