Skip to content

Commit 2dbf61d

Browse files
committed
Merge branch 1.0.x into master
Signed-off-by: Oleh Dokuka <[email protected]>
2 parents 20d4fca + e5f7c29 commit 2dbf61d

File tree

2 files changed

+64
-44
lines changed

2 files changed

+64
-44
lines changed

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,36 +22,45 @@
2222
import io.rsocket.frame.KeepAliveFrameCodec;
2323
import io.rsocket.resume.ResumeStateHolder;
2424
import java.time.Duration;
25+
import java.util.concurrent.TimeUnit;
2526
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.function.Consumer;
2728
import reactor.core.Disposable;
2829
import reactor.core.publisher.Flux;
30+
import reactor.core.scheduler.Scheduler;
31+
import reactor.core.scheduler.Schedulers;
2932

3033
public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor {
34+
3135
final ByteBufAllocator allocator;
32-
private final Duration keepAliveInterval;
33-
private final Duration keepAliveTimeout;
34-
private final long keepAliveTimeoutMillis;
35-
private volatile Consumer<KeepAlive> onTimeout;
36-
private volatile Consumer<ByteBuf> onFrameSent;
37-
private volatile Disposable ticksDisposable;
38-
private final AtomicBoolean started = new AtomicBoolean();
36+
final Scheduler scheduler;
37+
final Duration keepAliveInterval;
38+
final Duration keepAliveTimeout;
39+
final long keepAliveTimeoutMillis;
40+
41+
final AtomicBoolean started = new AtomicBoolean();
42+
43+
volatile Consumer<KeepAlive> onTimeout;
44+
volatile Consumer<ByteBuf> onFrameSent;
45+
volatile Disposable ticksDisposable;
3946

40-
private volatile ResumeStateHolder resumeStateHolder;
41-
private volatile long lastReceivedMillis;
47+
volatile ResumeStateHolder resumeStateHolder;
48+
volatile long lastReceivedMillis;
4249

4350
private KeepAliveSupport(
4451
ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
4552
this.allocator = allocator;
53+
this.scheduler = Schedulers.parallel();
4654
this.keepAliveInterval = Duration.ofMillis(keepAliveInterval);
4755
this.keepAliveTimeout = Duration.ofMillis(keepAliveTimeout);
4856
this.keepAliveTimeoutMillis = keepAliveTimeout;
4957
}
5058

5159
public KeepAliveSupport start() {
52-
this.lastReceivedMillis = System.currentTimeMillis();
60+
this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS);
5361
if (started.compareAndSet(false, true)) {
54-
ticksDisposable = Flux.interval(keepAliveInterval).subscribe(v -> onIntervalTick());
62+
ticksDisposable =
63+
Flux.interval(keepAliveInterval, scheduler).subscribe(v -> onIntervalTick());
5564
}
5665
return this;
5766
}
@@ -64,7 +73,7 @@ public void stop() {
6473

6574
@Override
6675
public void receive(ByteBuf keepAliveFrame) {
67-
this.lastReceivedMillis = System.currentTimeMillis();
76+
this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS);
6877
if (resumeStateHolder != null) {
6978
long remoteLastReceivedPos = remoteLastReceivedPosition(keepAliveFrame);
7079
resumeStateHolder.onImpliedPosition(remoteLastReceivedPos);
@@ -104,7 +113,7 @@ void send(ByteBuf frame) {
104113
}
105114

106115
void tryTimeout() {
107-
long now = System.currentTimeMillis();
116+
long now = scheduler.now(TimeUnit.MILLISECONDS);
108117
if (now - lastReceivedMillis >= keepAliveTimeoutMillis) {
109118
if (onTimeout != null) {
110119
onTimeout.accept(new KeepAlive(keepAliveInterval, keepAliveTimeout));

rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,32 @@
3737
import io.rsocket.util.DefaultPayload;
3838
import java.time.Duration;
3939
import org.assertj.core.api.Assertions;
40+
import org.junit.jupiter.api.AfterEach;
41+
import org.junit.jupiter.api.BeforeEach;
4042
import org.junit.jupiter.api.Test;
4143
import reactor.core.Disposable;
4244
import reactor.core.publisher.Flux;
4345
import reactor.core.publisher.Mono;
4446
import reactor.test.StepVerifier;
47+
import reactor.test.scheduler.VirtualTimeScheduler;
4548

4649
public class KeepAliveTest {
4750
private static final int KEEP_ALIVE_INTERVAL = 100;
4851
private static final int KEEP_ALIVE_TIMEOUT = 1000;
4952
private static final int RESUMABLE_KEEP_ALIVE_TIMEOUT = 200;
5053

54+
VirtualTimeScheduler virtualTimeScheduler;
55+
56+
@BeforeEach
57+
public void setUp() {
58+
virtualTimeScheduler = VirtualTimeScheduler.getOrSet();
59+
}
60+
61+
@AfterEach
62+
public void tearDown() {
63+
VirtualTimeScheduler.reset();
64+
}
65+
5166
static RSocketState requester(int tickPeriod, int timeout) {
5267
LeaksTrackingByteBufAllocator allocator =
5368
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
@@ -95,7 +110,7 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) {
95110
}
96111

97112
@Test
98-
void rSocketNotDisposedOnPresentKeepAlives() throws InterruptedException {
113+
void rSocketNotDisposedOnPresentKeepAlives() {
99114
RSocketState requesterState = requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
100115

101116
TestDuplexConnection connection = requesterState.connection();
@@ -108,21 +123,19 @@ void rSocketNotDisposedOnPresentKeepAlives() throws InterruptedException {
108123
KeepAliveFrameCodec.encode(
109124
requesterState.allocator, true, 0, Unpooled.EMPTY_BUFFER)));
110125

111-
Thread.sleep(KEEP_ALIVE_TIMEOUT * 2);
126+
virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(KEEP_ALIVE_TIMEOUT * 2));
112127

113128
RSocket rSocket = requesterState.rSocket();
114129

115130
Assertions.assertThat(rSocket.isDisposed()).isFalse();
116131

117132
disposable.dispose();
118-
Thread.sleep(KEEP_ALIVE_INTERVAL);
119133

120134
requesterState.connection.dispose();
121135
requesterState.rSocket.dispose();
122136

123-
Thread.sleep(KEEP_ALIVE_TIMEOUT);
124-
125137
Assertions.assertThat(requesterState.connection.getSent()).allMatch(ByteBuf::release);
138+
126139
requesterState.allocator.assertHasNoLeaks();
127140
}
128141

@@ -132,11 +145,12 @@ void noKeepAlivesSentAfterRSocketDispose() {
132145

133146
requesterState.rSocket().dispose();
134147

135-
StepVerifier.create(
136-
Flux.from(requesterState.connection().getSentAsPublisher())
137-
.take(Duration.ofMillis(500)))
148+
Duration duration = Duration.ofMillis(500);
149+
StepVerifier.create(Flux.from(requesterState.connection().getSentAsPublisher()).take(duration))
150+
.then(() -> virtualTimeScheduler.advanceTimeBy(duration))
138151
.expectComplete()
139152
.verify(Duration.ofSeconds(1));
153+
140154
requesterState.allocator.assertHasNoLeaks();
141155
}
142156

@@ -146,7 +160,7 @@ void rSocketDisposedOnMissingKeepAlives() {
146160

147161
RSocket rSocket = requesterState.rSocket();
148162

149-
Mono.delay(Duration.ofMillis(KEEP_ALIVE_TIMEOUT * 2)).block();
163+
virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(KEEP_ALIVE_TIMEOUT * 2));
150164

151165
Assertions.assertThat(rSocket.isDisposed()).isTrue();
152166
rSocket
@@ -161,72 +175,73 @@ void rSocketDisposedOnMissingKeepAlives() {
161175
}
162176

163177
@Test
164-
void clientRequesterSendsKeepAlives() throws InterruptedException {
178+
void clientRequesterSendsKeepAlives() {
165179
RSocketState RSocketState = requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
166180
TestDuplexConnection connection = RSocketState.connection();
167181

168182
StepVerifier.create(Flux.from(connection.getSentAsPublisher()).take(3))
183+
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(KEEP_ALIVE_INTERVAL)))
169184
.expectNextMatches(this::keepAliveFrameWithRespondFlag)
185+
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(KEEP_ALIVE_INTERVAL)))
170186
.expectNextMatches(this::keepAliveFrameWithRespondFlag)
187+
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(KEEP_ALIVE_INTERVAL)))
171188
.expectNextMatches(this::keepAliveFrameWithRespondFlag)
172189
.expectComplete()
173190
.verify(Duration.ofSeconds(5));
174191

175192
RSocketState.rSocket.dispose();
176193
RSocketState.connection.dispose();
177194

178-
Thread.sleep(KEEP_ALIVE_INTERVAL);
179-
180195
RSocketState.allocator.assertHasNoLeaks();
181196
}
182197

183198
@Test
184-
void requesterRespondsToKeepAlives() throws InterruptedException {
199+
void requesterRespondsToKeepAlives() {
185200
RSocketState rSocketState = requester(100_000, 100_000);
186201
TestDuplexConnection connection = rSocketState.connection();
187-
Mono.delay(Duration.ofMillis(100))
202+
Duration duration = Duration.ofMillis(100);
203+
Mono.delay(duration)
188204
.subscribe(
189205
l ->
190206
connection.addToReceivedBuffer(
191207
KeepAliveFrameCodec.encode(
192208
rSocketState.allocator, true, 0, Unpooled.EMPTY_BUFFER)));
193209

194210
StepVerifier.create(Flux.from(connection.getSentAsPublisher()).take(1))
211+
.then(() -> virtualTimeScheduler.advanceTimeBy(duration))
195212
.expectNextMatches(this::keepAliveFrameWithoutRespondFlag)
196213
.expectComplete()
197214
.verify(Duration.ofSeconds(5));
198215

199216
rSocketState.rSocket.dispose();
200217
rSocketState.connection.dispose();
201218

202-
Thread.sleep(KEEP_ALIVE_TIMEOUT);
203-
204219
rSocketState.allocator.assertHasNoLeaks();
205220
}
206221

207222
@Test
208-
void resumableRequesterNoKeepAlivesAfterDisconnect() throws InterruptedException {
223+
void resumableRequesterNoKeepAlivesAfterDisconnect() {
209224
ResumableRSocketState rSocketState =
210225
resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
211226
TestDuplexConnection testConnection = rSocketState.connection();
212227
ResumableDuplexConnection resumableDuplexConnection = rSocketState.resumableDuplexConnection();
213228

214229
resumableDuplexConnection.disconnect();
215230

216-
StepVerifier.create(Flux.from(testConnection.getSentAsPublisher()).take(Duration.ofMillis(500)))
231+
Duration duration = Duration.ofMillis(500);
232+
StepVerifier.create(Flux.from(testConnection.getSentAsPublisher()).take(duration))
233+
.then(() -> virtualTimeScheduler.advanceTimeBy(duration))
217234
.expectComplete()
218235
.verify(Duration.ofSeconds(5));
219236

220237
rSocketState.rSocket.dispose();
221238
rSocketState.connection.dispose();
222239

223-
Thread.sleep(KEEP_ALIVE_INTERVAL);
224-
225240
rSocketState.allocator.assertHasNoLeaks();
226241
}
227242

228243
@Test
229-
void resumableRequesterKeepAlivesAfterReconnect() throws InterruptedException {
244+
void resumableRequesterKeepAlivesAfterReconnect() {
230245
ResumableRSocketState rSocketState =
231246
resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
232247
ResumableDuplexConnection resumableDuplexConnection = rSocketState.resumableDuplexConnection();
@@ -236,33 +251,31 @@ void resumableRequesterKeepAlivesAfterReconnect() throws InterruptedException {
236251
resumableDuplexConnection.resume(0, 0, ignored -> Mono.empty());
237252

238253
StepVerifier.create(Flux.from(newTestConnection.getSentAsPublisher()).take(1))
254+
.then(() -> virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(KEEP_ALIVE_INTERVAL)))
239255
.expectNextMatches(frame -> keepAliveFrame(frame) && frame.release())
240256
.expectComplete()
241257
.verify(Duration.ofSeconds(5));
242258

243259
rSocketState.rSocket.dispose();
244260
rSocketState.connection.dispose();
245261

246-
Thread.sleep(KEEP_ALIVE_INTERVAL);
247-
248262
rSocketState.allocator.assertHasNoLeaks();
249263
}
250264

251265
@Test
252-
void resumableRequesterNoKeepAlivesAfterDispose() throws InterruptedException {
266+
void resumableRequesterNoKeepAlivesAfterDispose() {
253267
ResumableRSocketState rSocketState =
254268
resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT);
255269
rSocketState.rSocket().dispose();
256-
StepVerifier.create(
257-
Flux.from(rSocketState.connection().getSentAsPublisher()).take(Duration.ofMillis(500)))
270+
Duration duration = Duration.ofMillis(500);
271+
StepVerifier.create(Flux.from(rSocketState.connection().getSentAsPublisher()).take(duration))
272+
.then(() -> virtualTimeScheduler.advanceTimeBy(duration))
258273
.expectComplete()
259274
.verify(Duration.ofSeconds(5));
260275

261276
rSocketState.rSocket.dispose();
262277
rSocketState.connection.dispose();
263278

264-
Thread.sleep(KEEP_ALIVE_INTERVAL);
265-
266279
rSocketState.allocator.assertHasNoLeaks();
267280
}
268281

@@ -273,7 +286,7 @@ void resumableRSocketsNotDisposedOnMissingKeepAlives() throws InterruptedExcepti
273286
RSocket rSocket = resumableRequesterState.rSocket();
274287
TestDuplexConnection connection = resumableRequesterState.connection();
275288

276-
Mono.delay(Duration.ofMillis(500)).block();
289+
virtualTimeScheduler.advanceTimeBy(Duration.ofMillis(500));
277290

278291
Assertions.assertThat(rSocket.isDisposed()).isFalse();
279292
Assertions.assertThat(connection.isDisposed()).isTrue();
@@ -283,8 +296,6 @@ void resumableRSocketsNotDisposedOnMissingKeepAlives() throws InterruptedExcepti
283296
resumableRequesterState.connection.dispose();
284297
resumableRequesterState.rSocket.dispose();
285298

286-
Thread.sleep(KEEP_ALIVE_INTERVAL);
287-
288299
resumableRequesterState.allocator.assertHasNoLeaks();
289300
}
290301

0 commit comments

Comments
 (0)