Skip to content

Commit bfb5d1c

Browse files
committed
improves loadbalance test coverage and provides fixes
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent f202a2a commit bfb5d1c

File tree

5 files changed

+721
-181
lines changed

5 files changed

+721
-181
lines changed

rsocket-core/src/main/java/io/rsocket/loadbalance/MonoDeferredResolution.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ abstract class MonoDeferredResolution<RESULT, R> extends Mono<RESULT>
5959
}
6060

6161
@Override
62-
public void subscribe(CoreSubscriber<? super RESULT> actual) {
62+
public final void subscribe(CoreSubscriber<? super RESULT> actual) {
6363
if (this.requested == STATE_UNSUBSCRIBED
6464
&& REQUESTED.compareAndSet(this, STATE_UNSUBSCRIBED, STATE_SUBSCRIBER_SET)) {
6565

@@ -145,7 +145,7 @@ public final void onNext(RESULT payload) {
145145
}
146146

147147
@Override
148-
public void onError(Throwable t) {
148+
public final void onError(Throwable t) {
149149
if (this.done) {
150150
Operators.onErrorDropped(t, this.actual.currentContext());
151151
return;
@@ -156,7 +156,7 @@ public void onError(Throwable t) {
156156
}
157157

158158
@Override
159-
public void onComplete() {
159+
public final void onComplete() {
160160
if (this.done) {
161161
return;
162162
}
@@ -206,7 +206,7 @@ public final void request(long n) {
206206
}
207207
}
208208

209-
public void cancel() {
209+
public final void cancel() {
210210
long state = REQUESTED.getAndSet(this, STATE_TERMINATED);
211211
if (state == STATE_TERMINATED) {
212212
return;

rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -108,22 +108,16 @@ protected void doSubscribe() {
108108

109109
@Override
110110
protected void doOnValueResolved(RSocket value) {
111-
value.onClose().subscribe(null, t -> this.invalidate(), this::invalidate);
111+
value.onClose().subscribe(null, t -> this.doCleanup(), this::doCleanup);
112112
}
113113

114-
@Override
115-
protected void doOnValueExpired(RSocket value) {
116-
value.dispose();
117-
this.dispose();
118-
}
114+
void doCleanup() {
115+
if (isDisposed()) {
116+
return;
117+
}
119118

120-
@Override
121-
public void dispose() {
122-
super.dispose();
123-
}
119+
this.dispose();
124120

125-
@Override
126-
protected void doOnDispose() {
127121
final RSocketPool parent = this.parent;
128122
for (; ; ) {
129123
final PooledRSocket[] sockets = parent.activeSockets;
@@ -141,20 +135,35 @@ protected void doOnDispose() {
141135
break;
142136
}
143137

144-
final int lastIndex = activeSocketsCount - 1;
145-
final PooledRSocket[] newSockets = new PooledRSocket[lastIndex];
146-
if (index != 0) {
147-
System.arraycopy(sockets, 0, newSockets, 0, index);
148-
}
138+
final PooledRSocket[] newSockets;
139+
if (activeSocketsCount == 1) {
140+
newSockets = RSocketPool.EMPTY;
141+
} else {
142+
final int lastIndex = activeSocketsCount - 1;
143+
144+
newSockets = new PooledRSocket[lastIndex];
145+
if (index != 0) {
146+
System.arraycopy(sockets, 0, newSockets, 0, index);
147+
}
149148

150-
if (index != lastIndex) {
151-
System.arraycopy(sockets, index + 1, newSockets, index, lastIndex - index);
149+
if (index != lastIndex) {
150+
System.arraycopy(sockets, index + 1, newSockets, index, lastIndex - index);
151+
}
152152
}
153153

154154
if (RSocketPool.ACTIVE_SOCKETS.compareAndSet(parent, sockets, newSockets)) {
155155
break;
156156
}
157157
}
158+
}
159+
160+
@Override
161+
protected void doOnValueExpired(RSocket value) {
162+
value.dispose();
163+
}
164+
165+
@Override
166+
protected void doOnDispose() {
158167
Operators.terminate(S, this);
159168
}
160169

@@ -231,7 +240,7 @@ public void accept(RSocket rSocket, Throwable t) {
231240

232241
source.subscribe((CoreSubscriber) this);
233242
} else {
234-
parent.add(this);
243+
parent.observe(this);
235244
}
236245
}
237246
}
@@ -273,7 +282,7 @@ public void accept(RSocket rSocket, Throwable t) {
273282

274283
source.subscribe(this);
275284
} else {
276-
parent.add(this);
285+
parent.observe(this);
277286
}
278287
}
279288
}

0 commit comments

Comments
 (0)