37
37
import io .rsocket .util .DefaultPayload ;
38
38
import java .time .Duration ;
39
39
import org .assertj .core .api .Assertions ;
40
- import org .junit .jupiter .api .BeforeEach ;
41
40
import org .junit .jupiter .api .Test ;
41
+ import reactor .core .Disposable ;
42
42
import reactor .core .publisher .Flux ;
43
43
import reactor .core .publisher .Mono ;
44
44
import reactor .test .StepVerifier ;
@@ -48,9 +48,6 @@ public class KeepAliveTest {
48
48
private static final int KEEP_ALIVE_TIMEOUT = 1000 ;
49
49
private static final int RESUMABLE_KEEP_ALIVE_TIMEOUT = 200 ;
50
50
51
- private RSocketState requesterState ;
52
- private ResumableRSocketState resumableRequesterState ;
53
-
54
51
static RSocketState requester (int tickPeriod , int timeout ) {
55
52
LeaksTrackingByteBufAllocator allocator =
56
53
LeaksTrackingByteBufAllocator .instrument (ByteBufAllocator .DEFAULT );
@@ -97,57 +94,75 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) {
97
94
return new ResumableRSocketState (rSocket , connection , resumableConnection , allocator );
98
95
}
99
96
100
- @ BeforeEach
101
- void setUp () {
102
- requesterState = requester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
103
- resumableRequesterState = resumableRequester (KEEP_ALIVE_INTERVAL , RESUMABLE_KEEP_ALIVE_TIMEOUT );
104
- }
105
-
106
97
@ Test
107
- void rSocketNotDisposedOnPresentKeepAlives () {
98
+ void rSocketNotDisposedOnPresentKeepAlives () throws InterruptedException {
99
+ RSocketState requesterState = requester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
100
+
108
101
TestDuplexConnection connection = requesterState .connection ();
109
102
110
- Flux .interval (Duration .ofMillis (100 ))
111
- .subscribe (
112
- n ->
113
- connection .addToReceivedBuffer (
114
- KeepAliveFrameCodec .encode (
115
- ByteBufAllocator .DEFAULT , true , 0 , Unpooled .EMPTY_BUFFER )));
103
+ Disposable disposable =
104
+ Flux .interval (Duration .ofMillis (KEEP_ALIVE_INTERVAL ))
105
+ .subscribe (
106
+ n ->
107
+ connection .addToReceivedBuffer (
108
+ KeepAliveFrameCodec .encode (
109
+ requesterState .allocator , true , 0 , Unpooled .EMPTY_BUFFER )));
116
110
117
- Mono . delay ( Duration . ofMillis ( 2000 )). block ( );
111
+ Thread . sleep ( KEEP_ALIVE_TIMEOUT * 2 );
118
112
119
113
RSocket rSocket = requesterState .rSocket ();
120
114
121
115
Assertions .assertThat (rSocket .isDisposed ()).isFalse ();
116
+
117
+ disposable .dispose ();
118
+ Thread .sleep (KEEP_ALIVE_INTERVAL );
119
+
120
+ requesterState .connection .dispose ();
121
+ requesterState .rSocket .dispose ();
122
+
123
+ Thread .sleep (KEEP_ALIVE_TIMEOUT );
124
+
125
+ Assertions .assertThat (requesterState .connection .getSent ()).allMatch (ByteBuf ::release );
126
+ requesterState .allocator .assertHasNoLeaks ();
122
127
}
123
128
124
129
@ Test
125
130
void noKeepAlivesSentAfterRSocketDispose () {
131
+ RSocketState requesterState = requester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
132
+
126
133
requesterState .rSocket ().dispose ();
134
+
127
135
StepVerifier .create (
128
136
Flux .from (requesterState .connection ().getSentAsPublisher ())
129
137
.take (Duration .ofMillis (500 )))
130
138
.expectComplete ()
131
139
.verify (Duration .ofSeconds (1 ));
140
+ requesterState .allocator .assertHasNoLeaks ();
132
141
}
133
142
134
143
@ Test
135
144
void rSocketDisposedOnMissingKeepAlives () {
145
+ RSocketState requesterState = requester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
146
+
136
147
RSocket rSocket = requesterState .rSocket ();
137
148
138
- Mono .delay (Duration .ofMillis (2000 )).block ();
149
+ Mono .delay (Duration .ofMillis (KEEP_ALIVE_TIMEOUT * 2 )).block ();
139
150
140
151
Assertions .assertThat (rSocket .isDisposed ()).isTrue ();
141
152
rSocket
142
153
.onClose ()
143
154
.as (StepVerifier ::create )
144
155
.expectError (ConnectionErrorException .class )
145
156
.verify (Duration .ofMillis (100 ));
157
+
158
+ Assertions .assertThat (requesterState .connection .getSent ()).allMatch (ByteBuf ::release );
159
+
160
+ requesterState .allocator .assertHasNoLeaks ();
146
161
}
147
162
148
163
@ Test
149
- void clientRequesterSendsKeepAlives () {
150
- RSocketState RSocketState = requester (100 , 1000 );
164
+ void clientRequesterSendsKeepAlives () throws InterruptedException {
165
+ RSocketState RSocketState = requester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
151
166
TestDuplexConnection connection = RSocketState .connection ();
152
167
153
168
StepVerifier .create (Flux .from (connection .getSentAsPublisher ()).take (3 ))
@@ -156,27 +171,41 @@ void clientRequesterSendsKeepAlives() {
156
171
.expectNextMatches (this ::keepAliveFrameWithRespondFlag )
157
172
.expectComplete ()
158
173
.verify (Duration .ofSeconds (5 ));
174
+
175
+ RSocketState .rSocket .dispose ();
176
+ RSocketState .connection .dispose ();
177
+
178
+ Thread .sleep (KEEP_ALIVE_INTERVAL );
179
+
180
+ RSocketState .allocator .assertHasNoLeaks ();
159
181
}
160
182
161
183
@ Test
162
- void requesterRespondsToKeepAlives () {
163
- RSocketState RSocketState = requester (100_000 , 100_000 );
164
- TestDuplexConnection connection = RSocketState .connection ();
184
+ void requesterRespondsToKeepAlives () throws InterruptedException {
185
+ RSocketState rSocketState = requester (100_000 , 100_000 );
186
+ TestDuplexConnection connection = rSocketState .connection ();
165
187
Mono .delay (Duration .ofMillis (100 ))
166
188
.subscribe (
167
189
l ->
168
190
connection .addToReceivedBuffer (
169
191
KeepAliveFrameCodec .encode (
170
- ByteBufAllocator . DEFAULT , true , 0 , Unpooled .EMPTY_BUFFER )));
192
+ rSocketState . allocator , true , 0 , Unpooled .EMPTY_BUFFER )));
171
193
172
194
StepVerifier .create (Flux .from (connection .getSentAsPublisher ()).take (1 ))
173
195
.expectNextMatches (this ::keepAliveFrameWithoutRespondFlag )
174
196
.expectComplete ()
175
197
.verify (Duration .ofSeconds (5 ));
198
+
199
+ rSocketState .rSocket .dispose ();
200
+ rSocketState .connection .dispose ();
201
+
202
+ Thread .sleep (KEEP_ALIVE_TIMEOUT );
203
+
204
+ rSocketState .allocator .assertHasNoLeaks ();
176
205
}
177
206
178
207
@ Test
179
- void resumableRequesterNoKeepAlivesAfterDisconnect () {
208
+ void resumableRequesterNoKeepAlivesAfterDisconnect () throws InterruptedException {
180
209
ResumableRSocketState rSocketState =
181
210
resumableRequester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
182
211
TestDuplexConnection testConnection = rSocketState .connection ();
@@ -187,10 +216,17 @@ void resumableRequesterNoKeepAlivesAfterDisconnect() {
187
216
StepVerifier .create (Flux .from (testConnection .getSentAsPublisher ()).take (Duration .ofMillis (500 )))
188
217
.expectComplete ()
189
218
.verify (Duration .ofSeconds (5 ));
219
+
220
+ rSocketState .rSocket .dispose ();
221
+ rSocketState .connection .dispose ();
222
+
223
+ Thread .sleep (KEEP_ALIVE_INTERVAL );
224
+
225
+ rSocketState .allocator .assertHasNoLeaks ();
190
226
}
191
227
192
228
@ Test
193
- void resumableRequesterKeepAlivesAfterReconnect () {
229
+ void resumableRequesterKeepAlivesAfterReconnect () throws InterruptedException {
194
230
ResumableRSocketState rSocketState =
195
231
resumableRequester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
196
232
ResumableDuplexConnection resumableDuplexConnection = rSocketState .resumableDuplexConnection ();
@@ -200,43 +236,68 @@ void resumableRequesterKeepAlivesAfterReconnect() {
200
236
resumableDuplexConnection .resume (0 , 0 , ignored -> Mono .empty ());
201
237
202
238
StepVerifier .create (Flux .from (newTestConnection .getSentAsPublisher ()).take (1 ))
203
- .expectNextMatches (this :: keepAliveFrame )
239
+ .expectNextMatches (frame -> keepAliveFrame ( frame ) && frame . release () )
204
240
.expectComplete ()
205
241
.verify (Duration .ofSeconds (5 ));
242
+
243
+ rSocketState .rSocket .dispose ();
244
+ rSocketState .connection .dispose ();
245
+
246
+ Thread .sleep (KEEP_ALIVE_INTERVAL );
247
+
248
+ rSocketState .allocator .assertHasNoLeaks ();
206
249
}
207
250
208
251
@ Test
209
- void resumableRequesterNoKeepAlivesAfterDispose () {
252
+ void resumableRequesterNoKeepAlivesAfterDispose () throws InterruptedException {
210
253
ResumableRSocketState rSocketState =
211
254
resumableRequester (KEEP_ALIVE_INTERVAL , KEEP_ALIVE_TIMEOUT );
212
255
rSocketState .rSocket ().dispose ();
213
256
StepVerifier .create (
214
257
Flux .from (rSocketState .connection ().getSentAsPublisher ()).take (Duration .ofMillis (500 )))
215
258
.expectComplete ()
216
259
.verify (Duration .ofSeconds (5 ));
260
+
261
+ rSocketState .rSocket .dispose ();
262
+ rSocketState .connection .dispose ();
263
+
264
+ Thread .sleep (KEEP_ALIVE_INTERVAL );
265
+
266
+ rSocketState .allocator .assertHasNoLeaks ();
217
267
}
218
268
219
269
@ Test
220
- void resumableRSocketsNotDisposedOnMissingKeepAlives () {
270
+ void resumableRSocketsNotDisposedOnMissingKeepAlives () throws InterruptedException {
271
+ ResumableRSocketState resumableRequesterState =
272
+ resumableRequester (KEEP_ALIVE_INTERVAL , RESUMABLE_KEEP_ALIVE_TIMEOUT );
221
273
RSocket rSocket = resumableRequesterState .rSocket ();
222
274
TestDuplexConnection connection = resumableRequesterState .connection ();
223
275
224
276
Mono .delay (Duration .ofMillis (500 )).block ();
225
277
226
278
Assertions .assertThat (rSocket .isDisposed ()).isFalse ();
227
279
Assertions .assertThat (connection .isDisposed ()).isTrue ();
280
+
281
+ Assertions .assertThat (resumableRequesterState .connection .getSent ()).allMatch (ByteBuf ::release );
282
+
283
+ resumableRequesterState .connection .dispose ();
284
+ resumableRequesterState .rSocket .dispose ();
285
+
286
+ Thread .sleep (KEEP_ALIVE_INTERVAL );
287
+
288
+ resumableRequesterState .allocator .assertHasNoLeaks ();
228
289
}
229
290
230
291
private boolean keepAliveFrame (ByteBuf frame ) {
231
292
return FrameHeaderCodec .frameType (frame ) == FrameType .KEEPALIVE ;
232
293
}
233
294
234
295
private boolean keepAliveFrameWithRespondFlag (ByteBuf frame ) {
235
- return keepAliveFrame (frame ) && KeepAliveFrameCodec .respondFlag (frame );
296
+ return keepAliveFrame (frame ) && KeepAliveFrameCodec .respondFlag (frame ) && frame . release () ;
236
297
}
237
298
238
299
private boolean keepAliveFrameWithoutRespondFlag (ByteBuf frame ) {
239
- return keepAliveFrame (frame ) && !KeepAliveFrameCodec .respondFlag (frame );
300
+ return keepAliveFrame (frame ) && !KeepAliveFrameCodec .respondFlag (frame ) && frame . release () ;
240
301
}
241
302
242
303
static class RSocketState {
0 commit comments