Skip to content

Commit 7077ba4

Browse files
committed
changes LoadbalanceStrategy to accept List
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent ff9b02a commit 7077ba4

File tree

4 files changed

+135
-14
lines changed

4 files changed

+135
-14
lines changed

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
*/
1616
package io.rsocket.loadbalance;
1717

18+
import java.util.List;
1819
import java.util.function.Supplier;
1920

2021
@FunctionalInterface
2122
public interface LoadbalanceStrategy {
2223

23-
PooledRSocket select(PooledRSocket[] availableRSockets);
24+
PooledRSocket select(List<PooledRSocket> availableRSockets);
2425

2526
default Supplier<Stats> statsSupplier() {
2627
return Stats::noOps;

rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import io.rsocket.RSocket;
2121
import io.rsocket.frame.FrameType;
2222
import java.util.Arrays;
23+
import java.util.Collection;
2324
import java.util.HashMap;
25+
import java.util.Iterator;
2426
import java.util.List;
27+
import java.util.ListIterator;
2528
import java.util.concurrent.CancellationException;
2629
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2730
import java.util.function.Supplier;
@@ -34,7 +37,7 @@
3437
import reactor.util.annotation.Nullable;
3538

3639
class RSocketPool extends ResolvingOperator<Void>
37-
implements CoreSubscriber<List<LoadbalanceRSocketSource>> {
40+
implements CoreSubscriber<List<LoadbalanceRSocketSource>>, List<PooledRSocket> {
3841

3942
final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
4043
final LoadbalanceStrategy loadbalanceStrategy;
@@ -200,7 +203,33 @@ RSocket doSelect() {
200203
return null;
201204
}
202205

203-
return this.loadbalanceStrategy.select(sockets);
206+
return this.loadbalanceStrategy.select(this);
207+
}
208+
209+
@Override
210+
public PooledRSocket get(int index) {
211+
return activeSockets[index];
212+
}
213+
214+
@Override
215+
public int size() {
216+
return activeSockets.length;
217+
}
218+
219+
@Override
220+
public boolean isEmpty() {
221+
return activeSockets.length == 0;
222+
}
223+
224+
@Override
225+
public Object[] toArray() {
226+
return activeSockets;
227+
}
228+
229+
@Override
230+
@SuppressWarnings("unchecked")
231+
public <T> T[] toArray(T[] a) {
232+
return (T[]) activeSockets;
204233
}
205234

206235
static class DeferredResolutionRSocket implements RSocket {
@@ -325,4 +354,94 @@ public void accept(Void aVoid, Throwable t) {
325354
}
326355
}
327356
}
357+
358+
@Override
359+
public boolean contains(Object o) {
360+
throw new UnsupportedOperationException();
361+
}
362+
363+
@Override
364+
public Iterator<PooledRSocket> iterator() {
365+
throw new UnsupportedOperationException();
366+
}
367+
368+
@Override
369+
public boolean add(PooledRSocket pooledRSocket) {
370+
throw new UnsupportedOperationException();
371+
}
372+
373+
@Override
374+
public boolean remove(Object o) {
375+
throw new UnsupportedOperationException();
376+
}
377+
378+
@Override
379+
public boolean containsAll(Collection<?> c) {
380+
throw new UnsupportedOperationException();
381+
}
382+
383+
@Override
384+
public boolean addAll(Collection<? extends PooledRSocket> c) {
385+
throw new UnsupportedOperationException();
386+
}
387+
388+
@Override
389+
public boolean addAll(int index, Collection<? extends PooledRSocket> c) {
390+
throw new UnsupportedOperationException();
391+
}
392+
393+
@Override
394+
public boolean removeAll(Collection<?> c) {
395+
throw new UnsupportedOperationException();
396+
}
397+
398+
@Override
399+
public boolean retainAll(Collection<?> c) {
400+
throw new UnsupportedOperationException();
401+
}
402+
403+
@Override
404+
public void clear() {
405+
throw new UnsupportedOperationException();
406+
}
407+
408+
@Override
409+
public PooledRSocket set(int index, PooledRSocket element) {
410+
throw new UnsupportedOperationException();
411+
}
412+
413+
@Override
414+
public void add(int index, PooledRSocket element) {
415+
throw new UnsupportedOperationException();
416+
}
417+
418+
@Override
419+
public PooledRSocket remove(int index) {
420+
throw new UnsupportedOperationException();
421+
}
422+
423+
@Override
424+
public int indexOf(Object o) {
425+
throw new UnsupportedOperationException();
426+
}
427+
428+
@Override
429+
public int lastIndexOf(Object o) {
430+
throw new UnsupportedOperationException();
431+
}
432+
433+
@Override
434+
public ListIterator<PooledRSocket> listIterator() {
435+
throw new UnsupportedOperationException();
436+
}
437+
438+
@Override
439+
public ListIterator<PooledRSocket> listIterator(int index) {
440+
throw new UnsupportedOperationException();
441+
}
442+
443+
@Override
444+
public List<PooledRSocket> subList(int fromIndex, int toIndex) {
445+
throw new UnsupportedOperationException();
446+
}
328447
}

rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.rsocket.loadbalance;
1717

18+
import java.util.List;
1819
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
1920

2021
public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {
@@ -25,12 +26,11 @@ public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {
2526
AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadbalanceStrategy.class, "nextIndex");
2627

2728
@Override
28-
public PooledRSocket select(PooledRSocket[] sockets) {
29-
int length = sockets.length;
29+
public PooledRSocket select(List<PooledRSocket> sockets) {
30+
int length = sockets.size();
3031

3132
int indexToUse = Math.abs(NEXT_INDEX.getAndIncrement(this) % length);
3233

33-
final PooledRSocket pooledRSocket = sockets[indexToUse];
34-
return pooledRSocket;
34+
return sockets.get(indexToUse);
3535
}
3636
}

rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.loadbalance;
1818

19+
import java.util.List;
1920
import java.util.SplittableRandom;
2021
import java.util.concurrent.ThreadLocalRandom;
2122
import java.util.function.Supplier;
@@ -56,19 +57,19 @@ public Supplier<Stats> statsSupplier() {
5657
}
5758

5859
@Override
59-
public PooledRSocket select(PooledRSocket[] sockets) {
60+
public PooledRSocket select(List<PooledRSocket> sockets) {
6061
final int effort = this.effort;
61-
final int size = sockets.length;
62+
final int size = sockets.size();
6263

6364
PooledRSocket pooledRSocket;
6465
switch (size) {
6566
case 1:
66-
pooledRSocket = sockets[0];
67+
pooledRSocket = sockets.get(0);
6768
break;
6869
case 2:
6970
{
70-
PooledRSocket rsc1 = sockets[0];
71-
PooledRSocket rsc2 = sockets[1];
71+
PooledRSocket rsc1 = sockets.get(0);
72+
PooledRSocket rsc2 = sockets.get(1);
7273

7374
double w1 = algorithmicWeight(rsc1);
7475
double w2 = algorithmicWeight(rsc2);
@@ -91,8 +92,8 @@ public PooledRSocket select(PooledRSocket[] sockets) {
9192
if (i2 >= i1) {
9293
i2++;
9394
}
94-
rsc1 = sockets[i1];
95-
rsc2 = sockets[i2];
95+
rsc1 = sockets.get(i1);
96+
rsc2 = sockets.get(i2);
9697
if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) {
9798
break;
9899
}

0 commit comments

Comments
 (0)