Skip to content

Commit e69ae87

Browse files
committed
AbstractReactiveSocket.onClose() never completes.
#### Problem `AbstractReactiveSocket` does not implement `close()` and `onClose()` correctly. Both of them return `Px.never()`. This makes it hard for the implementation to do any cleanup actions on close. #### Modification Correctly terminate `onClose` `Publisher` after close() is invoked and subscribed. Also, added test to verify this behavior in the local transport. #### Result Better way to cleanup on close of `AbstractReactiveSocket`.
1 parent d712f43 commit e69ae87

File tree

5 files changed

+185
-69
lines changed

5 files changed

+185
-69
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/AbstractReactiveSocket.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.reactivesocket;
1818

19+
import io.reactivesocket.reactivestreams.extensions.internal.EmptySubject;
1920
import org.reactivestreams.Publisher;
2021
import io.reactivesocket.reactivestreams.extensions.Px;
2122

@@ -27,6 +28,8 @@
2728
*/
2829
public abstract class AbstractReactiveSocket implements ReactiveSocket {
2930

31+
private final EmptySubject onClose = new EmptySubject();
32+
3033
@Override
3134
public Publisher<Void> fireAndForget(Payload payload) {
3235
return Px.error(new UnsupportedOperationException("Fire and forget not implemented."));
@@ -59,11 +62,14 @@ public Publisher<Void> metadataPush(Payload payload) {
5962

6063
@Override
6164
public Publisher<Void> close() {
62-
return Px.never();
65+
return s -> {
66+
onClose.onComplete();
67+
onClose.subscribe(s);
68+
};
6369
}
6470

6571
@Override
6672
public Publisher<Void> onClose() {
67-
return Px.never();
73+
return onClose;
6874
}
6975
}

reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import io.reactivesocket.DuplexConnection;
2020
import io.reactivesocket.local.internal.PeerConnector;
21+
import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber;
2122
import io.reactivesocket.reactivestreams.extensions.Px;
2223
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
2324
import io.reactivesocket.transport.TransportServer;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627

2728
import java.net.SocketAddress;
29+
import java.util.concurrent.ConcurrentLinkedQueue;
2830
import java.util.concurrent.CountDownLatch;
2931
import java.util.concurrent.TimeUnit;
3032

@@ -37,6 +39,10 @@ public final class LocalServer implements TransportServer {
3739

3840
private final String name;
3941
private volatile StartedImpl started;
42+
/**
43+
* Active connections, to close when server is shutdown.
44+
*/
45+
private final ConcurrentLinkedQueue<DuplexConnection> activeConnections = new ConcurrentLinkedQueue<>();
4046

4147
private LocalServer(String name) {
4248
this.name = name;
@@ -76,6 +82,8 @@ void accept(PeerConnector peerConnector) {
7682
}
7783

7884
DuplexConnection serverConn = peerConnector.forServer();
85+
activeConnections.add(serverConn);
86+
serverConn.onClose().subscribe(Subscribers.doOnTerminate(() -> activeConnections.remove(serverConn)));
7987
Px.from(started.acceptor.apply(serverConn))
8088
.subscribe(Subscribers.cleanup(() -> {
8189
serverConn.close().subscribe(Subscribers.empty());
@@ -140,6 +148,9 @@ public void awaitShutdown(long duration, TimeUnit durationUnit) {
140148
@Override
141149
public void shutdown() {
142150
shutdownLatch.countDown();
151+
for (DuplexConnection activeConnection : activeConnections) {
152+
activeConnection.close().subscribe(DefaultSubscriber.defaultInstance());
153+
}
143154
LocalPeersManager.unregister(name);
144155
}
145156
}

reactivesocket-transport-local/src/test/java/io/reactivesocket/ClientDishonorLeaseTest.java

Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,14 @@
1313

1414
package io.reactivesocket;
1515

16-
import io.reactivesocket.client.ReactiveSocketClient;
17-
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket;
18-
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket.LeaseDistributor;
19-
import io.reactivesocket.lease.DisableLeaseSocket;
20-
import io.reactivesocket.lease.Lease;
21-
import io.reactivesocket.lease.LeaseImpl;
22-
import io.reactivesocket.local.LocalSendReceiveTest.LocalRule;
23-
import io.reactivesocket.reactivestreams.extensions.internal.CancellableImpl;
24-
import io.reactivesocket.server.ReactiveSocketServer;
16+
import io.reactivesocket.test.util.LocalRSRule;
2517
import io.reactivesocket.util.PayloadImpl;
26-
import io.reactivex.Single;
2718
import io.reactivex.subscribers.TestSubscriber;
2819
import org.junit.Rule;
2920
import org.junit.Test;
30-
import org.junit.runner.Description;
31-
import org.junit.runners.model.Statement;
32-
import org.mockito.ArgumentCaptor;
33-
import org.mockito.Mockito;
3421

35-
import java.util.List;
36-
import java.util.concurrent.CopyOnWriteArrayList;
37-
import java.util.function.Consumer;
38-
39-
import static io.reactivesocket.client.KeepAliveProvider.*;
40-
import static io.reactivesocket.client.SetupProvider.*;
4122
import static org.hamcrest.MatcherAssert.assertThat;
4223
import static org.hamcrest.Matchers.*;
43-
import static org.mockito.Matchers.*;
4424

4525
public class ClientDishonorLeaseTest {
4626

@@ -56,52 +36,7 @@ public void testNoLeasesSentToClient() throws Exception {
5636
socket.requestResponse(PayloadImpl.EMPTY).subscribe(s);
5737
s.awaitTerminalEvent();
5838

59-
assertThat("Unexpected leases received by the client.", rule.leases, is(empty()));
39+
assertThat("Unexpected leases received by the client.", rule.getLeases(), is(empty()));
6040
}
6141

62-
public static class LocalRSRule extends LocalRule {
63-
64-
private ReactiveSocketServer socketServer;
65-
private ReactiveSocketClient socketClient;
66-
private LeaseDistributor leaseDistributorMock;
67-
private List<Lease> leases;
68-
69-
@Override
70-
public Statement apply(final Statement base, Description description) {
71-
return new Statement() {
72-
@Override
73-
public void evaluate() throws Throwable {
74-
leases = new CopyOnWriteArrayList<>();
75-
leaseDistributorMock = Mockito.mock(LeaseDistributor.class);
76-
Mockito.when(leaseDistributorMock.registerSocket(any())).thenReturn(new CancellableImpl());
77-
init();
78-
socketServer = ReactiveSocketServer.create(localServer);
79-
socketServer.start((setup, sendingSocket) -> {
80-
return new DefaultLeaseEnforcingSocket(new AbstractReactiveSocket() { }, leaseDistributorMock);
81-
});
82-
socketClient = ReactiveSocketClient.create(localClient, keepAlive(never())
83-
.disableLease(reactiveSocket -> new DisableLeaseSocket(reactiveSocket) {
84-
@Override
85-
public void accept(Lease lease) {
86-
leases.add(lease);
87-
}
88-
}));
89-
base.evaluate();
90-
}
91-
};
92-
}
93-
94-
public ReactiveSocket connectSocket() {
95-
return Single.fromPublisher(socketClient.connect()).blockingGet();
96-
}
97-
98-
@SuppressWarnings({"rawtypes", "unchecked"})
99-
public Consumer<Lease> sendLease() {
100-
ArgumentCaptor<Consumer> leaseConsumerCaptor = ArgumentCaptor.forClass(Consumer.class);
101-
Mockito.verify(leaseDistributorMock).registerSocket(leaseConsumerCaptor.capture());
102-
Consumer<Lease> leaseConsumer = leaseConsumerCaptor.getValue();
103-
leaseConsumer.accept(new LeaseImpl(1, 1, Frame.NULL_BYTEBUFFER));
104-
return leaseConsumer;
105-
}
106-
}
10742
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket;
15+
16+
import io.reactivesocket.test.util.LocalRSRule;
17+
import io.reactivex.subscribers.TestSubscriber;
18+
import org.junit.Rule;
19+
import org.junit.Test;
20+
21+
import static org.hamcrest.MatcherAssert.*;
22+
import static org.hamcrest.Matchers.*;
23+
24+
public class GracefulShutdownTest {
25+
@Rule
26+
public final LocalRSRule rule = new LocalRSRule();
27+
28+
@Test(timeout = 10000)
29+
public void testClientCloseWillCloseHandler() throws Exception {
30+
ReactiveSocket rs = rule.connectSocket();
31+
TestSubscriber<Void> sub = TestSubscriber.create();
32+
rs.close().subscribe(sub);
33+
sub.await().assertNoErrors();
34+
35+
assertThat("Accepting socket not closed.", rule.getAcceptingSocketCloses(), hasSize(1));
36+
}
37+
38+
@Test(timeout = 10000)
39+
public void testServerCloseClosesClient() throws Exception {
40+
ReactiveSocket rs = rule.connectSocket();
41+
TestSubscriber<Void> sub = TestSubscriber.create();
42+
rs.onClose().subscribe(sub);
43+
44+
rule.getStartedServer().shutdown();
45+
46+
sub.await().assertNoErrors();
47+
48+
assertThat("Accepting socket not closed.", rule.getAcceptingSocketCloses(), hasSize(1));
49+
}
50+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.test.util;
15+
16+
import io.reactivesocket.AbstractReactiveSocket;
17+
import io.reactivesocket.Frame;
18+
import io.reactivesocket.ReactiveSocket;
19+
import io.reactivesocket.client.ReactiveSocketClient;
20+
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket;
21+
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket.LeaseDistributor;
22+
import io.reactivesocket.lease.DisableLeaseSocket;
23+
import io.reactivesocket.lease.Lease;
24+
import io.reactivesocket.lease.LeaseImpl;
25+
import io.reactivesocket.local.LocalSendReceiveTest.LocalRule;
26+
import io.reactivesocket.reactivestreams.extensions.internal.CancellableImpl;
27+
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
28+
import io.reactivesocket.server.ReactiveSocketServer;
29+
import io.reactivesocket.transport.TransportServer.StartedServer;
30+
import io.reactivex.Single;
31+
import org.junit.runner.Description;
32+
import org.junit.runners.model.Statement;
33+
import org.mockito.ArgumentCaptor;
34+
import org.mockito.Mockito;
35+
36+
import java.util.List;
37+
import java.util.concurrent.CopyOnWriteArrayList;
38+
import java.util.function.Consumer;
39+
40+
import static io.reactivesocket.client.KeepAliveProvider.never;
41+
import static io.reactivesocket.client.SetupProvider.keepAlive;
42+
import static org.mockito.Matchers.any;
43+
44+
public class LocalRSRule extends LocalRule {
45+
46+
private ReactiveSocketServer socketServer;
47+
private ReactiveSocketClient socketClient;
48+
private LeaseDistributor leaseDistributorMock;
49+
private List<Lease> leases;
50+
private List<Boolean> acceptingSocketCloses;
51+
private StartedServer started;
52+
53+
@Override
54+
public Statement apply(final Statement base, Description description) {
55+
return new Statement() {
56+
@Override
57+
public void evaluate() throws Throwable {
58+
leases = new CopyOnWriteArrayList<>();
59+
acceptingSocketCloses = new CopyOnWriteArrayList<>();
60+
leaseDistributorMock = Mockito.mock(LeaseDistributor.class);
61+
Mockito.when(leaseDistributorMock.registerSocket(any())).thenReturn(new CancellableImpl());
62+
init();
63+
socketServer = ReactiveSocketServer.create(localServer);
64+
started = socketServer.start((setup, sendingSocket) -> {
65+
AbstractReactiveSocket accept = new AbstractReactiveSocket() {
66+
};
67+
accept.onClose().subscribe(Subscribers.doOnTerminate(() -> acceptingSocketCloses.add(true)));
68+
return new DefaultLeaseEnforcingSocket(accept, leaseDistributorMock);
69+
});
70+
socketClient = ReactiveSocketClient.create(localClient, keepAlive(never())
71+
.disableLease(reactiveSocket -> new DisableLeaseSocket(reactiveSocket) {
72+
@Override
73+
public void accept(Lease lease) {
74+
leases.add(lease);
75+
}
76+
}));
77+
base.evaluate();
78+
}
79+
};
80+
}
81+
82+
public ReactiveSocket connectSocket() {
83+
return Single.fromPublisher(socketClient.connect()).blockingGet();
84+
}
85+
86+
@SuppressWarnings({"rawtypes", "unchecked"})
87+
public Consumer<Lease> sendLease() {
88+
ArgumentCaptor<Consumer> leaseConsumerCaptor = ArgumentCaptor.forClass(Consumer.class);
89+
Mockito.verify(leaseDistributorMock).registerSocket(leaseConsumerCaptor.capture());
90+
Consumer<Lease> leaseConsumer = leaseConsumerCaptor.getValue();
91+
leaseConsumer.accept(new LeaseImpl(1, 1, Frame.NULL_BYTEBUFFER));
92+
return leaseConsumer;
93+
}
94+
95+
public List<Lease> getLeases() {
96+
return leases;
97+
}
98+
99+
public StartedServer getStartedServer() {
100+
return started;
101+
}
102+
103+
public ReactiveSocketServer getSocketServer() {
104+
return socketServer;
105+
}
106+
107+
public ReactiveSocketClient getSocketClient() {
108+
return socketClient;
109+
}
110+
111+
public List<Boolean> getAcceptingSocketCloses() {
112+
return acceptingSocketCloses;
113+
}
114+
}

0 commit comments

Comments
 (0)