Skip to content

Commit 65ac940

Browse files
committed
more fixes
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 11fafa7 commit 65ac940

File tree

12 files changed

+70
-2486
lines changed

12 files changed

+70
-2486
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
import io.rsocket.frame.RequestResponseFrameFlyweight;
4141
import io.rsocket.frame.RequestStreamFrameFlyweight;
4242
import io.rsocket.frame.decoder.PayloadDecoder;
43-
import io.rsocket.internal.FluxSwitchOnFirst;
44-
import io.rsocket.internal.RateLimitableRequestSubscriber;
4543
import io.rsocket.internal.SynchronizedIntObjectHashMap;
4644
import io.rsocket.internal.UnboundedProcessor;
4745
import io.rsocket.internal.UnicastMonoEmpty;
@@ -63,6 +61,7 @@
6361
import org.reactivestreams.Publisher;
6462
import org.reactivestreams.Subscriber;
6563
import org.reactivestreams.Subscription;
64+
import reactor.core.publisher.BaseSubscriber;
6665
import reactor.core.publisher.Flux;
6766
import reactor.core.publisher.Mono;
6867
import reactor.core.publisher.SignalType;
@@ -86,7 +85,7 @@ class RSocketRequester implements RSocket {
8685
private final PayloadDecoder payloadDecoder;
8786
private final Consumer<Throwable> errorConsumer;
8887
private final StreamIdSupplier streamIdSupplier;
89-
private final IntObjectMap<RateLimitableRequestSubscriber> senders;
88+
private final IntObjectMap<Subscription> senders;
9089
private final IntObjectMap<Processor<Payload, Payload>> receivers;
9190
private final UnboundedProcessor<ByteBuf> sendProcessor;
9291
private final RequesterLeaseHandler leaseHandler;
@@ -265,7 +264,6 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
265264
receivers.put(streamId, receiver);
266265

267266
return receiver
268-
.log()
269267
.doOnRequest(
270268
new LongConsumer() {
271269

@@ -315,29 +313,31 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
315313
return Flux.error(err);
316314
}
317315

318-
return request.transform(
319-
f ->
320-
new FluxSwitchOnFirst<>(
321-
f,
322-
(s, flux) -> {
323-
Payload payload = s.get();
324-
if (payload != null) {
325-
return handleChannel(flux, payload);
326-
} else {
327-
return flux;
328-
}
329-
},
330-
false));
316+
return request.switchOnFirst(
317+
(s, flux) -> {
318+
Payload payload = s.get();
319+
if (payload != null) {
320+
return handleChannel(payload, flux.skip(1));
321+
} else {
322+
return flux;
323+
}
324+
},
325+
false);
331326
}
332327

333-
private Flux<? extends Payload> handleChannel(Flux<Payload> inboundFlux, Payload initialPayload) {
328+
private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Payload> inboundFlux) {
334329
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
335330
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
336331
final int streamId = streamIdSupplier.nextStreamId(receivers);
337332

338333
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
339-
final RateLimitableRequestSubscriber<Payload> upstreamSubscriber =
340-
new RateLimitableRequestSubscriber<Payload>(Queues.SMALL_BUFFER_SIZE) {
334+
final BaseSubscriber<Payload> upstreamSubscriber =
335+
new BaseSubscriber<Payload>() {
336+
337+
@Override
338+
protected void hookOnSubscribe(Subscription subscription) {
339+
// noops
340+
}
341341

342342
@Override
343343
protected void hookOnNext(Payload payload) {
@@ -358,7 +358,7 @@ protected void hookOnComplete() {
358358
protected void hookOnError(Throwable t) {
359359
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
360360
sendProcessor.onNext(frame);
361-
receiver.dispose();
361+
receiver.onError(t);
362362
}
363363

364364
@Override
@@ -380,21 +380,20 @@ public void accept(long n) {
380380
senders.put(streamId, upstreamSubscriber);
381381
receivers.put(streamId, receiver);
382382

383-
inboundFlux.subscribe(upstreamSubscriber);
384-
385-
ByteBuf frame =
386-
RequestChannelFrameFlyweight.encode(
387-
allocator,
388-
streamId,
389-
false,
390-
false,
391-
n,
392-
initialPayload.sliceMetadata().retain(),
393-
initialPayload.sliceData().retain());
394-
395-
sendProcessor.onNext(frame);
396-
383+
inboundFlux.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(upstreamSubscriber);
397384
if (!payloadReleasedFlag.getAndSet(true)) {
385+
ByteBuf frame =
386+
RequestChannelFrameFlyweight.encode(
387+
allocator,
388+
streamId,
389+
false,
390+
false,
391+
n,
392+
initialPayload.sliceMetadata().retain(),
393+
initialPayload.sliceData().retain());
394+
395+
sendProcessor.onNext(frame);
396+
398397
initialPayload.release();
399398
}
400399
} else {
@@ -408,16 +407,15 @@ public void accept(long n) {
408407
upstreamSubscriber.cancel();
409408
}
410409
})
410+
.doOnComplete(() -> receivers.remove(streamId, receiver))
411411
.doOnCancel(
412412
() -> {
413413
if (!payloadReleasedFlag.getAndSet(true)) {
414414
initialPayload.release();
415415
}
416-
if (contains(streamId)) {
416+
if (receivers.remove(streamId, receiver)) {
417417
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
418-
if (receivers.remove(streamId, receiver)) {
419-
upstreamSubscriber.cancel();
420-
}
418+
upstreamSubscriber.cancel();
421419
}
422420
});
423421
}

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import io.rsocket.exceptions.ApplicationErrorException;
2828
import io.rsocket.frame.*;
2929
import io.rsocket.frame.decoder.PayloadDecoder;
30-
import io.rsocket.internal.RateLimitableRequestSubscriber;
3130
import io.rsocket.internal.SynchronizedIntObjectHashMap;
3231
import io.rsocket.internal.UnboundedProcessor;
3332
import io.rsocket.lease.ResponderLeaseHandler;
@@ -362,16 +361,10 @@ protected void hookFinally(SignalType type) {
362361
}
363362

364363
private void handleRequestResponse(int streamId, Mono<Payload> response) {
365-
response.subscribe(
364+
final BaseSubscriber<Payload> subscriber =
366365
new BaseSubscriber<Payload>() {
367366
private boolean isEmpty = true;
368367

369-
@Override
370-
protected void hookOnSubscribe(Subscription subscription) {
371-
sendingSubscriptions.put(streamId, subscription);
372-
subscription.request(Long.MAX_VALUE);
373-
}
374-
375368
@Override
376369
protected void hookOnNext(Payload payload) {
377370
if (isEmpty) {
@@ -405,18 +398,20 @@ protected void hookOnComplete() {
405398

406399
@Override
407400
protected void hookFinally(SignalType type) {
408-
sendingSubscriptions.remove(streamId);
401+
sendingSubscriptions.remove(streamId, this);
409402
}
410-
});
403+
};
404+
405+
sendingSubscriptions.put(streamId, subscriber);
406+
response.subscribe(subscriber);
411407
}
412408

413409
private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
414-
response.subscribe(
415-
new RateLimitableRequestSubscriber<Payload>(Queues.SMALL_BUFFER_SIZE) {
410+
final BaseSubscriber<Payload> subscriber =
411+
new BaseSubscriber<Payload>() {
416412

417413
@Override
418414
protected void hookOnSubscribe(Subscription s) {
419-
sendingSubscriptions.put(streamId, s);
420415
s.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
421416
}
422417

@@ -449,7 +444,10 @@ protected void hookOnError(Throwable throwable) {
449444
protected void hookFinally(SignalType type) {
450445
sendingSubscriptions.remove(streamId);
451446
}
452-
});
447+
};
448+
449+
sendingSubscriptions.put(streamId, subscriber);
450+
response.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(subscriber);
453451
}
454452

455453
private void handleChannel(int streamId, Payload payload, int initialRequestN) {
@@ -470,7 +468,7 @@ public void accept(long l) {
470468
long n;
471469
if (first) {
472470
first = false;
473-
n = l - 1;
471+
n = l - 1L;
474472
} else {
475473
n = l;
476474
}

rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ public static String toString(ByteBuf frame) {
2222
.append(Integer.toBinaryString(FrameHeaderFlyweight.flags(frame)))
2323
.append(" Length: " + frame.readableBytes());
2424

25+
if (frameType.hasInitialRequestN()) {
26+
payload
27+
.append(" InitialRequestN: ")
28+
.append(RequestStreamFrameFlyweight.initialRequestN(frame));
29+
}
30+
31+
if (frameType == FrameType.REQUEST_N) {
32+
payload.append(" RequestN: ").append(RequestNFrameFlyweight.requestN(frame));
33+
}
34+
2535
if (FrameHeaderFlyweight.hasMetadata(frame)) {
2636
payload.append("\nMetadata:\n");
2737

0 commit comments

Comments
 (0)