Skip to content

Commit 60e9c28

Browse files
authored
Overlapping leases and redistribution on new connects (#188)
#### Problem This PR addresses two problems: - Typically users of `FairLeaseDistributor` would supply the same value for lease TTL and the period in which leases are distributed. In such cases, there would be a time period when new lease has not arrived and old lease is expired. This isn't a good reflection of server's intent as the server can handle the new requests but the lease has not yet reached the client. - New clients have to wait for the next distribution tick to get leases. #### Modified - Modified `FairLeaseDistributor` to increase user supplied lease TTL by 10% so that leases overlap. A server can still reject requests if it is overloaded. - Redistribute leases whenever a new client connects. #### Result Better leasing behavior as seen by the clients.
1 parent 9211630 commit 60e9c28

File tree

4 files changed

+119
-28
lines changed

4 files changed

+119
-28
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/lease/DefaultLeaseEnforcingSocket.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import io.reactivesocket.ReactiveSocket;
2020
import io.reactivesocket.reactivestreams.extensions.Px;
21+
import io.reactivesocket.exceptions.RejectedException;
22+
import io.reactivesocket.reactivestreams.extensions.Px;
2123
import io.reactivesocket.reactivestreams.extensions.internal.Cancellable;
2224
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
2325
import org.reactivestreams.Publisher;
@@ -30,11 +32,23 @@ public class DefaultLeaseEnforcingSocket extends DefaultLeaseHonoringSocket impl
3032
private final LeaseDistributor leaseDistributor;
3133
private volatile Consumer<Lease> leaseSender;
3234
private Cancellable distributorCancellation;
35+
@SuppressWarnings("rawtypes")
36+
private final Px rejectError;
3337

3438
public DefaultLeaseEnforcingSocket(ReactiveSocket delegate, LeaseDistributor leaseDistributor,
35-
LongSupplier currentTimeSupplier) {
39+
LongSupplier currentTimeSupplier, boolean clientHonorsLeases) {
3640
super(delegate, currentTimeSupplier);
3741
this.leaseDistributor = leaseDistributor;
42+
if (!clientHonorsLeases) {
43+
rejectError = Px.error(new RejectedException("Server overloaded."));
44+
} else {
45+
rejectError = null;
46+
}
47+
}
48+
49+
public DefaultLeaseEnforcingSocket(ReactiveSocket delegate, LeaseDistributor leaseDistributor,
50+
LongSupplier currentTimeSupplier) {
51+
this(delegate, leaseDistributor, currentTimeSupplier, true);
3852
}
3953

4054
public DefaultLeaseEnforcingSocket(ReactiveSocket delegate, LeaseDistributor leaseDistributor) {
@@ -66,6 +80,11 @@ public Publisher<Void> close() {
6680
});
6781
}
6882

83+
@Override
84+
protected <T> Publisher<T> rejectError() {
85+
return null == rejectError ? super.rejectError() : rejectError;
86+
}
87+
6988
/**
7089
* A distributor of leases for an instance of {@link LeaseEnforcingSocket}.
7190
*/

reactivesocket-core/src/main/java/io/reactivesocket/lease/DefaultLeaseHonoringSocket.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,25 @@
2121
import io.reactivesocket.exceptions.RejectedException;
2222
import io.reactivesocket.reactivestreams.extensions.Px;
2323
import org.reactivestreams.Publisher;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2426

2527
import java.util.concurrent.atomic.AtomicInteger;
2628
import java.util.function.LongSupplier;
2729

2830
public class DefaultLeaseHonoringSocket implements LeaseHonoringSocket {
2931

32+
private static final Logger logger = LoggerFactory.getLogger(DefaultLeaseHonoringSocket.class);
33+
3034
private volatile Lease currentLease;
3135
private final ReactiveSocket delegate;
3236
private final LongSupplier currentTimeSupplier;
3337
private final AtomicInteger remainingQuota;
3438

3539
@SuppressWarnings("ThrowableInstanceNeverThrown")
3640
private static final RejectedException rejectedException = new RejectedException("Lease exhausted.");
41+
@SuppressWarnings("rawtypes")
42+
private static final Px rejectedPx = Px.error(rejectedException);
3743

3844
public DefaultLeaseHonoringSocket(ReactiveSocket delegate, LongSupplier currentTimeSupplier) {
3945
this.delegate = delegate;
@@ -55,7 +61,7 @@ public void accept(Lease lease) {
5561
public Publisher<Void> fireAndForget(Payload payload) {
5662
return Px.defer(() -> {
5763
if (!checkLease()) {
58-
return Px.error(rejectedException);
64+
return rejectError();
5965
}
6066
return delegate.fireAndForget(payload);
6167
});
@@ -65,7 +71,7 @@ public Publisher<Void> fireAndForget(Payload payload) {
6571
public Publisher<Payload> requestResponse(Payload payload) {
6672
return Px.defer(() -> {
6773
if (!checkLease()) {
68-
return Px.error(rejectedException);
74+
return rejectError();
6975
}
7076
return delegate.requestResponse(payload);
7177
});
@@ -75,7 +81,7 @@ public Publisher<Payload> requestResponse(Payload payload) {
7581
public Publisher<Payload> requestStream(Payload payload) {
7682
return Px.defer(() -> {
7783
if (!checkLease()) {
78-
return Px.error(rejectedException);
84+
return rejectError();
7985
}
8086
return delegate.requestStream(payload);
8187
});
@@ -85,7 +91,7 @@ public Publisher<Payload> requestStream(Payload payload) {
8591
public Publisher<Payload> requestSubscription(Payload payload) {
8692
return Px.defer(() -> {
8793
if (!checkLease()) {
88-
return Px.error(rejectedException);
94+
return rejectError();
8995
}
9096
return delegate.requestSubscription(payload);
9197
});
@@ -95,7 +101,7 @@ public Publisher<Payload> requestSubscription(Payload payload) {
95101
public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
96102
return Px.defer(() -> {
97103
if (!checkLease()) {
98-
return Px.error(rejectedException);
104+
return rejectError();
99105
}
100106
return delegate.requestChannel(payloads);
101107
});
@@ -105,7 +111,7 @@ public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
105111
public Publisher<Void> metadataPush(Payload payload) {
106112
return Px.defer(() -> {
107113
if (!checkLease()) {
108-
return Px.error(rejectedException);
114+
return rejectError();
109115
}
110116
return delegate.metadataPush(payload);
111117
});
@@ -126,7 +132,20 @@ public Publisher<Void> onClose() {
126132
return delegate.onClose();
127133
}
128134

135+
@SuppressWarnings("unchecked")
136+
protected <T> Publisher<T> rejectError() {
137+
return rejectedPx;
138+
}
139+
129140
private boolean checkLease() {
130-
return remainingQuota.getAndDecrement() > 0 && !currentLease.isExpired(currentTimeSupplier.getAsLong());
141+
boolean allow = remainingQuota.getAndDecrement() > 0 && !currentLease.isExpired(currentTimeSupplier.getAsLong());
142+
if (!allow) {
143+
if (logger.isDebugEnabled()) {
144+
logger.debug("Lease expired. Lease: " + currentLease + ", remaining quota: "
145+
+ Math.max(0, remainingQuota.get()) + ", current time (ms) "
146+
+ currentTimeSupplier.getAsLong());
147+
}
148+
}
149+
return allow;
131150
}
132151
}

reactivesocket-core/src/main/java/io/reactivesocket/lease/FairLeaseDistributor.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,31 @@ public final class FairLeaseDistributor implements DefaultLeaseEnforcingSocket.L
3838
private Subscription ticksSubscription;
3939
private volatile boolean startTicks;
4040
private final IntSupplier capacitySupplier;
41-
private final int leaseTTL;
41+
private final int leaseTTLMillis;
4242
private final Publisher<Long> leaseDistributionTicks;
43-
private volatile int remainingPermits;
43+
private final boolean redistributeOnConnect;
4444

45-
public FairLeaseDistributor(IntSupplier capacitySupplier, int leaseTTL, Publisher<Long> leaseDistributionTicks) {
45+
public FairLeaseDistributor(IntSupplier capacitySupplier, int leaseTTLMillis,
46+
Publisher<Long> leaseDistributionTicks, boolean redistributeOnConnect) {
4647
this.capacitySupplier = capacitySupplier;
47-
this.leaseTTL = leaseTTL;
48+
/*
49+
* If lease TTL is exactly the same as the period of replenishment, then there would be a time period when new
50+
* lease has not arrived and old lease is expired. This isn't a good reflection of server's intent as the server
51+
* can handle the new requests but the lease has not yet reached the client. So, having TTL slightly more
52+
* than distribution period (accomodating for network lag) is more representative of server's intent. OTOH, if
53+
* server isn't ready, it can always reject a request.
54+
*/
55+
this.leaseTTLMillis = (int) (leaseTTLMillis * 1.1);
4856
this.leaseDistributionTicks = leaseDistributionTicks;
57+
this.redistributeOnConnect = redistributeOnConnect;
4958
activeRecipients = new LinkedBlockingQueue<>();
5059
}
5160

61+
public FairLeaseDistributor(IntSupplier capacitySupplier, int leaseTTLMillis,
62+
Publisher<Long> leaseDistributionTicks) {
63+
this(capacitySupplier, leaseTTLMillis, leaseDistributionTicks, true);
64+
}
65+
5266
/**
5367
* Shutdown this distributor. No more leases will be provided to the registered sockets.
5468
*/
@@ -68,12 +82,23 @@ public void shutdown() {
6882
@Override
6983
public Cancellable registerSocket(Consumer<Lease> leaseConsumer) {
7084
activeRecipients.add(leaseConsumer);
85+
boolean _started;
7186
synchronized (this) {
87+
_started = startTicks;
7288
if (!startTicks) {
7389
startTicks();
7490
startTicks = true;
7591
}
7692
}
93+
94+
if (_started && redistributeOnConnect) {
95+
/*
96+
* This is a way to make sure that the clients that arrive in the middle of a distribution period, do not
97+
* have to wait for the next tick to arrive.
98+
*/
99+
distribute(capacitySupplier.getAsInt());
100+
}
101+
77102
return new CancellableImpl() {
78103
@Override
79104
protected void onCancel() {
@@ -86,20 +111,19 @@ private void distribute(int permits) {
86111
if (activeRecipients.isEmpty()) {
87112
return;
88113
}
89-
remainingPermits -= permits;
90114
int recipients = activeRecipients.size();
91115
int budget = permits / recipients;
92116

93117
// it would be more fair to randomized the distribution of extra
94118
int extra = permits - budget * recipients;
95-
Lease budgetLease = new LeaseImpl(budget, leaseTTL, Frame.NULL_BYTEBUFFER);
119+
Lease budgetLease = new LeaseImpl(budget, leaseTTLMillis, Frame.NULL_BYTEBUFFER);
96120
for (Consumer<Lease> recipient: activeRecipients) {
97121
Lease leaseToSend = budgetLease;
98122
int n = budget;
99123
if (extra > 0) {
100124
n += 1;
101125
extra -= 1;
102-
leaseToSend = new LeaseImpl(n, leaseTTL, Frame.NULL_BYTEBUFFER);
126+
leaseToSend = new LeaseImpl(n, leaseTTLMillis, Frame.NULL_BYTEBUFFER);
103127
}
104128
recipient.accept(leaseToSend);
105129
}
@@ -109,8 +133,7 @@ private void startTicks() {
109133
Px.from(leaseDistributionTicks)
110134
.doOnSubscribe(subscription -> ticksSubscription = subscription)
111135
.doOnNext(aLong -> {
112-
remainingPermits = capacitySupplier.getAsInt();
113-
distribute(remainingPermits);
136+
distribute(capacitySupplier.getAsInt());
114137
})
115138
.ignore()
116139
.subscribe();

reactivesocket-core/src/test/java/io/reactivesocket/lease/FairLeaseDistributorTest.java

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void testRegisterCancel() throws Exception {
4242
assertThat("Unexpected leases received.", rule.leases, hasSize(1));
4343
Lease lease = rule.leases.remove(0);
4444
assertThat("Unexpected permits", lease.getAllowedRequests(), is(rule.permits));
45-
assertThat("Unexpected ttl", lease.getTtl(), is(rule.ttl));
45+
rule.assertTTL(lease);
4646
cancel.cancel();
4747
rule.ticks.onNext(1L);
4848
assertThat("Unexpected leases received post cancellation.", rule.leases, is(empty()));
@@ -73,8 +73,25 @@ public void testTwoSocketsAndCancel() throws Exception {
7373
assertThat("Unexpected leases received.", rule.leases, hasSize(1));
7474
}
7575

76+
@Test(timeout = 10000)
77+
public void testRedistribute() throws Exception {
78+
rule.permits = 2;
79+
rule.redistributeLeasesOnConnect();
80+
81+
Cancellable cancel1 = rule.distributor.registerSocket(rule);
82+
rule.distributor.registerSocket(rule);
83+
84+
assertThat("Unexpected leases received.", rule.leases, hasSize(2));
85+
rule.assertLease(rule.permits/2);
86+
rule.assertLease(rule.permits/2);
87+
cancel1.cancel();
88+
rule.ticks.onNext(1L);
89+
assertThat("Unexpected leases received.", rule.leases, hasSize(1));
90+
}
91+
7692
public static class DistributorRule extends ExternalResource implements Consumer<Lease> {
7793

94+
private boolean redistributeOnConnect;
7895
private FairLeaseDistributor distributor;
7996
private int permits;
8097
private int ttl;
@@ -86,20 +103,29 @@ public Statement apply(final Statement base, Description description) {
86103
return new Statement() {
87104
@Override
88105
public void evaluate() throws Throwable {
89-
ticks = PublishProcessor.create();
90-
if (0 == permits) {
91-
permits = 1;
92-
}
93-
if (0 == ttl) {
94-
ttl = 10;
95-
}
96-
distributor = new FairLeaseDistributor(() -> permits, ttl, ticks);
97-
leases = new CopyOnWriteArrayList<>();
106+
init();
98107
base.evaluate();
99108
}
100109
};
101110
}
102111

112+
protected void init() {
113+
ticks = PublishProcessor.create();
114+
if (0 == permits) {
115+
permits = 1;
116+
}
117+
if (0 == ttl) {
118+
ttl = 10;
119+
}
120+
distributor = new FairLeaseDistributor(() -> permits, ttl, ticks, redistributeOnConnect);
121+
leases = new CopyOnWriteArrayList<>();
122+
}
123+
124+
public void redistributeLeasesOnConnect() {
125+
redistributeOnConnect = true;
126+
init();
127+
}
128+
103129
@Override
104130
public void accept(Lease lease) {
105131
leases.add(lease);
@@ -108,7 +134,11 @@ public void accept(Lease lease) {
108134
public void assertLease(int expectedPermits) {
109135
Lease lease = leases.remove(0);
110136
assertThat("Unexpected permits", lease.getAllowedRequests(), is(expectedPermits));
111-
assertThat("Unexpected ttl", lease.getTtl(), is(ttl));
137+
assertTTL(lease);
138+
}
139+
140+
protected void assertTTL(Lease lease) {
141+
assertThat("Unexpected ttl", lease.getTtl(), is((int)(ttl * 1.1)));
112142
}
113143
}
114144
}

0 commit comments

Comments
 (0)