28
28
import reactor .core .publisher .Operators ;
29
29
import reactor .util .context .Context ;
30
30
31
- /** Default implementation of {@link PooledRSocket } stored in {@link RSocketPool} */
32
- final class DefaultPooledRSocket extends ResolvingOperator <RSocket >
33
- implements CoreSubscriber <RSocket >, PooledRSocket {
31
+ /** Default implementation of {@link WeightedRSocket } stored in {@link RSocketPool} */
32
+ final class PooledWeightedRSocket extends ResolvingOperator <RSocket >
33
+ implements CoreSubscriber <RSocket >, WeightedRSocket {
34
34
35
35
final RSocketPool parent ;
36
36
final LoadbalanceRSocketSource loadbalanceRSocketSource ;
37
37
final Stats stats ;
38
38
39
39
volatile Subscription s ;
40
40
41
- static final AtomicReferenceFieldUpdater <DefaultPooledRSocket , Subscription > S =
42
- AtomicReferenceFieldUpdater .newUpdater (DefaultPooledRSocket .class , Subscription .class , "s" );
41
+ static final AtomicReferenceFieldUpdater <PooledWeightedRSocket , Subscription > S =
42
+ AtomicReferenceFieldUpdater .newUpdater (PooledWeightedRSocket .class , Subscription .class , "s" );
43
43
44
- DefaultPooledRSocket (
44
+ PooledWeightedRSocket (
45
45
RSocketPool parent , LoadbalanceRSocketSource loadbalanceRSocketSource , Stats stats ) {
46
46
this .parent = parent ;
47
47
this .stats = stats ;
@@ -128,7 +128,7 @@ public void dispose() {
128
128
protected void doOnDispose () {
129
129
final RSocketPool parent = this .parent ;
130
130
for (; ; ) {
131
- final PooledRSocket [] sockets = parent .activeSockets ;
131
+ final WeightedRSocket [] sockets = parent .activeSockets ;
132
132
final int activeSocketsCount = sockets .length ;
133
133
134
134
int index = -1 ;
@@ -144,7 +144,7 @@ protected void doOnDispose() {
144
144
}
145
145
146
146
final int lastIndex = activeSocketsCount - 1 ;
147
- final PooledRSocket [] newSockets = new PooledRSocket [lastIndex ];
147
+ final WeightedRSocket [] newSockets = new WeightedRSocket [lastIndex ];
148
148
if (index != 0 ) {
149
149
System .arraycopy (sockets , 0 , newSockets , 0 , index );
150
150
}
@@ -196,8 +196,7 @@ public Stats stats() {
196
196
return stats ;
197
197
}
198
198
199
- @ Override
200
- public LoadbalanceRSocketSource source () {
199
+ LoadbalanceRSocketSource source () {
201
200
return loadbalanceRSocketSource ;
202
201
}
203
202
@@ -211,7 +210,7 @@ static final class RequestTrackingMonoInner<RESULT>
211
210
212
211
long startTime ;
213
212
214
- RequestTrackingMonoInner (DefaultPooledRSocket parent , Payload payload , FrameType requestType ) {
213
+ RequestTrackingMonoInner (PooledWeightedRSocket parent , Payload payload , FrameType requestType ) {
215
214
super (parent , payload , requestType );
216
215
}
217
216
@@ -245,7 +244,7 @@ public void accept(RSocket rSocket, Throwable t) {
245
244
return ;
246
245
}
247
246
248
- startTime = ((DefaultPooledRSocket ) parent ).stats .startRequest ();
247
+ startTime = ((PooledWeightedRSocket ) parent ).stats .startRequest ();
249
248
250
249
source .subscribe ((CoreSubscriber ) this );
251
250
} else {
@@ -257,7 +256,7 @@ public void accept(RSocket rSocket, Throwable t) {
257
256
public void onComplete () {
258
257
final long state = this .requested ;
259
258
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
260
- final Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
259
+ final Stats stats = ((PooledWeightedRSocket ) parent ).stats ;
261
260
final long now = stats .stopRequest (startTime );
262
261
stats .record (now - startTime );
263
262
super .onComplete ();
@@ -268,7 +267,7 @@ public void onComplete() {
268
267
public void onError (Throwable t ) {
269
268
final long state = this .requested ;
270
269
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
271
- Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
270
+ Stats stats = ((PooledWeightedRSocket ) parent ).stats ;
272
271
stats .stopRequest (startTime );
273
272
stats .recordError (0.0 );
274
273
super .onError (t );
@@ -284,7 +283,7 @@ public void cancel() {
284
283
285
284
if (state == STATE_SUBSCRIBED ) {
286
285
this .s .cancel ();
287
- ((DefaultPooledRSocket ) parent ).stats .stopRequest (startTime );
286
+ ((PooledWeightedRSocket ) parent ).stats .stopRequest (startTime );
288
287
} else {
289
288
this .parent .remove (this );
290
289
ReferenceCountUtil .safeRelease (this .payload );
@@ -296,7 +295,7 @@ static final class RequestTrackingFluxInner<INPUT>
296
295
extends FluxDeferredResolution <INPUT , RSocket > {
297
296
298
297
RequestTrackingFluxInner (
299
- DefaultPooledRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
298
+ PooledWeightedRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
300
299
super (parent , fluxOrPayload , requestType );
301
300
}
302
301
@@ -329,7 +328,7 @@ public void accept(RSocket rSocket, Throwable t) {
329
328
return ;
330
329
}
331
330
332
- ((DefaultPooledRSocket ) parent ).stats .startStream ();
331
+ ((PooledWeightedRSocket ) parent ).stats .startStream ();
333
332
334
333
source .subscribe (this );
335
334
} else {
@@ -341,7 +340,7 @@ public void accept(RSocket rSocket, Throwable t) {
341
340
public void onComplete () {
342
341
final long state = this .requested ;
343
342
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
344
- ((DefaultPooledRSocket ) parent ).stats .stopStream ();
343
+ ((PooledWeightedRSocket ) parent ).stats .stopStream ();
345
344
super .onComplete ();
346
345
}
347
346
}
@@ -350,7 +349,7 @@ public void onComplete() {
350
349
public void onError (Throwable t ) {
351
350
final long state = this .requested ;
352
351
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
353
- ((DefaultPooledRSocket ) parent ).stats .stopStream ();
352
+ ((PooledWeightedRSocket ) parent ).stats .stopStream ();
354
353
super .onError (t );
355
354
}
356
355
}
@@ -364,7 +363,7 @@ public void cancel() {
364
363
365
364
if (state == STATE_SUBSCRIBED ) {
366
365
this .s .cancel ();
367
- ((DefaultPooledRSocket ) parent ).stats .stopStream ();
366
+ ((PooledWeightedRSocket ) parent ).stats .stopStream ();
368
367
} else {
369
368
this .parent .remove (this );
370
369
if (requestType == FrameType .REQUEST_STREAM ) {
0 commit comments