@@ -174,11 +174,14 @@ public void close() {
174
174
}
175
175
176
176
void init () {
177
+ LOGGER .debug ("Trying to initialize management." );
177
178
if (!this .initialized .get ()) {
178
179
try {
180
+ LOGGER .debug ("Creating management session." );
179
181
this .session = this .connection .nativeConnection ().openSession ();
180
182
String linkPairName = "management-link-pair" ;
181
183
Map <String , Object > properties = Collections .singletonMap ("paired" , Boolean .TRUE );
184
+ LOGGER .debug ("Creating management sender." );
182
185
this .sender =
183
186
session .openSender (
184
187
MANAGEMENT_NODE_ADDRESS ,
@@ -187,6 +190,7 @@ void init() {
187
190
.linkName (linkPairName )
188
191
.properties (properties ));
189
192
193
+ LOGGER .debug ("Creating management receiver." );
190
194
this .receiver =
191
195
session .openReceiver (
192
196
MANAGEMENT_NODE_ADDRESS ,
@@ -197,7 +201,9 @@ void init() {
197
201
.creditWindow (100 ));
198
202
199
203
this .sender .openFuture ().get (this .rpcTimeout .toMillis (), MILLISECONDS );
204
+ LOGGER .debug ("Management sender created." );
200
205
this .receiver .openFuture ().get (this .rpcTimeout .toMillis (), MILLISECONDS );
206
+ LOGGER .debug ("Management receiver created." );
201
207
Runnable receiveTask =
202
208
() -> {
203
209
try {
@@ -236,7 +242,9 @@ void init() {
236
242
log .accept ("Error while polling AMQP receiver" );
237
243
}
238
244
};
245
+ LOGGER .debug ("Starting management receive loop." );
239
246
this .receiveLoop = this .connection .executorService ().submit (receiveTask );
247
+ LOGGER .debug ("Management initialized." );
240
248
this .initialized .set (true );
241
249
} catch (Exception e ) {
242
250
throw new AmqpException (e );
@@ -297,7 +305,7 @@ private Response<Map<String, Object>> declare(
297
305
Message <?> request =
298
306
Message .create (body ).messageId (requestId ).to (target ).subject (operation ).replyTo (REPLY_TO );
299
307
300
- OutstandingRequest outstandingRequest = this .request (request );
308
+ OutstandingRequest outstandingRequest = this .request (request , requestId );
301
309
outstandingRequest .block ();
302
310
303
311
checkResponse (outstandingRequest , requestId , expectedResponseCodes );
@@ -307,9 +315,11 @@ private Response<Map<String, Object>> declare(
307
315
}
308
316
}
309
317
310
- OutstandingRequest request (Message <?> request ) throws ClientException {
318
+ OutstandingRequest request (Message <?> request , UUID requestId ) throws ClientException {
311
319
OutstandingRequest outstandingRequest = new OutstandingRequest (this .rpcTimeout );
312
- this .outstandingRequests .put ((UUID ) request .messageId (), outstandingRequest );
320
+ LOGGER .debug ("Enqueueing request {}" , requestId );
321
+ this .outstandingRequests .put (requestId , outstandingRequest );
322
+ LOGGER .debug ("Sending request {}" , requestId );
313
323
this .sender .send (request );
314
324
return outstandingRequest ;
315
325
}
@@ -325,7 +335,7 @@ private Map<String, Object> delete(String target, int expectedResponseCode) {
325
335
.subject (DELETE )
326
336
.replyTo (REPLY_TO );
327
337
328
- OutstandingRequest outstandingRequest = request (request );
338
+ OutstandingRequest outstandingRequest = request (request , requestId );
329
339
outstandingRequest .block ();
330
340
checkResponse (outstandingRequest , requestId , expectedResponseCode );
331
341
return outstandingRequest .responseBodyAsMap ();
@@ -436,7 +446,7 @@ private OutstandingRequest get(String target) throws ClientException {
436
446
.subject (GET )
437
447
.replyTo (REPLY_TO );
438
448
439
- OutstandingRequest outstandingRequest = request (request );
449
+ OutstandingRequest outstandingRequest = request (request , requestId );
440
450
outstandingRequest .block ();
441
451
checkResponse (outstandingRequest , requestId , CODE_200 );
442
452
return outstandingRequest ;
0 commit comments