Skip to content

Commit 3fdc78f

Browse files
authored
adds Null-safe iteration of active streams (#1004)
Closes gh-914 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 831d5bf commit 3fdc78f

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport;
2020

2121
import io.netty.buffer.ByteBuf;
22+
import io.netty.util.collection.IntObjectMap;
2223
import io.rsocket.DuplexConnection;
2324
import io.rsocket.Payload;
2425
import io.rsocket.RSocket;
@@ -343,15 +344,15 @@ private void terminate(Throwable e) {
343344
}
344345

345346
synchronized (this) {
346-
activeStreams
347-
.values()
348-
.forEach(
349-
receiver -> {
350-
try {
351-
receiver.handleError(e);
352-
} catch (Throwable ignored) {
353-
}
354-
});
347+
for (IntObjectMap.PrimitiveEntry<FrameHandler> entry : activeStreams.entries()) {
348+
FrameHandler handler = entry.value();
349+
if (handler != null) {
350+
try {
351+
handler.handleError(e);
352+
} catch (Throwable ignored) {
353+
}
354+
}
355+
}
355356
}
356357

357358
if (e == CLOSED_CHANNEL_EXCEPTION) {

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package io.rsocket.core;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.util.collection.IntObjectMap;
2021
import io.rsocket.DuplexConnection;
2122
import io.rsocket.Payload;
2223
import io.rsocket.RSocket;
@@ -183,7 +184,12 @@ final void doOnDispose() {
183184
}
184185

185186
private synchronized void cleanUpSendingSubscriptions() {
186-
activeStreams.values().forEach(FrameHandler::handleCancel);
187+
for (IntObjectMap.PrimitiveEntry<FrameHandler> entry : activeStreams.entries()) {
188+
FrameHandler handler = entry.value();
189+
if (handler != null) {
190+
handler.handleCancel();
191+
}
192+
}
187193
activeStreams.clear();
188194
}
189195

0 commit comments

Comments
 (0)