Skip to content

Commit 31f1730

Browse files
mostroverkhovrobertroeser
authored andcommitted
RSocketRequester: fix concurrent modification of senders & receivers … (#706)
* RSocketRequester: fix concurrent modification of senders & receivers map on termination fix non-deterministic errors order on termination Signed-off-by: Maksym Ostroverkhov <[email protected]> * improve test Signed-off-by: Maksym Ostroverkhov <[email protected]> * address review Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 5a0c78d commit 31f1730

File tree

2 files changed

+108
-110
lines changed

2 files changed

+108
-110
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 98 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4141
import java.util.function.Consumer;
4242
import java.util.function.LongConsumer;
43+
import java.util.function.Supplier;
4344
import javax.annotation.Nonnull;
4445
import javax.annotation.Nullable;
4546
import org.reactivestreams.Processor;
@@ -56,6 +57,11 @@ class RSocketRequester implements RSocket {
5657
private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
5758
AtomicReferenceFieldUpdater.newUpdater(
5859
RSocketRequester.class, Throwable.class, "terminationError");
60+
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
61+
62+
static {
63+
CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
64+
}
5965

6066
private final DuplexConnection connection;
6167
private final PayloadDecoder payloadDecoder;
@@ -91,69 +97,25 @@ class RSocketRequester implements RSocket {
9197
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
9298
this.sendProcessor = new UnboundedProcessor<>();
9399

94-
connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer);
95100
connection
96-
.send(sendProcessor)
97-
.doFinally(this::handleSendProcessorCancel)
98-
.subscribe(null, this::handleSendProcessorError);
101+
.onClose()
102+
.doFinally(signalType -> tryTerminateOnConnectionClose())
103+
.subscribe(null, errorConsumer);
104+
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);
99105

100106
connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
101107

102108
if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
103109
KeepAliveSupport keepAliveSupport =
104110
new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
105111
this.keepAliveFramesAcceptor =
106-
keepAliveHandler.start(keepAliveSupport, sendProcessor::onNext, this::terminate);
112+
keepAliveHandler.start(
113+
keepAliveSupport, sendProcessor::onNext, this::tryTerminateOnKeepAlive);
107114
} else {
108115
keepAliveFramesAcceptor = null;
109116
}
110117
}
111118

112-
private void terminate(KeepAlive keepAlive) {
113-
String message =
114-
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis());
115-
ConnectionErrorException err = new ConnectionErrorException(message);
116-
setTerminationError(err);
117-
errorConsumer.accept(err);
118-
connection.dispose();
119-
}
120-
121-
private void handleSendProcessorError(Throwable t) {
122-
Throwable terminationError = this.terminationError;
123-
Throwable err = terminationError != null ? terminationError : t;
124-
receivers
125-
.values()
126-
.forEach(
127-
subscriber -> {
128-
try {
129-
subscriber.onError(err);
130-
} catch (Throwable e) {
131-
errorConsumer.accept(e);
132-
}
133-
});
134-
135-
senders.values().forEach(RateLimitableRequestPublisher::cancel);
136-
}
137-
138-
private void handleSendProcessorCancel(SignalType t) {
139-
if (SignalType.ON_ERROR == t) {
140-
return;
141-
}
142-
143-
receivers
144-
.values()
145-
.forEach(
146-
subscriber -> {
147-
try {
148-
subscriber.onError(new Throwable("closed connection"));
149-
} catch (Throwable e) {
150-
errorConsumer.accept(e);
151-
}
152-
});
153-
154-
senders.values().forEach(RateLimitableRequestPublisher::cancel);
155-
}
156-
157119
@Override
158120
public Mono<Void> fireAndForget(Payload payload) {
159121
return handleFireAndForget(payload);
@@ -263,8 +225,7 @@ public void acceptOnce(@Nonnull Subscription subscription) {
263225
if (s == SignalType.CANCEL) {
264226
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
265227
}
266-
267-
receivers.remove(streamId);
228+
removeStreamReceiver(streamId);
268229
});
269230
}
270231

@@ -318,7 +279,7 @@ public void accept(long n) {
318279
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
319280
}
320281
})
321-
.doFinally(s -> receivers.remove(streamId));
282+
.doFinally(s -> removeStreamReceiver(streamId));
322283
}
323284

324285
private Flux<Payload> handleChannel(Flux<Payload> request) {
@@ -419,14 +380,7 @@ protected void hookOnError(Throwable t) {
419380
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
420381
}
421382
})
422-
.doFinally(
423-
s -> {
424-
receivers.remove(streamId);
425-
RateLimitableRequestPublisher sender = senders.remove(streamId);
426-
if (sender != null) {
427-
sender.cancel();
428-
}
429-
});
383+
.doFinally(s -> removeStreamReceiverAndSender(streamId));
430384
}
431385

432386
private Mono<Void> handleMetadataPush(Payload payload) {
@@ -472,40 +426,6 @@ private boolean contains(int streamId) {
472426
return receivers.containsKey(streamId);
473427
}
474428

475-
private void terminate() {
476-
setTerminationError(new ClosedChannelException());
477-
leaseHandler.dispose();
478-
try {
479-
receivers.values().forEach(this::cleanUpSubscriber);
480-
senders.values().forEach(this::cleanUpLimitableRequestPublisher);
481-
} finally {
482-
senders.clear();
483-
receivers.clear();
484-
sendProcessor.dispose();
485-
}
486-
}
487-
488-
private void setTerminationError(Throwable error) {
489-
TERMINATION_ERROR.compareAndSet(this, null, error);
490-
}
491-
492-
private synchronized void cleanUpLimitableRequestPublisher(
493-
RateLimitableRequestPublisher<?> limitableRequestPublisher) {
494-
try {
495-
limitableRequestPublisher.cancel();
496-
} catch (Throwable t) {
497-
errorConsumer.accept(t);
498-
}
499-
}
500-
501-
private synchronized void cleanUpSubscriber(Processor subscriber) {
502-
try {
503-
subscriber.onError(terminationError);
504-
} catch (Throwable t) {
505-
errorConsumer.accept(t);
506-
}
507-
}
508-
509429
private void handleIncomingFrames(ByteBuf frame) {
510430
try {
511431
int streamId = FrameHeaderFlyweight.streamId(frame);
@@ -525,10 +445,7 @@ private void handleIncomingFrames(ByteBuf frame) {
525445
private void handleStreamZero(FrameType type, ByteBuf frame) {
526446
switch (type) {
527447
case ERROR:
528-
RuntimeException error = Exceptions.from(frame);
529-
setTerminationError(error);
530-
errorConsumer.accept(error);
531-
connection.dispose();
448+
tryTerminateOnZeroError(frame);
532449
break;
533450
case LEASE:
534451
leaseHandler.receive(frame);
@@ -614,4 +531,86 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu
614531
// receiving a frame after a given stream has been cancelled/completed,
615532
// so ignore (cancellation is async so there is a race condition)
616533
}
534+
535+
private void tryTerminateOnKeepAlive(KeepAlive keepAlive) {
536+
tryTerminate(
537+
() ->
538+
new ConnectionErrorException(
539+
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())));
540+
}
541+
542+
private void tryTerminateOnConnectionClose() {
543+
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
544+
}
545+
546+
private void tryTerminateOnZeroError(ByteBuf errorFrame) {
547+
tryTerminate(() -> Exceptions.from(errorFrame));
548+
}
549+
550+
private void tryTerminate(Supplier<Exception> errorSupplier) {
551+
if (terminationError == null) {
552+
Exception e = errorSupplier.get();
553+
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
554+
terminate(e);
555+
}
556+
}
557+
}
558+
559+
private void terminate(Exception e) {
560+
connection.dispose();
561+
leaseHandler.dispose();
562+
563+
synchronized (receivers) {
564+
receivers
565+
.values()
566+
.forEach(
567+
receiver -> {
568+
try {
569+
receiver.onError(e);
570+
} catch (Throwable t) {
571+
errorConsumer.accept(t);
572+
}
573+
});
574+
}
575+
synchronized (senders) {
576+
senders
577+
.values()
578+
.forEach(
579+
sender -> {
580+
try {
581+
sender.cancel();
582+
} catch (Throwable t) {
583+
errorConsumer.accept(t);
584+
}
585+
});
586+
}
587+
senders.clear();
588+
receivers.clear();
589+
sendProcessor.dispose();
590+
errorConsumer.accept(e);
591+
}
592+
593+
private void removeStreamReceiver(int streamId) {
594+
/*on termination receivers are explicitly cleared to avoid removing from map while iterating over one
595+
of its views*/
596+
if (terminationError == null) {
597+
receivers.remove(streamId);
598+
}
599+
}
600+
601+
private void removeStreamReceiverAndSender(int streamId) {
602+
/*on termination senders & receivers are explicitly cleared to avoid removing from map while iterating over one
603+
of its views*/
604+
if (terminationError == null) {
605+
receivers.remove(streamId);
606+
RateLimitableRequestPublisher<?> sender = senders.remove(streamId);
607+
if (sender != null) {
608+
sender.cancel();
609+
}
610+
}
611+
}
612+
613+
private void handleSendProcessorError(Throwable t) {
614+
connection.dispose();
615+
}
617616
}

rsocket-core/src/test/java/io/rsocket/SetupRejectionTest.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818
import java.time.Duration;
1919
import java.util.ArrayList;
2020
import java.util.List;
21-
import org.junit.Ignore;
2221
import org.junit.jupiter.api.Test;
2322
import reactor.core.publisher.Mono;
2423
import reactor.core.publisher.UnicastProcessor;
2524
import reactor.test.StepVerifier;
2625

27-
@Ignore
2826
public class SetupRejectionTest {
2927

3028
@Test
@@ -64,15 +62,16 @@ void requesterStreamsTerminatedOnZeroErrorFrame() {
6462

6563
String errorMsg = "error";
6664

67-
Mono.delay(Duration.ofMillis(100))
68-
.doOnTerminate(
69-
() ->
70-
conn.addToReceivedBuffer(
71-
ErrorFrameFlyweight.encode(
72-
ByteBufAllocator.DEFAULT, 0, new RejectedSetupException(errorMsg))))
73-
.subscribe();
74-
75-
StepVerifier.create(rSocket.requestResponse(DefaultPayload.create("test")))
65+
StepVerifier.create(
66+
rSocket
67+
.requestResponse(DefaultPayload.create("test"))
68+
.doOnRequest(
69+
ignored ->
70+
conn.addToReceivedBuffer(
71+
ErrorFrameFlyweight.encode(
72+
ByteBufAllocator.DEFAULT,
73+
0,
74+
new RejectedSetupException(errorMsg)))))
7675
.expectErrorMatches(
7776
err -> err instanceof RejectedSetupException && errorMsg.equals(err.getMessage()))
7877
.verify(Duration.ofSeconds(5));

0 commit comments

Comments
 (0)