@@ -39,7 +39,6 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
39
39
private const string ConnectionNotRecoveredMessage = "Connection not recovered" ;
40
40
private readonly SemaphoreSlim _semaphoreClose = new ( 1 , 1 ) ;
41
41
42
-
43
42
// The native AMQP.Net Lite connection
44
43
private Connection ? _nativeConnection ;
45
44
@@ -71,7 +70,6 @@ private void ChangeConsumersStatus(State state, Error? error)
71
70
}
72
71
}
73
72
74
-
75
73
private async Task ReconnectEntities ( )
76
74
{
77
75
await ReconnectPublishers ( ) . ConfigureAwait ( false ) ;
@@ -102,7 +100,6 @@ private async Task ReconnectConsumers()
102
100
// TODO: Implement the semaphore to avoid multiple connections
103
101
// private readonly SemaphoreSlim _semaphore = new(1, 1);
104
102
105
-
106
103
/// <summary>
107
104
/// Publishers contains all the publishers created by the connection.
108
105
/// Each connection can have multiple publishers.
@@ -113,7 +110,6 @@ private async Task ReconnectConsumers()
113
110
114
111
internal ConcurrentDictionary < string , IConsumer > Consumers { get ; } = new ( ) ;
115
112
116
-
117
113
public ReadOnlyCollection < IPublisher > GetPublishers ( )
118
114
{
119
115
return Publishers . Values . ToList ( ) . AsReadOnly ( ) ;
@@ -179,14 +175,18 @@ public IConsumerBuilder ConsumerBuilder()
179
175
return new AmqpConsumerBuilder ( this ) ;
180
176
}
181
177
182
- protected override Task OpenAsync ( )
178
+ protected override async Task OpenAsync ( )
183
179
{
184
- EnsureConnection ( ) ;
185
- return base . OpenAsync ( ) ;
180
+ await EnsureConnection ( )
181
+ . ConfigureAwait ( false ) ;
182
+ await base . OpenAsync ( )
183
+ . ConfigureAwait ( false ) ;
186
184
}
187
185
188
- private void EnsureConnection ( )
186
+ private async Task EnsureConnection ( )
189
187
{
188
+ // TODO: do this!
189
+ // await _semaphore.WaitAsync();
190
190
try
191
191
{
192
192
if ( _nativeConnection is { IsClosed : false } )
@@ -196,22 +196,53 @@ private void EnsureConnection()
196
196
197
197
var open = new Open
198
198
{
199
- HostName = $ "vhost:{ _connectionSettings . VirtualHost ( ) } ",
199
+ HostName = $ "vhost:{ _connectionSettings . VirtualHost } ",
200
200
Properties = new Fields ( )
201
201
{
202
- [ new Symbol ( "connection_name" ) ] = _connectionSettings . ConnectionName ( ) ,
202
+ [ new Symbol ( "connection_name" ) ] = _connectionSettings . ConnectionName ,
203
203
}
204
204
} ;
205
205
206
- var manualReset = new ManualResetEvent ( false ) ;
207
- _nativeConnection = new Connection ( _connectionSettings . Address , null , open , ( connection , open1 ) =>
206
+ void onOpened ( Amqp . IConnection connection , Open open1 )
208
207
{
209
- manualReset . Set ( ) ;
210
208
Trace . WriteLine ( TraceLevel . Verbose , $ "Connection opened. Info: { ToString ( ) } ") ;
211
209
OnNewStatus ( State . Open , null ) ;
212
- } ) ;
210
+ }
211
+
212
+ var cf = new ConnectionFactory ( ) ;
213
+
214
+ if ( _connectionSettings . UseSsl && _connectionSettings . TlsSettings is not null )
215
+ {
216
+ cf . SSL . Protocols = _connectionSettings . TlsSettings . Protocols ;
217
+ cf . SSL . CheckCertificateRevocation = _connectionSettings . TlsSettings . CheckCertificateRevocation ;
218
+
219
+ if ( _connectionSettings . TlsSettings . ClientCertificates . Count > 0 )
220
+ {
221
+ cf . SSL . ClientCertificates = _connectionSettings . TlsSettings . ClientCertificates ;
222
+ }
223
+
224
+ if ( _connectionSettings . TlsSettings . LocalCertificateSelectionCallback is not null )
225
+ {
226
+ cf . SSL . LocalCertificateSelectionCallback = _connectionSettings . TlsSettings . LocalCertificateSelectionCallback ;
227
+ }
228
+
229
+ if ( _connectionSettings . TlsSettings . RemoteCertificateValidationCallback is not null )
230
+ {
231
+ cf . SSL . RemoteCertificateValidationCallback = _connectionSettings . TlsSettings . RemoteCertificateValidationCallback ;
232
+ }
233
+ }
234
+
235
+ try
236
+ {
237
+ _nativeConnection = await cf . CreateAsync ( _connectionSettings . Address , open : open , onOpened : onOpened )
238
+ . ConfigureAwait ( false ) ;
239
+ }
240
+ catch ( Exception ex )
241
+ {
242
+ throw new ConnectionException (
243
+ $ "Connection failed. Info: { ToString ( ) } ", ex ) ;
244
+ }
213
245
214
- manualReset . WaitOne ( TimeSpan . FromSeconds ( 5 ) ) ;
215
246
if ( _nativeConnection . IsClosed )
216
247
{
217
248
throw new ConnectionException (
@@ -294,7 +325,8 @@ await Task.Run(async () =>
294
325
await Task . Delay ( TimeSpan . FromMilliseconds ( next ) )
295
326
. ConfigureAwait ( false ) ;
296
327
297
- EnsureConnection ( ) ;
328
+ await EnsureConnection ( )
329
+ . ConfigureAwait ( false ) ;
298
330
connected = true ;
299
331
}
300
332
catch ( Exception e )
0 commit comments