Skip to content

Commit e0f4bc3

Browse files
author
OlegDokuka
committed
ensures local server awaits all connections are close before termination notification
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent c633030 commit e0f4bc3

File tree

3 files changed

+64
-23
lines changed

3 files changed

+64
-23
lines changed

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
package io.rsocket.transport.local;
1818

1919
import io.rsocket.Closeable;
20+
import io.rsocket.DuplexConnection;
2021
import io.rsocket.transport.ClientTransport;
2122
import io.rsocket.transport.ServerTransport;
2223
import java.util.Objects;
24+
import java.util.Set;
2325
import java.util.UUID;
2426
import java.util.concurrent.ConcurrentHashMap;
2527
import java.util.concurrent.ConcurrentMap;
28+
import java.util.stream.Collectors;
2629
import reactor.core.Scannable;
2730
import reactor.core.publisher.Mono;
2831
import reactor.core.publisher.Sinks;
@@ -34,7 +37,7 @@
3437
*/
3538
public final class LocalServerTransport implements ServerTransport<Closeable> {
3639

37-
private static final ConcurrentMap<String, ConnectionAcceptor> registry =
40+
private static final ConcurrentMap<String, ServerCloseableAcceptor> registry =
3841
new ConcurrentHashMap<>();
3942

4043
private final String name;
@@ -72,7 +75,10 @@ public static LocalServerTransport createEphemeral() {
7275
*/
7376
public static void dispose(String name) {
7477
Objects.requireNonNull(name, "name must not be null");
75-
registry.remove(name);
78+
ServerCloseableAcceptor sca = registry.remove(name);
79+
if (sca != null) {
80+
sca.dispose();
81+
}
7682
}
7783

7884
/**
@@ -107,34 +113,55 @@ public Mono<Closeable> start(ConnectionAcceptor acceptor) {
107113
Objects.requireNonNull(acceptor, "acceptor must not be null");
108114
return Mono.create(
109115
sink -> {
110-
ServerCloseable closeable = new ServerCloseable(name, acceptor);
111-
if (registry.putIfAbsent(name, acceptor) != null) {
112-
throw new IllegalStateException("name already registered: " + name);
116+
ServerCloseableAcceptor closeable = new ServerCloseableAcceptor(name, acceptor);
117+
if (registry.putIfAbsent(name, closeable) != null) {
118+
sink.error(new IllegalStateException("name already registered: " + name));
113119
}
114120
sink.success(closeable);
115121
});
116122
}
117123

118-
static class ServerCloseable implements Closeable {
124+
@SuppressWarnings({"ReactorTransformationOnMonoVoid", "CallingSubscribeInNonBlockingScope"})
125+
static class ServerCloseableAcceptor implements ConnectionAcceptor, Closeable {
119126

120127
private final LocalSocketAddress address;
121128

122129
private final ConnectionAcceptor acceptor;
123130

124-
private final Sinks.Empty<Void> onClose = Sinks.empty();
131+
private final Set<DuplexConnection> activeConnections = ConcurrentHashMap.newKeySet();
132+
133+
private final Sinks.Empty<Void> onClose = Sinks.unsafe().empty();
125134

126-
ServerCloseable(String name, ConnectionAcceptor acceptor) {
135+
ServerCloseableAcceptor(String name, ConnectionAcceptor acceptor) {
127136
Objects.requireNonNull(name, "name must not be null");
128137
this.address = new LocalSocketAddress(name);
129138
this.acceptor = acceptor;
130139
}
131140

141+
@Override
142+
public Mono<Void> apply(DuplexConnection duplexConnection) {
143+
activeConnections.add(duplexConnection);
144+
duplexConnection
145+
.onClose()
146+
.doFinally(__ -> activeConnections.remove(duplexConnection))
147+
.subscribe();
148+
return acceptor.apply(duplexConnection);
149+
}
150+
132151
@Override
133152
public void dispose() {
134-
if (!registry.remove(address.getName(), acceptor)) {
135-
throw new AssertionError();
153+
if (!registry.remove(address.getName(), this)) {
154+
// already disposed
155+
return;
136156
}
137-
onClose.tryEmitEmpty();
157+
158+
Mono.whenDelayError(
159+
activeConnections
160+
.stream()
161+
.peek(DuplexConnection::dispose)
162+
.map(DuplexConnection::onClose)
163+
.collect(Collectors.toList()))
164+
.subscribe(null, onClose::tryEmitError, onClose::tryEmitEmpty);
138165
}
139166

140167
@Override

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
2121

22+
import io.rsocket.Closeable;
23+
import java.time.Duration;
2224
import org.junit.jupiter.api.DisplayName;
2325
import org.junit.jupiter.api.Test;
24-
import reactor.core.publisher.Mono;
2526
import reactor.test.StepVerifier;
2627

2728
final class LocalClientTransportTest {
@@ -31,12 +32,20 @@ final class LocalClientTransportTest {
3132
void connect() {
3233
LocalServerTransport serverTransport = LocalServerTransport.createEphemeral();
3334

34-
serverTransport
35-
.start(duplexConnection -> Mono.empty())
36-
.flatMap(closeable -> LocalClientTransport.create(serverTransport.getName()).connect())
37-
.as(StepVerifier::create)
38-
.expectNextCount(1)
39-
.verifyComplete();
35+
Closeable closeable =
36+
serverTransport.start(duplexConnection -> duplexConnection.receive().then()).block();
37+
38+
try {
39+
LocalClientTransport.create(serverTransport.getName())
40+
.connect()
41+
.doOnNext(d -> d.receive().subscribe())
42+
.as(StepVerifier::create)
43+
.expectNextCount(1)
44+
.verifyComplete();
45+
} finally {
46+
closeable.dispose();
47+
closeable.onClose().block(Duration.ofSeconds(5));
48+
}
4049
}
4150

4251
@DisplayName("generates error if server not started")

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalServerTransportTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,16 @@ void named() {
9696
@DisplayName("starts local server transport")
9797
@Test
9898
void start() {
99-
LocalServerTransport.createEphemeral()
100-
.start(duplexConnection -> Mono.empty())
101-
.as(StepVerifier::create)
102-
.expectNextCount(1)
103-
.verifyComplete();
99+
LocalServerTransport ephemeral = LocalServerTransport.createEphemeral();
100+
try {
101+
ephemeral
102+
.start(duplexConnection -> Mono.empty())
103+
.as(StepVerifier::create)
104+
.expectNextCount(1)
105+
.verifyComplete();
106+
} finally {
107+
LocalServerTransport.dispose(ephemeral.getName());
108+
}
104109
}
105110

106111
@DisplayName("start throws NullPointerException with null acceptor")

0 commit comments

Comments
 (0)