28
28
import reactor .core .publisher .Operators ;
29
29
import reactor .util .context .Context ;
30
30
31
- /** Specific interface for all RSocket store in {@link RSocketPool} */
32
- final class ResolvingPooledRSocket extends ResolvingOperator <RSocket >
31
+ /** Default implementation of {@link PooledRSocket} stored in {@link RSocketPool} */
32
+ final class DefaultPooledRSocket extends ResolvingOperator <RSocket >
33
33
implements CoreSubscriber <RSocket >, PooledRSocket {
34
34
35
- final LoadbalanceTarget loadbalanceTarget ;
35
+ final RSocketPool parent ;
36
+ final LoadbalanceRSocketSource loadbalanceRSocketSource ;
36
37
final Stats stats ;
37
38
38
39
volatile Subscription s ;
39
40
40
- static final AtomicReferenceFieldUpdater <ResolvingPooledRSocket , Subscription > S =
41
- AtomicReferenceFieldUpdater .newUpdater (ResolvingPooledRSocket .class , Subscription .class , "s" );
41
+ static final AtomicReferenceFieldUpdater <DefaultPooledRSocket , Subscription > S =
42
+ AtomicReferenceFieldUpdater .newUpdater (DefaultPooledRSocket .class , Subscription .class , "s" );
42
43
43
- ResolvingPooledRSocket (LoadbalanceTarget loadbalanceTarget , Stats stats ) {
44
+ DefaultPooledRSocket (
45
+ RSocketPool parent , LoadbalanceRSocketSource loadbalanceRSocketSource , Stats stats ) {
46
+ this .parent = parent ;
44
47
this .stats = stats ;
45
- this .loadbalanceTarget = loadbalanceTarget ;
48
+ this .loadbalanceRSocketSource = loadbalanceRSocketSource ;
46
49
}
47
50
48
51
@ Override
@@ -100,21 +103,61 @@ public void onNext(RSocket value) {
100
103
101
104
@ Override
102
105
protected void doSubscribe () {
103
- this .loadbalanceTarget .source ().subscribe (this );
106
+ this .loadbalanceRSocketSource .source ().subscribe (this );
104
107
}
105
108
106
109
@ Override
107
110
protected void doOnValueResolved (RSocket value ) {
111
+ stats .setAvailability (1.0 );
108
112
value .onClose ().subscribe (null , t -> this .invalidate (), this ::invalidate );
109
113
}
110
114
111
115
@ Override
112
116
protected void doOnValueExpired (RSocket value ) {
117
+ stats .setAvailability (0.0 );
113
118
value .dispose ();
119
+ this .dispose ();
120
+ }
121
+
122
+ @ Override
123
+ public void dispose () {
124
+ super .dispose ();
114
125
}
115
126
116
127
@ Override
117
128
protected void doOnDispose () {
129
+ final RSocketPool parent = this .parent ;
130
+ for (; ; ) {
131
+ final PooledRSocket [] sockets = parent .activeSockets ;
132
+ final int activeSocketsCount = sockets .length ;
133
+
134
+ int index = -1 ;
135
+ for (int i = 0 ; i < activeSocketsCount ; i ++) {
136
+ if (sockets [i ] == this ) {
137
+ index = i ;
138
+ break ;
139
+ }
140
+ }
141
+
142
+ if (index == -1 ) {
143
+ break ;
144
+ }
145
+
146
+ final int lastIndex = activeSocketsCount - 1 ;
147
+ final PooledRSocket [] newSockets = new PooledRSocket [lastIndex ];
148
+ if (index != 0 ) {
149
+ System .arraycopy (sockets , 0 , newSockets , 0 , index );
150
+ }
151
+
152
+ if (index != lastIndex ) {
153
+ System .arraycopy (sockets , index + 1 , newSockets , index , lastIndex - index );
154
+ }
155
+
156
+ if (RSocketPool .ACTIVE_SOCKETS .compareAndSet (parent , sockets , newSockets )) {
157
+ break ;
158
+ }
159
+ }
160
+ stats .setAvailability (0.0 );
118
161
Operators .terminate (S , this );
119
162
}
120
163
@@ -154,47 +197,21 @@ public Stats stats() {
154
197
}
155
198
156
199
@ Override
157
- public LoadbalanceTarget supplier () {
158
- return loadbalanceTarget ;
200
+ public LoadbalanceRSocketSource source () {
201
+ return loadbalanceRSocketSource ;
159
202
}
160
203
161
204
@ Override
162
205
public double availability () {
163
206
return stats .availability ();
164
207
}
165
208
166
- /**
167
- * Try to dispose this instance if possible. Otherwise, if there is ongoing requests, mark this as
168
- * pending for removal and dispose once all the requests are terminated.<br>
169
- * This operation may be cancelled if {@link #markActive()} is invoked prior this instance has
170
- * been disposed
171
- *
172
- * @return {@code true} if this instance was disposed
173
- */
174
- @ Override
175
- public boolean markForRemoval () {
176
- // FIXME: provide real logic here
177
- this .dispose ();
178
- return true ;
179
- }
180
-
181
- /**
182
- * Try to restore state of this RSocket to be active after marking as pending removal again.
183
- *
184
- * @return {@code true} if marked as active. Otherwise, should be treated as it was disposed.
185
- */
186
- @ Override
187
- public boolean markActive () {
188
- return false ;
189
- }
190
-
191
209
static final class RequestTrackingMonoInner <RESULT >
192
210
extends MonoDeferredResolution <RESULT , RSocket > {
193
211
194
212
long startTime ;
195
213
196
- RequestTrackingMonoInner (
197
- ResolvingPooledRSocket parent , Payload payload , FrameType requestType ) {
214
+ RequestTrackingMonoInner (DefaultPooledRSocket parent , Payload payload , FrameType requestType ) {
198
215
super (parent , payload , requestType );
199
216
}
200
217
@@ -228,7 +245,7 @@ public void accept(RSocket rSocket, Throwable t) {
228
245
return ;
229
246
}
230
247
231
- startTime = ((ResolvingPooledRSocket ) parent ).stats .startRequest ();
248
+ startTime = ((DefaultPooledRSocket ) parent ).stats .startRequest ();
232
249
233
250
source .subscribe ((CoreSubscriber ) this );
234
251
} else {
@@ -240,7 +257,7 @@ public void accept(RSocket rSocket, Throwable t) {
240
257
public void onComplete () {
241
258
final long state = this .requested ;
242
259
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
243
- final Stats stats = ((ResolvingPooledRSocket ) parent ).stats ;
260
+ final Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
244
261
final long now = stats .stopRequest (startTime );
245
262
stats .record (now - startTime );
246
263
super .onComplete ();
@@ -251,7 +268,7 @@ public void onComplete() {
251
268
public void onError (Throwable t ) {
252
269
final long state = this .requested ;
253
270
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
254
- Stats stats = ((ResolvingPooledRSocket ) parent ).stats ;
271
+ Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
255
272
stats .stopRequest (startTime );
256
273
stats .recordError (0.0 );
257
274
super .onError (t );
@@ -267,7 +284,7 @@ public void cancel() {
267
284
268
285
if (state == STATE_SUBSCRIBED ) {
269
286
this .s .cancel ();
270
- ((ResolvingPooledRSocket ) parent ).stats .stopRequest (startTime );
287
+ ((DefaultPooledRSocket ) parent ).stats .stopRequest (startTime );
271
288
} else {
272
289
this .parent .remove (this );
273
290
ReferenceCountUtil .safeRelease (this .payload );
@@ -279,7 +296,7 @@ static final class RequestTrackingFluxInner<INPUT>
279
296
extends FluxDeferredResolution <INPUT , RSocket > {
280
297
281
298
RequestTrackingFluxInner (
282
- ResolvingPooledRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
299
+ DefaultPooledRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
283
300
super (parent , fluxOrPayload , requestType );
284
301
}
285
302
@@ -312,7 +329,7 @@ public void accept(RSocket rSocket, Throwable t) {
312
329
return ;
313
330
}
314
331
315
- ((ResolvingPooledRSocket ) parent ).stats .startStream ();
332
+ ((DefaultPooledRSocket ) parent ).stats .startStream ();
316
333
317
334
source .subscribe (this );
318
335
} else {
@@ -324,7 +341,7 @@ public void accept(RSocket rSocket, Throwable t) {
324
341
public void onComplete () {
325
342
final long state = this .requested ;
326
343
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
327
- ((ResolvingPooledRSocket ) parent ).stats .stopStream ();
344
+ ((DefaultPooledRSocket ) parent ).stats .stopStream ();
328
345
super .onComplete ();
329
346
}
330
347
}
@@ -333,7 +350,7 @@ public void onComplete() {
333
350
public void onError (Throwable t ) {
334
351
final long state = this .requested ;
335
352
if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
336
- ((ResolvingPooledRSocket ) parent ).stats .stopStream ();
353
+ ((DefaultPooledRSocket ) parent ).stats .stopStream ();
337
354
super .onError (t );
338
355
}
339
356
}
@@ -347,7 +364,7 @@ public void cancel() {
347
364
348
365
if (state == STATE_SUBSCRIBED ) {
349
366
this .s .cancel ();
350
- ((ResolvingPooledRSocket ) parent ).stats .stopStream ();
367
+ ((DefaultPooledRSocket ) parent ).stats .stopStream ();
351
368
} else {
352
369
this .parent .remove (this );
353
370
if (requestType == FrameType .REQUEST_STREAM ) {
0 commit comments