Skip to content

Commit 6c4c075

Browse files
committed
adds more tests and fixes requestChannel interaction to follow the pattern
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent ceb35e4 commit 6c4c075

File tree

2 files changed

+263
-99
lines changed

2 files changed

+263
-99
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 109 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -393,118 +393,129 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
393393

394394
private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Payload> inboundFlux) {
395395
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
396-
final int streamId = streamIdSupplier.nextStreamId(receivers);
397396

398397
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
399-
final BaseSubscriber<Payload> upstreamSubscriber =
400-
new BaseSubscriber<Payload>() {
401398

402-
boolean first = true;
399+
return receiver.transform(
400+
Operators.<Payload, Payload>lift(
401+
(s, actual) ->
402+
new RequestOperator(actual) {
403+
int streamId;
403404

404-
@Override
405-
protected void hookOnSubscribe(Subscription subscription) {
406-
// noops
407-
}
408-
409-
@Override
410-
protected void hookOnNext(Payload payload) {
411-
if (first) {
412-
// need to skip first since we have already sent it
413-
// no need to release it since it was released earlier on the request establishment
414-
// phase
415-
first = false;
416-
request(1);
417-
return;
418-
}
419-
if (!PayloadValidationUtils.isValid(mtu, payload)) {
420-
payload.release();
421-
cancel();
422-
final IllegalArgumentException t =
423-
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
424-
errorConsumer.accept(t);
425-
// no need to send any errors.
426-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
427-
receiver.onError(t);
428-
return;
429-
}
430-
final ByteBuf frame =
431-
PayloadFrameFlyweight.encodeNextReleasingPayload(allocator, streamId, payload);
432-
433-
sendProcessor.onNext(frame);
434-
}
405+
final BaseSubscriber<Payload> upstreamSubscriber =
406+
new BaseSubscriber<Payload>() {
435407

436-
@Override
437-
protected void hookOnComplete() {
438-
ByteBuf frame = PayloadFrameFlyweight.encodeComplete(allocator, streamId);
439-
sendProcessor.onNext(frame);
440-
}
441-
442-
@Override
443-
protected void hookOnError(Throwable t) {
444-
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
445-
sendProcessor.onNext(frame);
446-
receiver.onError(t);
447-
}
408+
boolean first = true;
448409

449-
@Override
450-
protected void hookFinally(SignalType type) {
451-
senders.remove(streamId, this);
452-
}
453-
};
454-
455-
return receiver
456-
.transform(
457-
Operators.<Payload, Payload>lift(
458-
(s, actual) ->
459-
new RequestOperator(actual) {
460-
@Override
461-
void hookOnFirstRequest(long n) {
462-
ByteBuf frame;
463-
try {
464-
frame =
465-
RequestChannelFrameFlyweight.encodeReleasingPayload(
466-
allocator, streamId, false, n, initialPayload);
467-
} catch (IllegalReferenceCountException | NullPointerException e) {
468-
return;
410+
@Override
411+
protected void hookOnSubscribe(Subscription subscription) {
412+
// noops
469413
}
470414

471-
senders.put(streamId, upstreamSubscriber);
472-
receivers.put(streamId, receiver);
473-
474-
inboundFlux
475-
.limitRate(Queues.SMALL_BUFFER_SIZE)
476-
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
477-
.subscribe(upstreamSubscriber);
478-
479-
sendProcessor.onNext(frame);
480-
}
481-
482-
@Override
483-
void hookOnRestRequests(long n) {
484-
if (receiver.isDisposed()) {
485-
return;
415+
@Override
416+
protected void hookOnNext(Payload payload) {
417+
if (first) {
418+
// need to skip first since we have already sent it
419+
// no need to release it since it was released earlier on the request
420+
// establishment
421+
// phase
422+
first = false;
423+
request(1);
424+
return;
425+
}
426+
if (!PayloadValidationUtils.isValid(mtu, payload)) {
427+
payload.release();
428+
cancel();
429+
final IllegalArgumentException t =
430+
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
431+
errorConsumer.accept(t);
432+
// no need to send any errors.
433+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
434+
receiver.onError(t);
435+
return;
436+
}
437+
final ByteBuf frame =
438+
PayloadFrameFlyweight.encodeNextReleasingPayload(
439+
allocator, streamId, payload);
440+
441+
sendProcessor.onNext(frame);
486442
}
487443

488-
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
489-
}
444+
@Override
445+
protected void hookOnComplete() {
446+
ByteBuf frame = PayloadFrameFlyweight.encodeComplete(allocator, streamId);
447+
sendProcessor.onNext(frame);
448+
}
490449

491-
@Override
492-
void hookOnCancel() {
493-
senders.remove(streamId, upstreamSubscriber);
494-
if (receivers.remove(streamId, receiver)) {
495-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
450+
@Override
451+
protected void hookOnError(Throwable t) {
452+
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
453+
sendProcessor.onNext(frame);
454+
receiver.onError(t);
496455
}
497-
}
498456

499-
@Override
500-
void hookOnTerminal(SignalType signalType) {
501-
if (signalType == SignalType.ON_ERROR) {
502-
upstreamSubscriber.cancel();
457+
@Override
458+
protected void hookFinally(SignalType type) {
459+
senders.remove(streamId, this);
503460
}
504-
receivers.remove(streamId, receiver);
505-
}
506-
}))
507-
.doOnCancel(upstreamSubscriber::cancel);
461+
};
462+
463+
@Override
464+
void hookOnFirstRequest(long n) {
465+
final int streamId = streamIdSupplier.nextStreamId(receivers);
466+
this.streamId = streamId;
467+
468+
final ByteBuf frame;
469+
try {
470+
frame =
471+
RequestChannelFrameFlyweight.encodeReleasingPayload(
472+
allocator, streamId, false, n, initialPayload);
473+
} catch (IllegalReferenceCountException | NullPointerException e) {
474+
return;
475+
}
476+
477+
senders.put(streamId, upstreamSubscriber);
478+
receivers.put(streamId, receiver);
479+
480+
inboundFlux
481+
.limitRate(Queues.SMALL_BUFFER_SIZE)
482+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
483+
.subscribe(upstreamSubscriber);
484+
485+
sendProcessor.onNext(frame);
486+
}
487+
488+
@Override
489+
void hookOnRestRequests(long n) {
490+
if (receiver.isDisposed()) {
491+
return;
492+
}
493+
494+
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
495+
}
496+
497+
@Override
498+
void hookOnCancel() {
499+
senders.remove(streamId, upstreamSubscriber);
500+
if (receivers.remove(streamId, receiver)) {
501+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
502+
}
503+
}
504+
505+
@Override
506+
void hookOnTerminal(SignalType signalType) {
507+
if (signalType == SignalType.ON_ERROR) {
508+
upstreamSubscriber.cancel();
509+
}
510+
receivers.remove(streamId, receiver);
511+
}
512+
513+
@Override
514+
public void cancel() {
515+
upstreamSubscriber.cancel();
516+
super.cancel();
517+
}
518+
}));
508519
}
509520

510521
private Mono<Void> handleMetadataPush(Payload payload) {

0 commit comments

Comments
 (0)