Skip to content

Commit 9cf23f0

Browse files
committed
improves KeepAliveFramesAcceptor to be Disposable and fixes related
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 1237fc5 commit 9cf23f0

File tree

4 files changed

+113
-33
lines changed

4 files changed

+113
-33
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -792,6 +792,9 @@ private void tryShutdown() {
792792
}
793793

794794
private void terminate(Throwable e) {
795+
if (keepAliveFramesAcceptor != null) {
796+
keepAliveFramesAcceptor.dispose();
797+
}
795798
connection.dispose();
796799
leaseHandler.dispose();
797800

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package io.rsocket.keepalive;
22

33
import io.netty.buffer.ByteBuf;
4+
import reactor.core.Disposable;
45

5-
public interface KeepAliveFramesAcceptor {
6+
public interface KeepAliveFramesAcceptor extends Disposable {
67

78
void receive(ByteBuf keepAliveFrame);
89
}

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,21 @@ long remoteLastReceivedPosition(ByteBuf keepAliveFrame) {
121121
return KeepAliveFrameCodec.lastPosition(keepAliveFrame);
122122
}
123123

124+
@Override
125+
public void dispose() {
126+
stop();
127+
}
128+
129+
@Override
130+
public boolean isDisposed() {
131+
return ticksDisposable.isDisposed();
132+
}
133+
134+
/**
135+
* @deprecated since it should not be used anymore and will be completely removed in 1.1.
136+
* Keepalive is symmetric on both side and implemented as a part of RSocketRequester
137+
*/
138+
@Deprecated
124139
public static final class ServerKeepAliveSupport extends KeepAliveSupport {
125140

126141
public ServerKeepAliveSupport(

rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java

Lines changed: 93 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@
3737
import io.rsocket.util.DefaultPayload;
3838
import java.time.Duration;
3939
import org.assertj.core.api.Assertions;
40-
import org.junit.jupiter.api.BeforeEach;
4140
import org.junit.jupiter.api.Test;
41+
import reactor.core.Disposable;
4242
import reactor.core.publisher.Flux;
4343
import reactor.core.publisher.Mono;
4444
import reactor.test.StepVerifier;
@@ -48,9 +48,6 @@ public class KeepAliveTest {
4848
private static final int KEEP_ALIVE_TIMEOUT = 1000;
4949
private static final int RESUMABLE_KEEP_ALIVE_TIMEOUT = 200;
5050

51-
private RSocketState requesterState;
52-
private ResumableRSocketState resumableRequesterState;
53-
5451
static RSocketState requester(int tickPeriod, int timeout) {
5552
LeaksTrackingByteBufAllocator allocator =
5653
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
@@ -97,57 +94,75 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) {
9794
return new ResumableRSocketState(rSocket, connection, resumableConnection, allocator);
9895
}
9996

100-
@BeforeEach
101-
void setUp() {
102-
requesterState = requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
103-
resumableRequesterState = resumableRequester(KEEP_ALIVE_INTERVAL, RESUMABLE_KEEP_ALIVE_TIMEOUT);
104-
}
105-
10697
@Test
107-
void rSocketNotDisposedOnPresentKeepAlives() {
98+
void rSocketNotDisposedOnPresentKeepAlives() throws InterruptedException {
99+
RSocketState requesterState = requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
100+
108101
TestDuplexConnection connection = requesterState.connection();
109102

110-
Flux.interval(Duration.ofMillis(100))
111-
.subscribe(
112-
n ->
113-
connection.addToReceivedBuffer(
114-
KeepAliveFrameCodec.encode(
115-
ByteBufAllocator.DEFAULT, true, 0, Unpooled.EMPTY_BUFFER)));
103+
Disposable disposable =
104+
Flux.interval(Duration.ofMillis(KEEP_ALIVE_INTERVAL))
105+
.subscribe(
106+
n ->
107+
connection.addToReceivedBuffer(
108+
KeepAliveFrameCodec.encode(
109+
requesterState.allocator, true, 0, Unpooled.EMPTY_BUFFER)));
116110

117-
Mono.delay(Duration.ofMillis(2000)).block();
111+
Thread.sleep(KEEP_ALIVE_TIMEOUT * 2);
118112

119113
RSocket rSocket = requesterState.rSocket();
120114

121115
Assertions.assertThat(rSocket.isDisposed()).isFalse();
116+
117+
disposable.dispose();
118+
Thread.sleep(KEEP_ALIVE_INTERVAL);
119+
120+
requesterState.connection.dispose();
121+
requesterState.rSocket.dispose();
122+
123+
Thread.sleep(KEEP_ALIVE_TIMEOUT);
124+
125+
Assertions.assertThat(requesterState.connection.getSent()).allMatch(ByteBuf::release);
126+
requesterState.allocator.assertHasNoLeaks();
122127
}
123128

124129
@Test
125130
void noKeepAlivesSentAfterRSocketDispose() {
131+
RSocketState requesterState = requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
132+
126133
requesterState.rSocket().dispose();
134+
127135
StepVerifier.create(
128136
Flux.from(requesterState.connection().getSentAsPublisher())
129137
.take(Duration.ofMillis(500)))
130138
.expectComplete()
131139
.verify(Duration.ofSeconds(1));
140+
requesterState.allocator.assertHasNoLeaks();
132141
}
133142

134143
@Test
135144
void rSocketDisposedOnMissingKeepAlives() {
145+
RSocketState requesterState = requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
146+
136147
RSocket rSocket = requesterState.rSocket();
137148

138-
Mono.delay(Duration.ofMillis(2000)).block();
149+
Mono.delay(Duration.ofMillis(KEEP_ALIVE_TIMEOUT * 2)).block();
139150

140151
Assertions.assertThat(rSocket.isDisposed()).isTrue();
141152
rSocket
142153
.onClose()
143154
.as(StepVerifier::create)
144155
.expectError(ConnectionErrorException.class)
145156
.verify(Duration.ofMillis(100));
157+
158+
Assertions.assertThat(requesterState.connection.getSent()).allMatch(ByteBuf::release);
159+
160+
requesterState.allocator.assertHasNoLeaks();
146161
}
147162

148163
@Test
149-
void clientRequesterSendsKeepAlives() {
150-
RSocketState RSocketState = requester(100, 1000);
164+
void clientRequesterSendsKeepAlives() throws InterruptedException {
165+
RSocketState RSocketState = requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
151166
TestDuplexConnection connection = RSocketState.connection();
152167

153168
StepVerifier.create(Flux.from(connection.getSentAsPublisher()).take(3))
@@ -156,27 +171,41 @@ void clientRequesterSendsKeepAlives() {
156171
.expectNextMatches(this::keepAliveFrameWithRespondFlag)
157172
.expectComplete()
158173
.verify(Duration.ofSeconds(5));
174+
175+
RSocketState.rSocket.dispose();
176+
RSocketState.connection.dispose();
177+
178+
Thread.sleep(KEEP_ALIVE_INTERVAL);
179+
180+
RSocketState.allocator.assertHasNoLeaks();
159181
}
160182

161183
@Test
162-
void requesterRespondsToKeepAlives() {
163-
RSocketState RSocketState = requester(100_000, 100_000);
164-
TestDuplexConnection connection = RSocketState.connection();
184+
void requesterRespondsToKeepAlives() throws InterruptedException {
185+
RSocketState rSocketState = requester(100_000, 100_000);
186+
TestDuplexConnection connection = rSocketState.connection();
165187
Mono.delay(Duration.ofMillis(100))
166188
.subscribe(
167189
l ->
168190
connection.addToReceivedBuffer(
169191
KeepAliveFrameCodec.encode(
170-
ByteBufAllocator.DEFAULT, true, 0, Unpooled.EMPTY_BUFFER)));
192+
rSocketState.allocator, true, 0, Unpooled.EMPTY_BUFFER)));
171193

172194
StepVerifier.create(Flux.from(connection.getSentAsPublisher()).take(1))
173195
.expectNextMatches(this::keepAliveFrameWithoutRespondFlag)
174196
.expectComplete()
175197
.verify(Duration.ofSeconds(5));
198+
199+
rSocketState.rSocket.dispose();
200+
rSocketState.connection.dispose();
201+
202+
Thread.sleep(KEEP_ALIVE_TIMEOUT);
203+
204+
rSocketState.allocator.assertHasNoLeaks();
176205
}
177206

178207
@Test
179-
void resumableRequesterNoKeepAlivesAfterDisconnect() {
208+
void resumableRequesterNoKeepAlivesAfterDisconnect() throws InterruptedException {
180209
ResumableRSocketState rSocketState =
181210
resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
182211
TestDuplexConnection testConnection = rSocketState.connection();
@@ -187,10 +216,17 @@ void resumableRequesterNoKeepAlivesAfterDisconnect() {
187216
StepVerifier.create(Flux.from(testConnection.getSentAsPublisher()).take(Duration.ofMillis(500)))
188217
.expectComplete()
189218
.verify(Duration.ofSeconds(5));
219+
220+
rSocketState.rSocket.dispose();
221+
rSocketState.connection.dispose();
222+
223+
Thread.sleep(KEEP_ALIVE_INTERVAL);
224+
225+
rSocketState.allocator.assertHasNoLeaks();
190226
}
191227

192228
@Test
193-
void resumableRequesterKeepAlivesAfterReconnect() {
229+
void resumableRequesterKeepAlivesAfterReconnect() throws InterruptedException {
194230
ResumableRSocketState rSocketState =
195231
resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
196232
ResumableDuplexConnection resumableDuplexConnection = rSocketState.resumableDuplexConnection();
@@ -200,43 +236,68 @@ void resumableRequesterKeepAlivesAfterReconnect() {
200236
resumableDuplexConnection.resume(0, 0, ignored -> Mono.empty());
201237

202238
StepVerifier.create(Flux.from(newTestConnection.getSentAsPublisher()).take(1))
203-
.expectNextMatches(this::keepAliveFrame)
239+
.expectNextMatches(frame -> keepAliveFrame(frame) && frame.release())
204240
.expectComplete()
205241
.verify(Duration.ofSeconds(5));
242+
243+
rSocketState.rSocket.dispose();
244+
rSocketState.connection.dispose();
245+
246+
Thread.sleep(KEEP_ALIVE_INTERVAL);
247+
248+
rSocketState.allocator.assertHasNoLeaks();
206249
}
207250

208251
@Test
209-
void resumableRequesterNoKeepAlivesAfterDispose() {
252+
void resumableRequesterNoKeepAlivesAfterDispose() throws InterruptedException {
210253
ResumableRSocketState rSocketState =
211254
resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
212255
rSocketState.rSocket().dispose();
213256
StepVerifier.create(
214257
Flux.from(rSocketState.connection().getSentAsPublisher()).take(Duration.ofMillis(500)))
215258
.expectComplete()
216259
.verify(Duration.ofSeconds(5));
260+
261+
rSocketState.rSocket.dispose();
262+
rSocketState.connection.dispose();
263+
264+
Thread.sleep(KEEP_ALIVE_INTERVAL);
265+
266+
rSocketState.allocator.assertHasNoLeaks();
217267
}
218268

219269
@Test
220-
void resumableRSocketsNotDisposedOnMissingKeepAlives() {
270+
void resumableRSocketsNotDisposedOnMissingKeepAlives() throws InterruptedException {
271+
ResumableRSocketState resumableRequesterState =
272+
resumableRequester(KEEP_ALIVE_INTERVAL, RESUMABLE_KEEP_ALIVE_TIMEOUT);
221273
RSocket rSocket = resumableRequesterState.rSocket();
222274
TestDuplexConnection connection = resumableRequesterState.connection();
223275

224276
Mono.delay(Duration.ofMillis(500)).block();
225277

226278
Assertions.assertThat(rSocket.isDisposed()).isFalse();
227279
Assertions.assertThat(connection.isDisposed()).isTrue();
280+
281+
Assertions.assertThat(resumableRequesterState.connection.getSent()).allMatch(ByteBuf::release);
282+
283+
resumableRequesterState.connection.dispose();
284+
resumableRequesterState.rSocket.dispose();
285+
286+
Thread.sleep(KEEP_ALIVE_INTERVAL);
287+
288+
resumableRequesterState.allocator.assertHasNoLeaks();
228289
}
229290

230291
private boolean keepAliveFrame(ByteBuf frame) {
231292
return FrameHeaderCodec.frameType(frame) == FrameType.KEEPALIVE;
232293
}
233294

234295
private boolean keepAliveFrameWithRespondFlag(ByteBuf frame) {
235-
return keepAliveFrame(frame) && KeepAliveFrameCodec.respondFlag(frame);
296+
return keepAliveFrame(frame) && KeepAliveFrameCodec.respondFlag(frame) && frame.release();
236297
}
237298

238299
private boolean keepAliveFrameWithoutRespondFlag(ByteBuf frame) {
239-
return keepAliveFrame(frame) && !KeepAliveFrameCodec.respondFlag(frame);
300+
return keepAliveFrame(frame) && !KeepAliveFrameCodec.respondFlag(frame) && frame.release();
240301
}
241302

242303
static class RSocketState {

0 commit comments

Comments
 (0)