File tree Expand file tree Collapse file tree 1 file changed +11
-13
lines changed
rsocket-core/src/main/java/io/rsocket/core Expand file tree Collapse file tree 1 file changed +11
-13
lines changed Original file line number Diff line number Diff line change @@ -128,18 +128,7 @@ class RSocketResponder implements RSocket {
128
128
}
129
129
130
130
private void handleSendProcessorError (Throwable t ) {
131
- sendingSubscriptions
132
- .values ()
133
- .forEach (
134
- subscription -> {
135
- try {
136
- subscription .cancel ();
137
- } catch (Throwable e ) {
138
- if (LOGGER .isDebugEnabled ()) {
139
- LOGGER .debug ("Dropped exception" , t );
140
- }
141
- }
142
- });
131
+ cleanUpSendingSubscriptions ();
143
132
144
133
channelProcessors
145
134
.values ()
@@ -275,7 +264,16 @@ private void cleanup(Throwable e) {
275
264
}
276
265
277
266
private synchronized void cleanUpSendingSubscriptions () {
278
- sendingSubscriptions .values ().forEach (Subscription ::cancel );
267
+ // Iterate explicitly to handle collisions with concurrent removals
268
+ for (IntObjectMap .PrimitiveEntry <Subscription > entry : sendingSubscriptions .entries ()) {
269
+ try {
270
+ entry .value ().cancel ();
271
+ } catch (Throwable ex ) {
272
+ if (LOGGER .isDebugEnabled ()) {
273
+ LOGGER .debug ("Dropped exception" , ex );
274
+ }
275
+ }
276
+ }
279
277
sendingSubscriptions .clear ();
280
278
}
281
279
You can’t perform that action at this time.
0 commit comments