Skip to content

Commit 5bb6cea

Browse files
committed
Do not send lease frames for clients which do not honor leases.
Today server sends leases even though the client does not honor leases. There are two things here: - A server should protect itself, even if the client dishonors leases, i.e. leases aren't infinite if the client does not honor. So, the server should still expect requests within capacity from clients and reject all other requests. - Clients should not receive lease frames as they are not relevant. Modified `ServerReactiveSocket` to not send leases when the setup is such that client does not honor leases. Also, added an override in `SetupProvider` to be able to modify the `DisableLeaseSocket` if required. Expected results and optimizations on the wire w.r.t data transfers.
1 parent 9818104 commit 5bb6cea

File tree

9 files changed

+168
-26
lines changed

9 files changed

+168
-26
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ public class ServerReactiveSocket implements ReactiveSocket {
5252

5353
private final ReactiveSocket requestHandler;
5454
private Subscription receiversSubscription;
55-
5655
public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
57-
Consumer<Throwable> errorConsumer) {
56+
boolean clientHonorsLease, Consumer<Throwable> errorConsumer) {
5857
this.requestHandler = requestHandler;
5958
this.connection = connection;
6059
serverInput = connection.receive();
@@ -64,14 +63,22 @@ public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestH
6463
if (requestHandler instanceof LeaseEnforcingSocket) {
6564
LeaseEnforcingSocket enforcer = (LeaseEnforcingSocket) requestHandler;
6665
enforcer.acceptLeaseSender(lease -> {
66+
if (!clientHonorsLease) {
67+
return;
68+
}
6769
Frame leaseFrame = Lease.from(lease.getTtl(), lease.getAllowedRequests(), lease.metadata());
6870
Px.from(connection.sendOne(leaseFrame))
69-
.doOnError(errorConsumer)
70-
.subscribe();
71+
.doOnError(errorConsumer)
72+
.subscribe();
7173
});
7274
}
7375
}
7476

77+
public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
78+
Consumer<Throwable> errorConsumer) {
79+
this(connection, requestHandler, true, errorConsumer);
80+
}
81+
7582
@Override
7683
public Publisher<Void> fireAndForget(Payload payload) {
7784
return requestHandler.fireAndForget(payload);

reactivesocket-core/src/main/java/io/reactivesocket/client/SetupProvider.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivesocket.DuplexConnection;
2323
import io.reactivesocket.Frame;
2424
import io.reactivesocket.Frame.Setup;
25+
import io.reactivesocket.lease.DisableLeaseSocket;
2526
import io.reactivesocket.lease.LeaseHonoringSocket;
2627
import io.reactivesocket.ReactiveSocket;
2728
import io.reactivesocket.frame.SetupFrameFlyweight;
@@ -84,6 +85,15 @@ public interface SetupProvider {
8485
*/
8586
SetupProvider disableLease();
8687

88+
/**
89+
* Creates a new {@code SetupProvider} that does not honor leases.
90+
*
91+
* @param socketFactory A factory to create {@link DisableLeaseSocket} for each accepted socket.
92+
*
93+
* @return A new {@code SetupProvider} instance.
94+
*/
95+
SetupProvider disableLease(Function<ReactiveSocket, DisableLeaseSocket> socketFactory);
96+
8797
/**
8898
* Creates a new {@code SetupProvider} that uses the passed {@code setupPayload} as the payload for the setup frame.
8999
* Default instances, do not have any payload.

reactivesocket-core/src/main/java/io/reactivesocket/client/SetupProviderImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@
4242
final class SetupProviderImpl implements SetupProvider {
4343

4444
private final Frame setupFrame;
45-
private final Function<ReactiveSocket, LeaseHonoringSocket> leaseDecorator;
45+
private final Function<ReactiveSocket, ? extends LeaseHonoringSocket> leaseDecorator;
4646
private final Consumer<Throwable> errorConsumer;
4747
private final KeepAliveProvider keepAliveProvider;
4848

49-
SetupProviderImpl(Frame setupFrame, Function<ReactiveSocket, LeaseHonoringSocket> leaseDecorator,
49+
SetupProviderImpl(Frame setupFrame, Function<ReactiveSocket, ? extends LeaseHonoringSocket> leaseDecorator,
5050
KeepAliveProvider keepAliveProvider, Consumer<Throwable> errorConsumer) {
5151
this.keepAliveProvider = keepAliveProvider;
5252
this.errorConsumer = errorConsumer;
@@ -98,12 +98,16 @@ public SetupProvider honorLease(Function<ReactiveSocket, LeaseHonoringSocket> le
9898

9999
@Override
100100
public SetupProvider disableLease() {
101+
return disableLease(DisableLeaseSocket::new);
102+
}
103+
104+
@Override
105+
public SetupProvider disableLease(Function<ReactiveSocket, DisableLeaseSocket> socketFactory) {
101106
Frame newSetup = from(getFlags(setupFrame) & ~ConnectionSetupPayload.HONOR_LEASE,
102107
keepaliveInterval(setupFrame), maxLifetime(setupFrame),
103108
Frame.Setup.metadataMimeType(setupFrame), Frame.Setup.dataMimeType(setupFrame),
104109
setupFrame);
105-
return new SetupProviderImpl(newSetup, reactiveSocket -> new DisableLeaseSocket(reactiveSocket),
106-
keepAliveProvider, errorConsumer);
110+
return new SetupProviderImpl(newSetup, socketFactory, keepAliveProvider, errorConsumer);
107111
}
108112

109113
@Override

reactivesocket-core/src/main/java/io/reactivesocket/lease/DisableLeaseSocket.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,25 @@
1919
import io.reactivesocket.Payload;
2020
import io.reactivesocket.ReactiveSocket;
2121
import org.reactivestreams.Publisher;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2224

2325
/**
2426
* {@link LeaseHonoringSocket} that does not expect to receive any leases and {@link #accept(Lease)} throws an error.
2527
*/
2628
public class DisableLeaseSocket implements LeaseHonoringSocket {
2729

30+
private static final Logger logger = LoggerFactory.getLogger(DisableLeaseSocket.class);
31+
2832
private final ReactiveSocket delegate;
2933

3034
public DisableLeaseSocket(ReactiveSocket delegate) {
3135
this.delegate = delegate;
3236
}
3337

34-
/**
35-
* @throws IllegalArgumentException Always thrown.
36-
*/
3738
@Override
3839
public void accept(Lease lease) {
39-
throw new IllegalArgumentException("Leases are disabled.");
40+
logger.info("Leases are disabled but received a lease from the peer. " + lease);
4041
}
4142

4243
@Override

reactivesocket-core/src/main/java/io/reactivesocket/lease/LeaseImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,13 @@ public long expiry() {
5757
public ByteBuffer metadata() {
5858
return metadata;
5959
}
60+
61+
@Override
62+
public String toString() {
63+
return "LeaseImpl{" +
64+
"allowedRequests=" + allowedRequests +
65+
", ttl=" + ttl +
66+
", expiry=" + expiry +
67+
'}';
68+
}
6069
}

reactivesocket-core/src/main/java/io/reactivesocket/server/ReactiveSocketServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ static ReactiveSocketServer create(TransportServer transportServer) {
5353
KeepAliveProvider.never());
5454
LeaseEnforcingSocket handler = acceptor.accept(setupPayload, sender);
5555
ServerReactiveSocket receiver = new ServerReactiveSocket(duplexConnection, handler,
56+
setupPayload.willClientHonorLease(),
5657
Throwable::printStackTrace);
5758
receiver.start();
5859
return duplexConnection.onClose();

reactivesocket-transport-local/src/main/java/io/reactivesocket/local/internal/PeerConnector.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ public static PeerConnector connect(String name, int id) {
6565
return new PeerConnector(uniqueName);
6666
}
6767

68-
private final class LocalDuplexConnection implements DuplexConnection, Consumer<Frame> {
68+
private final class LocalDuplexConnection implements DuplexConnection {
6969

7070
private volatile ValidatingSubscription<Frame> receiver;
7171
private volatile boolean connected;
7272
private final EmptySubject closeNotifier;
7373
private final boolean client;
74-
private volatile Consumer<Frame> peer;
74+
private volatile LocalDuplexConnection peer;
7575

7676
private LocalDuplexConnection(EmptySubject closeNotifier, boolean client) {
7777
this.closeNotifier = closeNotifier;
@@ -91,7 +91,7 @@ public Publisher<Void> send(Publisher<Frame> frames) {
9191
subscription.request(Long.MAX_VALUE); // Local transport is not flow controlled.
9292
}, frame -> {
9393
if (peer != null) {
94-
peer.accept(frame);
94+
peer.receiveFrameFromPeer(frame);
9595
} else {
9696
logger.warn("Sending a frame but peer not connected. Ignoring frame: " + frame);
9797
}
@@ -145,8 +145,7 @@ public String toString() {
145145
return "[local connection(" + (client ? "client" : "server" + ") - ") + name + "] connected: " + connected;
146146
}
147147

148-
@Override
149-
public void accept(Frame frame) {
148+
public void receiveFrameFromPeer(Frame frame) {
150149
if (receiver != null) {
151150
receiver.safeOnNext(frame);
152151
} else {
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket;
15+
16+
import io.reactivesocket.client.ReactiveSocketClient;
17+
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket;
18+
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket.LeaseDistributor;
19+
import io.reactivesocket.lease.DisableLeaseSocket;
20+
import io.reactivesocket.lease.Lease;
21+
import io.reactivesocket.lease.LeaseImpl;
22+
import io.reactivesocket.local.LocalSendReceiveTest.LocalRule;
23+
import io.reactivesocket.reactivestreams.extensions.internal.CancellableImpl;
24+
import io.reactivesocket.server.ReactiveSocketServer;
25+
import io.reactivesocket.util.PayloadImpl;
26+
import io.reactivex.Single;
27+
import io.reactivex.subscribers.TestSubscriber;
28+
import org.junit.Rule;
29+
import org.junit.Test;
30+
import org.junit.runner.Description;
31+
import org.junit.runners.model.Statement;
32+
import org.mockito.ArgumentCaptor;
33+
import org.mockito.Mockito;
34+
35+
import java.util.List;
36+
import java.util.concurrent.CopyOnWriteArrayList;
37+
import java.util.function.Consumer;
38+
39+
import static io.reactivesocket.client.KeepAliveProvider.*;
40+
import static io.reactivesocket.client.SetupProvider.*;
41+
import static org.hamcrest.MatcherAssert.assertThat;
42+
import static org.hamcrest.Matchers.*;
43+
import static org.mockito.Matchers.*;
44+
45+
public class ClientDishonorLeaseTest {
46+
47+
@Rule
48+
public final LocalRSRule rule = new LocalRSRule();
49+
50+
@Test(timeout = 10000)
51+
public void testNoLeasesSentToClient() throws Exception {
52+
ReactiveSocket socket = rule.connectSocket();
53+
rule.sendLease();
54+
55+
TestSubscriber<Payload> s = TestSubscriber.create();
56+
socket.requestResponse(PayloadImpl.EMPTY).subscribe(s);
57+
s.awaitTerminalEvent();
58+
59+
assertThat("Unexpected leases received by the client.", rule.leases, is(empty()));
60+
}
61+
62+
public static class LocalRSRule extends LocalRule {
63+
64+
private ReactiveSocketServer socketServer;
65+
private ReactiveSocketClient socketClient;
66+
private LeaseDistributor leaseDistributorMock;
67+
private List<Lease> leases;
68+
69+
@Override
70+
public Statement apply(final Statement base, Description description) {
71+
return new Statement() {
72+
@Override
73+
public void evaluate() throws Throwable {
74+
leases = new CopyOnWriteArrayList<>();
75+
leaseDistributorMock = Mockito.mock(LeaseDistributor.class);
76+
Mockito.when(leaseDistributorMock.registerSocket(any())).thenReturn(new CancellableImpl());
77+
init();
78+
socketServer = ReactiveSocketServer.create(localServer);
79+
socketServer.start((setup, sendingSocket) -> {
80+
return new DefaultLeaseEnforcingSocket(new AbstractReactiveSocket() { }, leaseDistributorMock);
81+
});
82+
socketClient = ReactiveSocketClient.create(localClient, keepAlive(never())
83+
.disableLease(reactiveSocket -> new DisableLeaseSocket(reactiveSocket) {
84+
@Override
85+
public void accept(Lease lease) {
86+
leases.add(lease);
87+
}
88+
}));
89+
base.evaluate();
90+
}
91+
};
92+
}
93+
94+
public ReactiveSocket connectSocket() {
95+
return Single.fromPublisher(socketClient.connect()).blockingGet();
96+
}
97+
98+
@SuppressWarnings({"rawtypes", "unchecked"})
99+
public Consumer<Lease> sendLease() {
100+
ArgumentCaptor<Consumer> leaseConsumerCaptor = ArgumentCaptor.forClass(Consumer.class);
101+
Mockito.verify(leaseDistributorMock).registerSocket(leaseConsumerCaptor.capture());
102+
Consumer<Lease> leaseConsumer = leaseConsumerCaptor.getValue();
103+
leaseConsumer.accept(new LeaseImpl(1, 1, Frame.NULL_BYTEBUFFER));
104+
return leaseConsumer;
105+
}
106+
}
107+
}

reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalSendReceiveTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,26 +67,30 @@ public void testClose() throws Exception {
6767

6868
public static class LocalRule extends ExternalResource {
6969

70-
private LocalClient localClient;
71-
private String name;
70+
protected String name;
71+
protected LocalServer localServer;
72+
protected LocalClient localClient;
7273

7374
@Override
7475
public Statement apply(final Statement base, Description description) {
7576
return new Statement() {
7677
@Override
7778
public void evaluate() throws Throwable {
78-
name = "test-send-receive-server-" + ThreadLocalRandom.current().nextInt();
79-
LocalServer.create(name)
80-
.start(duplexConnection -> {
81-
return duplexConnection.send(duplexConnection.receive());
82-
});
83-
84-
localClient = LocalClient.create(name);
79+
init();
80+
localServer.start(duplexConnection -> {
81+
return duplexConnection.send(duplexConnection.receive());
82+
});
8583
base.evaluate();
8684
}
8785
};
8886
}
8987

88+
protected void init() {
89+
name = "test-send-receive-server-" + ThreadLocalRandom.current().nextInt();
90+
localServer = LocalServer.create(name);
91+
localClient = LocalClient.create(name);
92+
}
93+
9094
public DuplexConnection connect() {
9195
return Single.fromPublisher(localClient.connect()).blockingGet();
9296
}

0 commit comments

Comments
 (0)