Skip to content

Commit 83d7e28

Browse files
committed
Additional cases of map iteration and removal
A follow-up fix that extends the changes in 1ca3b99 See gh-914 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 1ca3b99 commit 83d7e28

File tree

2 files changed

+45
-49
lines changed

2 files changed

+45
-49
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -771,30 +771,28 @@ private void terminate(Throwable e) {
771771
connection.dispose();
772772
leaseHandler.dispose();
773773

774-
receivers
775-
.values()
776-
.forEach(
777-
receiver -> {
778-
try {
779-
receiver.onError(e);
780-
} catch (Throwable t) {
781-
if (LOGGER.isDebugEnabled()) {
782-
LOGGER.debug("Dropped exception", t);
783-
}
784-
}
785-
});
786-
senders
787-
.values()
788-
.forEach(
789-
sender -> {
790-
try {
791-
sender.cancel();
792-
} catch (Throwable t) {
793-
if (LOGGER.isDebugEnabled()) {
794-
LOGGER.debug("Dropped exception", t);
795-
}
796-
}
797-
});
774+
// Iterate explicitly to handle collisions with concurrent removals
775+
for (IntObjectMap.PrimitiveEntry<Processor<Payload, Payload>> entry : receivers.entries()) {
776+
try {
777+
entry.value().onError(e);
778+
} catch (Throwable ex) {
779+
if (LOGGER.isDebugEnabled()) {
780+
LOGGER.debug("Dropped exception", ex);
781+
}
782+
}
783+
}
784+
785+
// Iterate explicitly to handle collisions with concurrent removals
786+
for (IntObjectMap.PrimitiveEntry<Subscription> entry : senders.entries()) {
787+
try {
788+
entry.value().cancel();
789+
} catch (Throwable ex) {
790+
if (LOGGER.isDebugEnabled()) {
791+
LOGGER.debug("Dropped exception", ex);
792+
}
793+
}
794+
}
795+
798796
senders.clear();
799797
receivers.clear();
800798
sendProcessor.dispose();

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,14 @@
2727
import io.rsocket.DuplexConnection;
2828
import io.rsocket.Payload;
2929
import io.rsocket.RSocket;
30-
import io.rsocket.frame.*;
30+
import io.rsocket.frame.CancelFrameCodec;
31+
import io.rsocket.frame.ErrorFrameCodec;
32+
import io.rsocket.frame.FrameHeaderCodec;
33+
import io.rsocket.frame.FrameType;
34+
import io.rsocket.frame.PayloadFrameCodec;
35+
import io.rsocket.frame.RequestChannelFrameCodec;
36+
import io.rsocket.frame.RequestNFrameCodec;
37+
import io.rsocket.frame.RequestStreamFrameCodec;
3138
import io.rsocket.frame.decoder.PayloadDecoder;
3239
import io.rsocket.internal.SynchronizedIntObjectHashMap;
3340
import io.rsocket.internal.UnboundedProcessor;
@@ -46,7 +53,11 @@
4653
import org.slf4j.LoggerFactory;
4754
import reactor.core.Disposable;
4855
import reactor.core.Exceptions;
49-
import reactor.core.publisher.*;
56+
import reactor.core.publisher.BaseSubscriber;
57+
import reactor.core.publisher.Flux;
58+
import reactor.core.publisher.Mono;
59+
import reactor.core.publisher.SignalType;
60+
import reactor.core.publisher.UnicastProcessor;
5061
import reactor.util.annotation.Nullable;
5162
import reactor.util.concurrent.Queues;
5263

@@ -129,19 +140,7 @@ class RSocketResponder implements RSocket {
129140

130141
private void handleSendProcessorError(Throwable t) {
131142
cleanUpSendingSubscriptions();
132-
133-
channelProcessors
134-
.values()
135-
.forEach(
136-
subscription -> {
137-
try {
138-
subscription.onError(t);
139-
} catch (Throwable e) {
140-
if (LOGGER.isDebugEnabled()) {
141-
LOGGER.debug("Dropped exception", t);
142-
}
143-
}
144-
});
143+
cleanUpChannelProcessors(t);
145144
}
146145

147146
private void tryTerminateOnConnectionError(Throwable e) {
@@ -278,16 +277,15 @@ private synchronized void cleanUpSendingSubscriptions() {
278277
}
279278

280279
private synchronized void cleanUpChannelProcessors(Throwable e) {
281-
channelProcessors
282-
.values()
283-
.forEach(
284-
payloadPayloadProcessor -> {
285-
try {
286-
payloadPayloadProcessor.onError(e);
287-
} catch (Throwable t) {
288-
// noops
289-
}
290-
});
280+
// Iterate explicitly to handle collisions with concurrent removals
281+
for (IntObjectMap.PrimitiveEntry<Processor<Payload, Payload>> entry :
282+
channelProcessors.entries()) {
283+
try {
284+
entry.value().onError(e);
285+
} catch (Throwable ex) {
286+
// noops
287+
}
288+
}
291289
channelProcessors.clear();
292290
}
293291

0 commit comments

Comments
 (0)