@@ -11,81 +11,30 @@ namespace RabbitMQ.AMQP.Client.Impl;
11
11
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
12
12
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
13
13
/// </summary>
14
- public class AmqpConnection : AbstractLifeCycle , IConnection
14
+ public class AmqpConnection : AbstractLifeCycle , IConnection , IDisposable
15
15
{
16
16
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED" ;
17
17
private const string ConnectionNotRecoveredMessage = "Connection not recovered" ;
18
- private readonly SemaphoreSlim _semaphoreClose = new ( 1 , 1 ) ; // TODO this needs to be Disposed
18
+
19
+ private readonly SemaphoreSlim _semaphoreClose = new ( 1 , 1 ) ;
20
+ private readonly SemaphoreSlim _semaphoreOpen = new ( 1 , 1 ) ;
19
21
20
22
// The native AMQP.Net Lite connection
21
23
private Connection ? _nativeConnection ;
22
24
23
25
private readonly AmqpManagement _management ;
24
26
private readonly RecordingTopologyListener _recordingTopologyListener = new ( ) ;
25
27
26
- private void ChangeEntitiesStatus ( State state , Error ? error )
27
- {
28
- ChangePublishersStatus ( state , error ) ;
29
- ChangeConsumersStatus ( state , error ) ;
30
- _management . ChangeStatus ( state , error ) ;
31
- }
32
-
33
- private void ChangePublishersStatus ( State state , Error ? error )
34
- {
35
- foreach ( IPublisher publisher1 in Publishers . Values )
36
- {
37
- var publisher = ( AmqpPublisher ) publisher1 ;
38
- publisher . ChangeStatus ( state , error ) ;
39
- }
40
- }
41
-
42
- private void ChangeConsumersStatus ( State state , Error ? error )
43
- {
44
- foreach ( IConsumer consumer1 in Consumers . Values )
45
- {
46
- var consumer = ( AmqpConsumer ) consumer1 ;
47
- consumer . ChangeStatus ( state , error ) ;
48
- }
49
- }
50
-
51
- private async Task ReconnectEntities ( )
52
- {
53
- await ReconnectPublishers ( ) . ConfigureAwait ( false ) ;
54
- await ReconnectConsumers ( ) . ConfigureAwait ( false ) ;
55
- }
56
-
57
- private async Task ReconnectPublishers ( )
58
- {
59
- foreach ( IPublisher publisher1 in Publishers . Values )
60
- {
61
- var publisher = ( AmqpPublisher ) publisher1 ;
62
- await publisher . Reconnect ( ) . ConfigureAwait ( false ) ;
63
- }
64
- }
65
-
66
- private async Task ReconnectConsumers ( )
67
- {
68
- foreach ( IConsumer consumer1 in Consumers . Values )
69
- {
70
- var consumer = ( AmqpConsumer ) consumer1 ;
71
- await consumer . Reconnect ( ) . ConfigureAwait ( false ) ;
72
- }
73
- }
74
-
75
28
private readonly IConnectionSettings _connectionSettings ;
76
29
internal readonly AmqpSessionManagement _nativePubSubSessions ;
77
30
78
- // TODO: Implement the semaphore to avoid multiple connections
79
- // private readonly SemaphoreSlim _semaphore = new(1, 1);
80
-
81
31
/// <summary>
82
32
/// Publishers contains all the publishers created by the connection.
83
33
/// Each connection can have multiple publishers.
84
34
/// They key is the publisher Id ( a Guid)
85
35
/// See <see cref="AmqpPublisher"/>
86
36
/// </summary>
87
37
internal ConcurrentDictionary < string , IPublisher > Publishers { get ; } = new ( ) ;
88
-
89
38
internal ConcurrentDictionary < string , IConsumer > Consumers { get ; } = new ( ) ;
90
39
91
40
public ReadOnlyCollection < IPublisher > GetPublishers ( )
@@ -116,6 +65,87 @@ await connection.OpenAsync()
116
65
return connection ;
117
66
}
118
67
68
+ public IManagement Management ( )
69
+ {
70
+ return _management ;
71
+ }
72
+
73
+ public IConsumerBuilder ConsumerBuilder ( )
74
+ {
75
+ return new AmqpConsumerBuilder ( this ) ;
76
+ }
77
+
78
+ public override async Task OpenAsync ( )
79
+ {
80
+ await OpenConnectionAsync ( )
81
+ . ConfigureAwait ( false ) ;
82
+ await base . OpenAsync ( )
83
+ . ConfigureAwait ( false ) ;
84
+ }
85
+
86
+ public IPublisherBuilder PublisherBuilder ( )
87
+ {
88
+ ThrowIfClosed ( ) ;
89
+ var publisherBuilder = new AmqpPublisherBuilder ( this ) ;
90
+ return publisherBuilder ;
91
+ }
92
+
93
+ public override async Task CloseAsync ( )
94
+ {
95
+ await _semaphoreClose . WaitAsync ( )
96
+ . ConfigureAwait ( false ) ;
97
+ try
98
+ {
99
+ await CloseAllPublishers ( ) . ConfigureAwait ( false ) ;
100
+ await CloseAllConsumers ( ) . ConfigureAwait ( false ) ;
101
+
102
+ _recordingTopologyListener . Clear ( ) ;
103
+ _nativePubSubSessions . ClearSessions ( ) ;
104
+
105
+ if ( State == State . Closed )
106
+ {
107
+ return ;
108
+ }
109
+
110
+ OnNewStatus ( State . Closing , null ) ;
111
+
112
+ await _management . CloseAsync ( )
113
+ . ConfigureAwait ( false ) ;
114
+
115
+ if ( _nativeConnection is { IsClosed : false } )
116
+ {
117
+ await _nativeConnection . CloseAsync ( )
118
+ . ConfigureAwait ( false ) ;
119
+ }
120
+ }
121
+ finally
122
+ {
123
+ _semaphoreClose . Release ( ) ;
124
+ }
125
+
126
+ await ConnectionCloseTaskCompletionSource . Task . WaitAsync ( TimeSpan . FromSeconds ( 10 ) )
127
+ . ConfigureAwait ( false ) ;
128
+
129
+ OnNewStatus ( State . Closed , null ) ;
130
+ }
131
+
132
+ public void Dispose ( )
133
+ {
134
+ // TODO probably more should happen in this method
135
+ _semaphoreOpen . Dispose ( ) ;
136
+ _semaphoreClose . Dispose ( ) ;
137
+ }
138
+
139
+ public override string ToString ( )
140
+ {
141
+ string info = $ "AmqpConnection{{ConnectionSettings='{ _connectionSettings } ', Status='{ State . ToString ( ) } '}}";
142
+ return info ;
143
+ }
144
+
145
+ internal Connection ? NativeConnection ( )
146
+ {
147
+ return _nativeConnection ;
148
+ }
119
149
120
150
/// <summary>
121
151
/// Closes all the publishers. It is called when the connection is closed.
@@ -150,28 +180,11 @@ private AmqpConnection(IConnectionSettings connectionSettings)
150
180
new AmqpManagement ( new AmqpManagementParameters ( this ) . TopologyListener ( _recordingTopologyListener ) ) ;
151
181
}
152
182
153
- public IManagement Management ( )
154
- {
155
- return _management ;
156
- }
157
-
158
- public IConsumerBuilder ConsumerBuilder ( )
183
+ // TODO cancellation token
184
+ private async Task OpenConnectionAsync ( )
159
185
{
160
- return new AmqpConsumerBuilder ( this ) ;
161
- }
162
-
163
- public override async Task OpenAsync ( )
164
- {
165
- await EnsureConnection ( )
186
+ await _semaphoreOpen . WaitAsync ( )
166
187
. ConfigureAwait ( false ) ;
167
- await base . OpenAsync ( )
168
- . ConfigureAwait ( false ) ;
169
- }
170
-
171
- private async Task EnsureConnection ( )
172
- {
173
- // TODO: do this!
174
- // await _semaphore.WaitAsync();
175
188
try
176
189
{
177
190
if ( _nativeConnection is { IsClosed : false } )
@@ -252,27 +265,34 @@ await _management.OpenAsync()
252
265
253
266
_nativeConnection . Closed += MaybeRecoverConnection ( ) ;
254
267
}
255
-
256
268
catch ( AmqpException e )
257
269
{
258
270
Trace . WriteLine ( TraceLevel . Error , $ "Error trying to connect. Info: { ToString ( ) } , error: { e } ") ;
259
271
throw new ConnectionException ( $ "Error trying to connect. Info: { ToString ( ) } , error: { e } ") ;
260
272
}
273
+ finally
274
+ {
275
+ _semaphoreOpen . Release ( ) ;
276
+ }
261
277
}
262
278
263
279
/// <summary>
264
280
/// Event handler for the connection closed event.
265
281
/// In case the error is null means that the connection is closed by the user.
266
282
/// The recover mechanism is activated only if the error is not null.
267
283
/// The connection maybe recovered if the recovery configuration is active.
284
+ ///
285
+ /// TODO this method could be improved.
286
+ /// MaybeRecoverConnection should set a connection state to RECOVERING
287
+ /// and then kick off a task dedicated to recovery
268
288
/// </summary>
269
289
/// <returns></returns>
270
290
private ClosedCallback MaybeRecoverConnection ( )
271
291
{
272
292
return async ( sender , error ) =>
273
293
{
274
- await _semaphoreClose . WaitAsync ( ) . ConfigureAwait ( false ) ;
275
-
294
+ await _semaphoreClose . WaitAsync ( )
295
+ . ConfigureAwait ( false ) ;
276
296
try
277
297
{
278
298
// close all the sessions, if the connection is closed the sessions are not valid anymore
@@ -299,7 +319,6 @@ private ClosedCallback MaybeRecoverConnection()
299
319
OnNewStatus ( State . Reconnecting , Utils . ConvertError ( error ) ) ;
300
320
ChangeEntitiesStatus ( State . Reconnecting , Utils . ConvertError ( error ) ) ;
301
321
302
-
303
322
await Task . Run ( async ( ) =>
304
323
{
305
324
bool connected = false ;
@@ -328,7 +347,7 @@ await Task.Run(async () =>
328
347
await Task . Delay ( TimeSpan . FromMilliseconds ( next ) )
329
348
. ConfigureAwait ( false ) ;
330
349
331
- await EnsureConnection ( )
350
+ await OpenConnectionAsync ( )
332
351
. ConfigureAwait ( false ) ;
333
352
connected = true ;
334
353
}
@@ -370,7 +389,7 @@ await _recordingTopologyListener.Accept(visitor)
370
389
371
390
try
372
391
{
373
- await ReconnectEntities ( ) . ConfigureAwait ( false ) ;
392
+ await ReconnectEntitiesAsync ( ) . ConfigureAwait ( false ) ;
374
393
}
375
394
catch ( Exception e )
376
395
{
@@ -394,70 +413,68 @@ await _recordingTopologyListener.Accept(visitor)
394
413
} ;
395
414
}
396
415
397
- internal Connection ? NativeConnection ( )
416
+ private void ChangeEntitiesStatus ( State state , Error ? error )
398
417
{
399
- return _nativeConnection ;
418
+ ChangePublishersStatus ( state , error ) ;
419
+ ChangeConsumersStatus ( state , error ) ;
420
+ _management . ChangeStatus ( state , error ) ;
400
421
}
401
422
402
- public IPublisherBuilder PublisherBuilder ( )
423
+ private void ChangePublishersStatus ( State state , Error ? error )
403
424
{
404
- ThrowIfClosed ( ) ;
405
- var publisherBuilder = new AmqpPublisherBuilder ( this ) ;
406
- return publisherBuilder ;
425
+ foreach ( IPublisher publisher1 in Publishers . Values )
426
+ {
427
+ var publisher = ( AmqpPublisher ) publisher1 ;
428
+ publisher . ChangeStatus ( state , error ) ;
429
+ }
407
430
}
408
431
409
- public override async Task CloseAsync ( )
432
+ private void ChangeConsumersStatus ( State state , Error ? error )
410
433
{
411
- await _semaphoreClose . WaitAsync ( )
412
- . ConfigureAwait ( false ) ;
413
- try
414
- {
415
- await CloseAllPublishers ( ) . ConfigureAwait ( false ) ;
416
- await CloseAllConsumers ( ) . ConfigureAwait ( false ) ;
417
-
418
- _recordingTopologyListener . Clear ( ) ;
419
- _nativePubSubSessions . ClearSessions ( ) ;
420
-
421
- if ( State == State . Closed )
422
- {
423
- return ;
424
- }
425
-
426
- OnNewStatus ( State . Closing , null ) ;
427
-
428
- await _management . CloseAsync ( )
429
- . ConfigureAwait ( false ) ;
430
-
431
- if ( _nativeConnection is { IsClosed : false } )
432
- {
433
- await _nativeConnection . CloseAsync ( )
434
- . ConfigureAwait ( false ) ;
435
- }
436
- }
437
- finally
434
+ foreach ( IConsumer consumer1 in Consumers . Values )
438
435
{
439
- _semaphoreClose . Release ( ) ;
436
+ var consumer = ( AmqpConsumer ) consumer1 ;
437
+ consumer . ChangeStatus ( state , error ) ;
440
438
}
439
+ }
441
440
442
- await ConnectionCloseTaskCompletionSource . Task . WaitAsync ( TimeSpan . FromSeconds ( 10 ) )
441
+ private async Task ReconnectEntitiesAsync ( )
442
+ {
443
+ await ReconnectPublishersAsync ( )
443
444
. ConfigureAwait ( false ) ;
445
+ await ReconnectConsumersAsync ( )
446
+ . ConfigureAwait ( false ) ;
447
+ }
444
448
445
- OnNewStatus ( State . Closed , null ) ;
449
+ private async Task ReconnectPublishersAsync ( )
450
+ {
451
+ // TODO this could be done in parallel
452
+ foreach ( IPublisher publisher1 in Publishers . Values )
453
+ {
454
+ var publisher = ( AmqpPublisher ) publisher1 ;
455
+ await publisher . ReconnectAsync ( )
456
+ . ConfigureAwait ( false ) ;
457
+ }
446
458
}
447
459
448
- public override string ToString ( )
460
+ private async Task ReconnectConsumersAsync ( )
449
461
{
450
- string info = $ "AmqpConnection{{ConnectionSettings='{ _connectionSettings } ', Status='{ State . ToString ( ) } '}}";
451
- return info ;
462
+ // TODO this could be done in parallel
463
+ foreach ( IConsumer consumer1 in Consumers . Values )
464
+ {
465
+ var consumer = ( AmqpConsumer ) consumer1 ;
466
+ await consumer . ReconnectAsync ( ) . ConfigureAwait ( false ) ;
467
+ }
452
468
}
453
469
}
454
470
455
471
internal class Visitor ( AmqpManagement management ) : IVisitor
456
472
{
457
473
private AmqpManagement Management { get ; } = management ;
458
474
459
- public async Task VisitQueues ( IEnumerable < QueueSpec > queueSpec )
475
+ public async Task VisitQueuesAsync ( IEnumerable < QueueSpec > queueSpec )
460
476
{
477
+ // TODO this could be done in parallel
461
478
foreach ( QueueSpec spec in queueSpec )
462
479
{
463
480
Trace . WriteLine ( TraceLevel . Information , $ "Recovering queue { spec . Name } ") ;
0 commit comments