@@ -85,8 +85,47 @@ void validTokenShouldSucceed() {
85
85
}
86
86
87
87
@ Test
88
- @ BrokerVersionAtLeast (RABBITMQ_4_1_0 )
89
88
void connectionShouldBeClosedWhenTokenExpires (TestInfo info ) {
89
+ String q = name (info );
90
+ long expiry = currentTimeMillis () + ofSeconds (2 ).toMillis ();
91
+ String token = token (expiry );
92
+ Sync connectionClosedSync = sync ();
93
+ Connection c =
94
+ environment
95
+ .connectionBuilder ()
96
+ .username ("" )
97
+ .password (token )
98
+ .listeners (closedOnSecurityExceptionListener (connectionClosedSync ))
99
+ .build ();
100
+ c .management ().queue (q ).exclusive (true ).declare ();
101
+ Sync publisherClosedSync = sync ();
102
+ Publisher p =
103
+ c .publisherBuilder ()
104
+ .queue (q )
105
+ .listeners (closedOnSecurityExceptionListener (publisherClosedSync ))
106
+ .build ();
107
+ Sync consumeSync = sync ();
108
+ Sync consumerClosedSync = sync ();
109
+ c .consumerBuilder ()
110
+ .queue (q )
111
+ .messageHandler (
112
+ (ctx , msg ) -> {
113
+ ctx .accept ();
114
+ consumeSync .down ();
115
+ })
116
+ .listeners (closedOnSecurityExceptionListener (consumerClosedSync ))
117
+ .build ();
118
+ p .publish (p .message (), ctx -> {});
119
+ assertThat (consumeSync ).completes ();
120
+ waitAtMost (() -> currentTimeMillis () > expiry + ofMillis (500 ).toMillis ());
121
+ assertThat (connectionClosedSync ).completes ();
122
+ assertThat (publisherClosedSync ).completes ();
123
+ assertThat (consumerClosedSync ).completes ();
124
+ }
125
+
126
+ @ Test
127
+ @ BrokerVersionAtLeast (RABBITMQ_4_1_0 )
128
+ void connectionShouldBeClosedWhenRefreshedTokenExpires (TestInfo info ) {
90
129
String q = name (info );
91
130
long expiry = currentTimeMillis () + ofSeconds (2 ).toMillis ();
92
131
String token = token (expiry );
@@ -170,9 +209,10 @@ void tokenShouldBeRefreshedAutomatically(boolean shared, TestInfo info) throws E
170
209
int expectedRefreshCount = shared ? refreshRounds : refreshRounds * connectionCount ;
171
210
Sync tokenRequestSync = sync (expectedRefreshCount );
172
211
AtomicInteger refreshCount = new AtomicInteger ();
212
+ Duration tokenLifetime = ofSeconds (3 );
173
213
HttpHandler httpHandler =
174
214
oAuth2TokenHttpHandler (
175
- () -> currentTimeMillis () + 3_000 ,
215
+ () -> currentTimeMillis () + tokenLifetime . toMillis () ,
176
216
() -> {
177
217
refreshCount .incrementAndGet ();
178
218
tokenRequestSync .down ();
@@ -223,21 +263,25 @@ void tokenShouldBeRefreshedAutomatically(boolean shared, TestInfo info) throws E
223
263
expectMessages .run ();
224
264
225
265
assertThat (tokenRequestSync ).completes ();
266
+ Thread .sleep (tokenLifetime .toMillis ());
226
267
227
268
publish .run ();
228
269
expectMessages .run ();
229
270
}
230
271
}
231
272
232
273
@ Test
274
+ @ BrokerVersionAtLeast (RABBITMQ_4_1_0 )
233
275
void tokenOnHttpsShouldBeRefreshed (TestInfo info ) throws Exception {
234
276
KeyStore keyStore = generateKeyPair ();
235
277
236
278
Sync tokenRefreshedSync = sync (3 );
237
279
int port = randomNetworkPort ();
238
280
String contextPath = "/uaa/oauth/token" ;
281
+ Duration tokenLifetime = ofSeconds (3 );
239
282
HttpHandler httpHandler =
240
- oAuth2TokenHttpHandler (() -> currentTimeMillis () + 3_000 , tokenRefreshedSync ::down );
283
+ oAuth2TokenHttpHandler (
284
+ () -> currentTimeMillis () + tokenLifetime .toMillis (), tokenRefreshedSync ::down );
241
285
this .server = startServer (port , contextPath , keyStore , httpHandler );
242
286
243
287
SSLContext sslContext = SSLContext .getInstance ("TLS" );
@@ -278,8 +322,10 @@ void tokenOnHttpsShouldBeRefreshed(TestInfo info) throws Exception {
278
322
assertThat (consumeSync ).completes ();
279
323
280
324
assertThat (tokenRefreshedSync ).completes ();
325
+ Thread .sleep (tokenLifetime .toMillis ());
281
326
282
327
consumeSync .reset ();
328
+
283
329
publisher .publish (publisher .message (), ctx -> {});
284
330
assertThat (consumeSync ).completes ();
285
331
}
0 commit comments