37
37
import io .rsocket .util .DefaultPayload ;
38
38
import java .time .Duration ;
39
39
import org .assertj .core .api .Assertions ;
40
+ import org .junit .jupiter .api .AfterEach ;
41
+ import org .junit .jupiter .api .BeforeEach ;
40
42
import org .junit .jupiter .api .Test ;
41
43
import reactor .core .Disposable ;
42
44
import reactor .core .publisher .Flux ;
43
45
import reactor .core .publisher .Mono ;
44
46
import reactor .test .StepVerifier ;
47
+ import reactor .test .scheduler .VirtualTimeScheduler ;
45
48
46
49
public class KeepAliveTest {
47
50
private static final int KEEP_ALIVE_INTERVAL = 100 ;
48
51
private static final int KEEP_ALIVE_TIMEOUT = 1000 ;
49
52
private static final int RESUMABLE_KEEP_ALIVE_TIMEOUT = 200 ;
50
53
54
+ VirtualTimeScheduler virtualTimeScheduler ;
55
+
56
+ @ BeforeEach
57
+ public void setUp () {
58
+ virtualTimeScheduler = VirtualTimeScheduler .getOrSet ();
59
+ }
60
+
61
+ @ AfterEach
62
+ public void tearDown () {
63
+ VirtualTimeScheduler .reset ();
64
+ }
65
+
51
66
static RSocketState requester (int tickPeriod , int timeout ) {
52
67
LeaksTrackingByteBufAllocator allocator =
53
68
LeaksTrackingByteBufAllocator .instrument (ByteBufAllocator .DEFAULT );
@@ -95,7 +110,7 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) {
95
110
}
96
111
97
112
@ Test
98
- void rSocketNotDisposedOnPresentKeepAlives () throws InterruptedException {
113
+ void rSocketNotDisposedOnPresentKeepAlives () {
99
114
RSocketState requesterState = requester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
100
115
101
116
TestDuplexConnection connection = requesterState .connection ();
@@ -108,21 +123,19 @@ void rSocketNotDisposedOnPresentKeepAlives() throws InterruptedException {
108
123
KeepAliveFrameCodec .encode (
109
124
requesterState .allocator , true , 0 , Unpooled .EMPTY_BUFFER )));
110
125
111
- Thread . sleep ( KEEP_ALIVE_TIMEOUT * 2 );
126
+ virtualTimeScheduler . advanceTimeBy ( Duration . ofMillis ( KEEP_ALIVE_TIMEOUT * 2 ) );
112
127
113
128
RSocket rSocket = requesterState .rSocket ();
114
129
115
130
Assertions .assertThat (rSocket .isDisposed ()).isFalse ();
116
131
117
132
disposable .dispose ();
118
- Thread .sleep (KEEP_ALIVE_INTERVAL );
119
133
120
134
requesterState .connection .dispose ();
121
135
requesterState .rSocket .dispose ();
122
136
123
- Thread .sleep (KEEP_ALIVE_TIMEOUT );
124
-
125
137
Assertions .assertThat (requesterState .connection .getSent ()).allMatch (ByteBuf ::release );
138
+
126
139
requesterState .allocator .assertHasNoLeaks ();
127
140
}
128
141
@@ -132,11 +145,12 @@ void noKeepAlivesSentAfterRSocketDispose() {
132
145
133
146
requesterState .rSocket ().dispose ();
134
147
135
- StepVerifier . create (
136
- Flux .from (requesterState .connection ().getSentAsPublisher ())
137
- . take ( Duration . ofMillis ( 500 ) ))
148
+ Duration duration = Duration . ofMillis ( 500 );
149
+ StepVerifier . create ( Flux .from (requesterState .connection ().getSentAsPublisher ()). take ( duration ))
150
+ . then (() -> virtualTimeScheduler . advanceTimeBy ( duration ))
138
151
.expectComplete ()
139
152
.verify (Duration .ofSeconds (1 ));
153
+
140
154
requesterState .allocator .assertHasNoLeaks ();
141
155
}
142
156
@@ -146,7 +160,7 @@ void rSocketDisposedOnMissingKeepAlives() {
146
160
147
161
RSocket rSocket = requesterState .rSocket ();
148
162
149
- Mono . delay (Duration .ofMillis (KEEP_ALIVE_TIMEOUT * 2 )). block ( );
163
+ virtualTimeScheduler . advanceTimeBy (Duration .ofMillis (KEEP_ALIVE_TIMEOUT * 2 ));
150
164
151
165
Assertions .assertThat (rSocket .isDisposed ()).isTrue ();
152
166
rSocket
@@ -161,72 +175,73 @@ void rSocketDisposedOnMissingKeepAlives() {
161
175
}
162
176
163
177
@ Test
164
- void clientRequesterSendsKeepAlives () throws InterruptedException {
178
+ void clientRequesterSendsKeepAlives () {
165
179
RSocketState RSocketState = requester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
166
180
TestDuplexConnection connection = RSocketState .connection ();
167
181
168
182
StepVerifier .create (Flux .from (connection .getSentAsPublisher ()).take (3 ))
183
+ .then (() -> virtualTimeScheduler .advanceTimeBy (Duration .ofMillis (KEEP_ALIVE_INTERVAL )))
169
184
.expectNextMatches (this ::keepAliveFrameWithRespondFlag )
185
+ .then (() -> virtualTimeScheduler .advanceTimeBy (Duration .ofMillis (KEEP_ALIVE_INTERVAL )))
170
186
.expectNextMatches (this ::keepAliveFrameWithRespondFlag )
187
+ .then (() -> virtualTimeScheduler .advanceTimeBy (Duration .ofMillis (KEEP_ALIVE_INTERVAL )))
171
188
.expectNextMatches (this ::keepAliveFrameWithRespondFlag )
172
189
.expectComplete ()
173
190
.verify (Duration .ofSeconds (5 ));
174
191
175
192
RSocketState .rSocket .dispose ();
176
193
RSocketState .connection .dispose ();
177
194
178
- Thread .sleep (KEEP_ALIVE_INTERVAL );
179
-
180
195
RSocketState .allocator .assertHasNoLeaks ();
181
196
}
182
197
183
198
@ Test
184
- void requesterRespondsToKeepAlives () throws InterruptedException {
199
+ void requesterRespondsToKeepAlives () {
185
200
RSocketState rSocketState = requester (100_000 , 100_000 );
186
201
TestDuplexConnection connection = rSocketState .connection ();
187
- Mono .delay (Duration .ofMillis (100 ))
202
+ Duration duration = Duration .ofMillis (100 );
203
+ Mono .delay (duration )
188
204
.subscribe (
189
205
l ->
190
206
connection .addToReceivedBuffer (
191
207
KeepAliveFrameCodec .encode (
192
208
rSocketState .allocator , true , 0 , Unpooled .EMPTY_BUFFER )));
193
209
194
210
StepVerifier .create (Flux .from (connection .getSentAsPublisher ()).take (1 ))
211
+ .then (() -> virtualTimeScheduler .advanceTimeBy (duration ))
195
212
.expectNextMatches (this ::keepAliveFrameWithoutRespondFlag )
196
213
.expectComplete ()
197
214
.verify (Duration .ofSeconds (5 ));
198
215
199
216
rSocketState .rSocket .dispose ();
200
217
rSocketState .connection .dispose ();
201
218
202
- Thread .sleep (KEEP_ALIVE_TIMEOUT );
203
-
204
219
rSocketState .allocator .assertHasNoLeaks ();
205
220
}
206
221
207
222
@ Test
208
- void resumableRequesterNoKeepAlivesAfterDisconnect () throws InterruptedException {
223
+ void resumableRequesterNoKeepAlivesAfterDisconnect () {
209
224
ResumableRSocketState rSocketState =
210
225
resumableRequester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
211
226
TestDuplexConnection testConnection = rSocketState .connection ();
212
227
ResumableDuplexConnection resumableDuplexConnection = rSocketState .resumableDuplexConnection ();
213
228
214
229
resumableDuplexConnection .disconnect ();
215
230
216
- StepVerifier .create (Flux .from (testConnection .getSentAsPublisher ()).take (Duration .ofMillis (500 )))
231
+ Duration duration = Duration .ofMillis (500 );
232
+ StepVerifier .create (Flux .from (testConnection .getSentAsPublisher ()).take (duration ))
233
+ .then (() -> virtualTimeScheduler .advanceTimeBy (duration ))
217
234
.expectComplete ()
218
235
.verify (Duration .ofSeconds (5 ));
219
236
220
237
rSocketState .rSocket .dispose ();
221
238
rSocketState .connection .dispose ();
222
239
223
- Thread .sleep (KEEP_ALIVE_INTERVAL );
224
-
225
240
rSocketState .allocator .assertHasNoLeaks ();
226
241
}
227
242
228
243
@ Test
229
- void resumableRequesterKeepAlivesAfterReconnect () throws InterruptedException {
244
+ void resumableRequesterKeepAlivesAfterReconnect () {
230
245
ResumableRSocketState rSocketState =
231
246
resumableRequester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
232
247
ResumableDuplexConnection resumableDuplexConnection = rSocketState .resumableDuplexConnection ();
@@ -236,33 +251,31 @@ void resumableRequesterKeepAlivesAfterReconnect() throws InterruptedException {
236
251
resumableDuplexConnection .resume (0 , 0 , ignored -> Mono .empty ());
237
252
238
253
StepVerifier .create (Flux .from (newTestConnection .getSentAsPublisher ()).take (1 ))
254
+ .then (() -> virtualTimeScheduler .advanceTimeBy (Duration .ofMillis (KEEP_ALIVE_INTERVAL )))
239
255
.expectNextMatches (frame -> keepAliveFrame (frame ) && frame .release ())
240
256
.expectComplete ()
241
257
.verify (Duration .ofSeconds (5 ));
242
258
243
259
rSocketState .rSocket .dispose ();
244
260
rSocketState .connection .dispose ();
245
261
246
- Thread .sleep (KEEP_ALIVE_INTERVAL );
247
-
248
262
rSocketState .allocator .assertHasNoLeaks ();
249
263
}
250
264
251
265
@ Test
252
- void resumableRequesterNoKeepAlivesAfterDispose () throws InterruptedException {
266
+ void resumableRequesterNoKeepAlivesAfterDispose () {
253
267
ResumableRSocketState rSocketState =
254
268
resumableRequester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
255
269
rSocketState .rSocket ().dispose ();
256
- StepVerifier .create (
257
- Flux .from (rSocketState .connection ().getSentAsPublisher ()).take (Duration .ofMillis (500 )))
270
+ Duration duration = Duration .ofMillis (500 );
271
+ StepVerifier .create (Flux .from (rSocketState .connection ().getSentAsPublisher ()).take (duration ))
272
+ .then (() -> virtualTimeScheduler .advanceTimeBy (duration ))
258
273
.expectComplete ()
259
274
.verify (Duration .ofSeconds (5 ));
260
275
261
276
rSocketState .rSocket .dispose ();
262
277
rSocketState .connection .dispose ();
263
278
264
- Thread .sleep (KEEP_ALIVE_INTERVAL );
265
-
266
279
rSocketState .allocator .assertHasNoLeaks ();
267
280
}
268
281
@@ -273,7 +286,7 @@ void resumableRSocketsNotDisposedOnMissingKeepAlives() throws InterruptedExcepti
273
286
RSocket rSocket = resumableRequesterState .rSocket ();
274
287
TestDuplexConnection connection = resumableRequesterState .connection ();
275
288
276
- Mono . delay (Duration .ofMillis (500 )). block ( );
289
+ virtualTimeScheduler . advanceTimeBy (Duration .ofMillis (500 ));
277
290
278
291
Assertions .assertThat (rSocket .isDisposed ()).isFalse ();
279
292
Assertions .assertThat (connection .isDisposed ()).isTrue ();
@@ -283,8 +296,6 @@ void resumableRSocketsNotDisposedOnMissingKeepAlives() throws InterruptedExcepti
283
296
resumableRequesterState .connection .dispose ();
284
297
resumableRequesterState .rSocket .dispose ();
285
298
286
- Thread .sleep (KEEP_ALIVE_INTERVAL );
287
-
288
299
resumableRequesterState .allocator .assertHasNoLeaks ();
289
300
}
290
301
0 commit comments