29
29
30
30
import javax .annotation .Nullable ;
31
31
import java .time .Duration ;
32
- import java .util .Collection ;
33
32
import java .util .concurrent .atomic .AtomicBoolean ;
34
33
import java .util .concurrent .atomic .AtomicInteger ;
35
34
import java .util .function .Consumer ;
@@ -122,22 +121,15 @@ class RSocketClient implements RSocket {
122
121
}
123
122
124
123
private void handleSendProcessorError (Throwable t ) {
125
- Collection <UnicastProcessor <Payload >> values ;
126
- Collection <LimitableRequestPublisher > values1 ;
127
- synchronized (RSocketClient .this ) {
128
- values = receivers .values ();
129
- values1 = senders .values ();
130
- }
131
-
132
- for (Subscriber subscriber : values ) {
124
+ for (Subscriber subscriber : receivers .values ()) {
133
125
try {
134
126
subscriber .onError (t );
135
127
} catch (Throwable e ) {
136
128
errorConsumer .accept (e );
137
129
}
138
130
}
139
131
140
- for (LimitableRequestPublisher p : values1 ) {
132
+ for (LimitableRequestPublisher p : senders . values () ) {
141
133
p .cancel ();
142
134
}
143
135
}
@@ -146,22 +138,16 @@ private void handleSendProcessorCancel(SignalType t) {
146
138
if (SignalType .ON_ERROR == t ) {
147
139
return ;
148
140
}
149
- Collection <UnicastProcessor <Payload >> values ;
150
- Collection <LimitableRequestPublisher > values1 ;
151
- synchronized (RSocketClient .this ) {
152
- values = receivers .values ();
153
- values1 = senders .values ();
154
- }
155
141
156
- for (Subscriber subscriber : values ) {
142
+ for (Subscriber subscriber : receivers . values () ) {
157
143
try {
158
144
subscriber .onError (new Throwable ("closed connection" ));
159
145
} catch (Throwable e ) {
160
146
errorConsumer .accept (e );
161
147
}
162
148
}
163
149
164
- for (LimitableRequestPublisher p : values1 ) {
150
+ for (LimitableRequestPublisher p : senders . values () ) {
165
151
p .cancel ();
166
152
}
167
153
}
@@ -255,10 +241,7 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
255
241
int streamId = streamIdSupplier .nextStreamId ();
256
242
257
243
UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
258
-
259
- synchronized (this ) {
260
- receivers .put (streamId , receiver );
261
- }
244
+ receivers .put (streamId , receiver );
262
245
263
246
AtomicBoolean first = new AtomicBoolean (false );
264
247
@@ -289,7 +272,7 @@ public Flux<Payload> handleRequestStream(final Payload payload) {
289
272
})
290
273
.doFinally (
291
274
s -> {
292
- removeReceiver (streamId );
275
+ receivers . remove (streamId );
293
276
});
294
277
}));
295
278
}
@@ -304,10 +287,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
304
287
payload .release ();
305
288
306
289
UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
307
-
308
- synchronized (this ) {
309
- receivers .put (streamId , receiver );
310
- }
290
+ receivers .put (streamId , receiver );
311
291
312
292
sendProcessor .onNext (requestFrame );
313
293
@@ -317,7 +297,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
317
297
.doOnCancel (() -> sendProcessor .onNext (Frame .Cancel .from (streamId )))
318
298
.doFinally (
319
299
s -> {
320
- removeReceiver (streamId );
300
+ receivers . remove (streamId );
321
301
});
322
302
}));
323
303
}
@@ -364,10 +344,8 @@ public Flux<Payload> get() {
364
344
LimitableRequestPublisher .wrap (f );
365
345
// Need to set this to one for first the frame
366
346
wrapped .increaseRequestLimit (1 );
367
- synchronized (RSocketClient .this ) {
368
- senders .put (streamId , wrapped );
369
- receivers .put (streamId , receiver );
370
- }
347
+ senders .put (streamId , wrapped );
348
+ receivers .put (streamId , receiver );
371
349
372
350
return wrapped ;
373
351
})
@@ -424,39 +402,32 @@ public Frame apply(Payload payload) {
424
402
})
425
403
.doFinally (
426
404
s -> {
427
- removeReceiver (streamId );
428
- removeSender (streamId );
405
+ receivers . remove (streamId );
406
+ senders . remove (streamId );
429
407
});
430
408
}
431
409
}));
432
410
}
433
411
434
412
private boolean contains (int streamId ) {
435
- synchronized (RSocketClient .this ) {
436
- return receivers .containsKey (streamId );
437
- }
413
+ return receivers .containsKey (streamId );
438
414
}
439
415
440
416
protected void cleanup () {
441
417
try {
442
- Collection < UnicastProcessor <Payload >> subscribers ;
443
- Collection < LimitableRequestPublisher > publishers ;
444
- synchronized ( RSocketClient . this ) {
445
- subscribers = receivers .values ();
446
- publishers = senders . values ( );
418
+ for ( UnicastProcessor <Payload > subscriber : receivers . values ()) {
419
+ cleanUpSubscriber ( subscriber ) ;
420
+ }
421
+ for ( LimitableRequestPublisher p : senders .values ()) {
422
+ cleanUpLimitableRequestPublisher ( p );
447
423
}
448
-
449
- subscribers .forEach (this ::cleanUpSubscriber );
450
- publishers .forEach (this ::cleanUpLimitableRequestPublisher );
451
424
452
425
if (null != keepAliveSendSub ) {
453
426
keepAliveSendSub .dispose ();
454
427
}
455
428
} finally {
456
- synchronized (this ) {
457
- senders .clear ();
458
- receivers .clear ();
459
- }
429
+ senders .clear ();
430
+ receivers .clear ();
460
431
}
461
432
}
462
433
@@ -513,29 +484,23 @@ private void handleStreamZero(FrameType type, Frame frame) {
513
484
}
514
485
515
486
private void handleFrame (int streamId , FrameType type , Frame frame ) {
516
- Subscriber <Payload > receiver ;
517
- synchronized (this ) {
518
- receiver = receivers .get (streamId );
519
- }
487
+ Subscriber <Payload > receiver = receivers .get (streamId );
520
488
if (receiver == null ) {
521
489
handleMissingResponseProcessor (streamId , type , frame );
522
490
} else {
523
491
switch (type ) {
524
492
case ERROR :
525
493
receiver .onError (Exceptions .from (frame ));
526
- removeReceiver (streamId );
494
+ receivers . remove (streamId );
527
495
break ;
528
496
case NEXT_COMPLETE :
529
497
receiver .onNext (frameDecoder .apply (frame ));
530
498
receiver .onComplete ();
531
499
break ;
532
500
case CANCEL :
533
501
{
534
- LimitableRequestPublisher sender ;
535
- synchronized (this ) {
536
- sender = senders .remove (streamId );
537
- removeReceiver (streamId );
538
- }
502
+ LimitableRequestPublisher sender = senders .remove (streamId );
503
+ receivers .remove (streamId );
539
504
if (sender != null ) {
540
505
sender .cancel ();
541
506
}
@@ -546,10 +511,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
546
511
break ;
547
512
case REQUEST_N :
548
513
{
549
- LimitableRequestPublisher sender ;
550
- synchronized (this ) {
551
- sender = senders .get (streamId );
552
- }
514
+ LimitableRequestPublisher sender = senders .get (streamId );
553
515
if (sender != null ) {
554
516
int n = Frame .RequestN .requestN (frame );
555
517
sender .increaseRequestLimit (n );
@@ -559,9 +521,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
559
521
}
560
522
case COMPLETE :
561
523
receiver .onComplete ();
562
- synchronized (this ) {
563
- receivers .remove (streamId );
564
- }
524
+ receivers .remove (streamId );
565
525
break ;
566
526
default :
567
527
throw new IllegalStateException (
@@ -593,12 +553,4 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame
593
553
// receiving a frame after a given stream has been cancelled/completed,
594
554
// so ignore (cancellation is async so there is a race condition)
595
555
}
596
-
597
- private synchronized void removeReceiver (int streamId ) {
598
- receivers .remove (streamId );
599
- }
600
-
601
- private synchronized void removeSender (int streamId ) {
602
- senders .remove (streamId );
603
- }
604
556
}
0 commit comments