Skip to content

Commit 05ee843

Browse files
robertroeserstevegury
authored andcommitted
adding a callback when a reactive socket is closed (#82)
1 parent 119ef1e commit 05ee843

File tree

4 files changed

+100
-8
lines changed

4 files changed

+100
-8
lines changed

src/main/java/io/reactivesocket/DefaultReactiveSocket.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.reactivestreams.Subscription;
3030

3131
import java.io.IOException;
32+
import java.util.concurrent.CopyOnWriteArrayList;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.function.Consumer;
3435

@@ -55,6 +56,7 @@ public class DefaultReactiveSocket implements ReactiveSocket {
5556
private final RequestHandler clientRequestHandler;
5657
private final ConnectionSetupHandler responderConnectionHandler;
5758
private final LeaseGovernor leaseGovernor;
59+
private final CopyOnWriteArrayList<Completable> shutdownListeners;
5860

5961
private DefaultReactiveSocket(
6062
DuplexConnection connection,
@@ -72,6 +74,7 @@ private DefaultReactiveSocket(
7274
this.responderConnectionHandler = responderConnectionHandler;
7375
this.leaseGovernor = leaseGovernor;
7476
this.errorStream = errorStream;
77+
this.shutdownListeners = new CopyOnWriteArrayList<>();
7578
}
7679

7780
/**
@@ -439,15 +442,28 @@ public void addOutput(Frame f, Completable callback) {
439442

440443
};
441444

445+
@Override
446+
public void onShutdown(Completable c) {
447+
shutdownListeners.add(c);
448+
}
449+
442450
@Override
443451
public void close() throws Exception {
444-
connection.close();
445-
leaseGovernor.unregister(responder);
446-
if (requester != null) {
447-
requester.shutdown();
448-
}
449-
if (responder != null) {
450-
responder.shutdown();
452+
try {
453+
connection.close();
454+
leaseGovernor.unregister(responder);
455+
if (requester != null) {
456+
requester.shutdown();
457+
}
458+
if (responder != null) {
459+
responder.shutdown();
460+
}
461+
462+
shutdownListeners.forEach(Completable::success);
463+
464+
} catch (Throwable t) {
465+
shutdownListeners.forEach(c -> c.error(t));
466+
throw t;
451467
}
452468
}
453469

src/main/java/io/reactivesocket/DuplexConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.reactivesocket;
1717

18+
import io.reactivesocket.internal.rx.EmptySubscription;
1819
import io.reactivesocket.rx.Completable;
1920
import io.reactivesocket.rx.Observable;
2021
import org.reactivestreams.Publisher;
@@ -32,6 +33,7 @@ public interface DuplexConnection extends Closeable {
3233

3334
default void addOutput(Frame frame, Completable callback) {
3435
addOutput(s -> {
36+
s.onSubscribe(EmptySubscription.INSTANCE);
3537
s.onNext(frame);
3638
s.onComplete();
3739
}, callback);

src/main/java/io/reactivesocket/ReactiveSocket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public void error(Throwable e) {
9494
*/
9595
void onRequestReady(Completable c);
9696

97+
/**
98+
* Registers a completable to be run when an ReactiveSocket is closed
99+
*/
100+
void onShutdown(Completable c);
101+
97102
/**
98103
* Server granting new lease information to client
99104
*

src/test/java/io/reactivesocket/ReactiveSocketTest.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.reactivesocket;
1717

1818
import io.reactivesocket.lease.FairLeaseGovernor;
19+
import io.reactivesocket.rx.Completable;
1920
import io.reactivex.disposables.Disposable;
2021
import io.reactivex.observables.ConnectableObservable;
2122
import io.reactivex.subscribers.TestSubscriber;
@@ -256,6 +257,74 @@ private void awaitSocketAvailability(ReactiveSocket socket, long timeout, TimeUn
256257
assertTrue("client socket has positive avaibility", socket.availability() > 0.0);
257258
}
258259

260+
@Test(timeout = 2000)
261+
public void testShutdownListener() throws Exception {
262+
socketClient = DefaultReactiveSocket.fromClientConnection(
263+
clientConnection,
264+
ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS),
265+
err -> err.printStackTrace()
266+
);
267+
268+
CountDownLatch latch = new CountDownLatch(1);
269+
270+
socketClient.onShutdown(new Completable() {
271+
@Override
272+
public void success() {
273+
latch.countDown();
274+
}
275+
276+
@Override
277+
public void error(Throwable e) {
278+
279+
}
280+
});
281+
282+
socketClient.close();
283+
284+
latch.await();
285+
}
286+
287+
@Test(timeout = 2000)
288+
public void testMultipleShutdownListeners() throws Exception {
289+
socketClient = DefaultReactiveSocket.fromClientConnection(
290+
clientConnection,
291+
ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS),
292+
err -> err.printStackTrace()
293+
);
294+
295+
CountDownLatch latch = new CountDownLatch(2);
296+
297+
socketClient
298+
.onShutdown(new Completable() {
299+
@Override
300+
public void success() {
301+
latch.countDown();
302+
}
303+
304+
@Override
305+
public void error(Throwable e) {
306+
307+
}
308+
});
309+
310+
socketClient
311+
.onShutdown(new Completable() {
312+
@Override
313+
public void success() {
314+
latch.countDown();
315+
}
316+
317+
@Override
318+
public void error(Throwable e) {
319+
320+
}
321+
});
322+
323+
socketClient.close();
324+
325+
latch.await();
326+
}
327+
259328
@Test(timeout=2000)
260329
@Theory
261330
public void testRequestResponse(int setupFlag) throws InterruptedException {
@@ -269,7 +338,7 @@ public void testRequestResponse(int setupFlag) throws InterruptedException {
269338
ts.assertNoErrors();
270339
ts.assertValue(TestUtil.utf8EncodedPayload("hello world", null));
271340
}
272-
341+
273342
@Test(timeout=2000, expected=IllegalStateException.class)
274343
public void testRequestResponsePremature() throws InterruptedException {
275344
socketClient = DefaultReactiveSocket.fromClientConnection(

0 commit comments

Comments
 (0)