Skip to content

Commit df00074

Browse files
committed
* Fix reconnection due to the use of SendAsync in AmqpPublisher. So bizarre!
* Make all of `Utils` internal
1 parent f5ef509 commit df00074

File tree

8 files changed

+87
-280
lines changed

8 files changed

+87
-280
lines changed

RabbitMQ.AMQP.Client/IConnectionSettings.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
1515
string ContainerId { get; }
1616
string Path { get; }
1717
bool UseSsl { get; }
18-
int MaxFrameSize { get; }
18+
uint MaxFrameSize { get; }
1919
SaslMechanism SaslMechanism { get; }
2020
ITlsSettings? TlsSettings { get; }
2121
IRecoveryConfiguration Recovery { get; }

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 25 additions & 196 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@
55
using System.Collections.Concurrent;
66
using System.Collections.ObjectModel;
77
using Amqp;
8-
using Amqp.Handler;
8+
using Amqp.Framing;
99
using Amqp.Sasl;
10+
using Amqp.Types;
1011

1112
namespace RabbitMQ.AMQP.Client.Impl;
1213

1314
/// <summary>
1415
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
1516
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
1617
/// </summary>
17-
public class AmqpConnection : AbstractLifeCycle, IConnection, IHandler
18+
public class AmqpConnection : AbstractLifeCycle, IConnection
1819
{
1920
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
2021
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
@@ -24,6 +25,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection, IHandler
2425

2526
// The native AMQP.Net Lite connection
2627
private Connection? _nativeConnection;
28+
private ClosedCallback? _closedCallback;
2729

2830
private readonly AmqpManagement _management;
2931
private readonly RecordingTopologyListener _recordingTopologyListener = new();
@@ -133,90 +135,6 @@ await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(1
133135
OnNewStatus(State.Closed, null);
134136
}
135137

136-
public bool CanHandle(EventId id)
137-
{
138-
Trace.WriteLine(TraceLevel.Verbose, $"EventId: {id}");
139-
return id switch
140-
{
141-
EventId.SocketAccept => false,
142-
EventId.SocketConnect => false,
143-
EventId.ConnectionLocalOpen => true,
144-
EventId.ConnectionRemoteOpen => false,
145-
EventId.ConnectionLocalClose => false,
146-
EventId.ConnectionRemoteClose => true,
147-
EventId.SessionLocalOpen => false,
148-
EventId.SessionRemoteOpen => false,
149-
EventId.SessionLocalClose => false,
150-
EventId.SessionRemoteClose => false,
151-
EventId.LinkLocalOpen => false,
152-
EventId.LinkRemoteOpen => false,
153-
EventId.LinkLocalClose => false,
154-
EventId.LinkRemoteClose => false,
155-
EventId.SendDelivery => false,
156-
EventId.ReceiveDelivery => false,
157-
EventId.SslAuthenticate => false,
158-
EventId.SslStreamAccept => false,
159-
EventId.HttpAccept => false,
160-
EventId.WebSocketAccept => false,
161-
EventId.ConnectionAccept => false,
162-
_ => false,
163-
};
164-
}
165-
166-
public void Handle(Event protocolEvent)
167-
{
168-
Trace.WriteLine(TraceLevel.Verbose, $"protocolEvent: {protocolEvent}");
169-
switch (protocolEvent.Id)
170-
{
171-
case EventId.ConnectionLocalOpen:
172-
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is open");
173-
OnNewStatus(State.Open, null);
174-
break;
175-
case EventId.ConnectionRemoteOpen:
176-
break;
177-
case EventId.ConnectionLocalClose:
178-
break;
179-
case EventId.ConnectionRemoteClose:
180-
// TODO oy vey, what to do about the task?
181-
_ = Task.Run(() => ReconnectAsync((Amqp.Framing.Close)protocolEvent.Context));
182-
break;
183-
case EventId.SessionLocalOpen:
184-
break;
185-
case EventId.SessionRemoteOpen:
186-
break;
187-
case EventId.SessionLocalClose:
188-
break;
189-
case EventId.SessionRemoteClose:
190-
break;
191-
case EventId.LinkLocalOpen:
192-
break;
193-
case EventId.LinkRemoteOpen:
194-
break;
195-
case EventId.LinkLocalClose:
196-
break;
197-
case EventId.LinkRemoteClose:
198-
break;
199-
case EventId.SendDelivery:
200-
break;
201-
case EventId.ReceiveDelivery:
202-
break;
203-
case EventId.SocketConnect:
204-
break;
205-
case EventId.SslAuthenticate:
206-
break;
207-
case EventId.SocketAccept:
208-
break;
209-
case EventId.SslStreamAccept:
210-
break;
211-
case EventId.HttpAccept:
212-
break;
213-
case EventId.WebSocketAccept:
214-
break;
215-
case EventId.ConnectionAccept:
216-
break;
217-
}
218-
}
219-
220138
public override string ToString()
221139
{
222140
string info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
@@ -232,6 +150,11 @@ protected override void Dispose(bool disposing)
232150
{
233151
if (disposing)
234152
{
153+
if (_nativeConnection is not null &&
154+
_closedCallback is not null)
155+
{
156+
_nativeConnection.Closed -= _closedCallback;
157+
}
235158
_semaphoreOpen.Dispose();
236159
_semaphoreClose.Dispose();
237160
}
@@ -283,33 +206,26 @@ await _semaphoreOpen.WaitAsync()
283206
return;
284207
}
285208

286-
/*
287209
var open = new Open
288210
{
211+
// Note: no need to set cf.AMQP.HostName
289212
HostName = $"vhost:{_connectionSettings.VirtualHost}",
213+
// Note: no need to set cf.AMQP.ContainerId
290214
ContainerId = _connectionSettings.ContainerId,
291215
Properties = new Fields()
292216
{
293217
[new Symbol("connection_name")] = _connectionSettings.ContainerId,
294218
}
295219
};
296220

297-
if (_connectionSettings.MaxFrameSize > int.MinValue)
221+
if (_connectionSettings.MaxFrameSize > uint.MinValue)
298222
{
299223
// Note: when set here, there is no need to set cf.AMQP.MaxFrameSize
300224
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
301225
}
302-
*/
303226

304227
var cf = new ConnectionFactory();
305228

306-
cf.AMQP.ContainerId = _connectionSettings.ContainerId;
307-
cf.AMQP.HostName = $"vhost:{_connectionSettings.VirtualHost}";
308-
if (_connectionSettings.MaxFrameSize > int.MinValue)
309-
{
310-
cf.AMQP.MaxFrameSize = _connectionSettings.MaxFrameSize;
311-
}
312-
313229
if (_connectionSettings is { UseSsl: true, TlsSettings: not null })
314230
{
315231
cf.SSL.Protocols = _connectionSettings.TlsSettings.Protocols;
@@ -338,13 +254,11 @@ [new Symbol("connection_name")] = _connectionSettings.ContainerId,
338254
cf.SASL.Profile = SaslProfile.External;
339255
}
340256

341-
/*
342257
void OnOpened(Amqp.IConnection connection, Open open1)
343258
{
344259
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is open");
345260
OnNewStatus(State.Open, null);
346261
}
347-
*/
348262

349263
try
350264
{
@@ -358,8 +272,8 @@ void OnOpened(Amqp.IConnection connection, Open open1)
358272
{
359273
connectionSettings = (ConnectionSettings)_connectionSettings;
360274
Address address = connectionSettings.Address;
361-
_nativeConnection = await cf.CreateAsync(address: address, handler: this).ConfigureAwait(false);
362-
// _nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened).ConfigureAwait(false);
275+
_nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened)
276+
.ConfigureAwait(false);
363277
}
364278
}
365279
catch (Exception ex)
@@ -377,8 +291,8 @@ void OnOpened(Amqp.IConnection connection, Open open1)
377291
await _management.OpenAsync()
378292
.ConfigureAwait(false);
379293

380-
// _closedCallback = BuildClosedCallback();
381-
// _nativeConnection.AddClosedCallback(_closedCallback);
294+
_closedCallback = BuildClosedCallback();
295+
_nativeConnection.AddClosedCallback(_closedCallback);
382296
}
383297
catch (AmqpException e)
384298
{
@@ -391,98 +305,6 @@ await _management.OpenAsync()
391305
}
392306
}
393307

394-
private void DoClose(Amqp.Framing.Error errorFrame, Error? argError = null)
395-
{
396-
Error? err = argError ?? Utils.ConvertError(errorFrame);
397-
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
398-
OnNewStatus(State.Closed, err);
399-
ChangeEntitiesStatus(State.Closed, err);
400-
_connectionCloseTaskCompletionSource.SetResult(true);
401-
}
402-
403-
private async Task ReconnectAsync(Amqp.Framing.Close closeFrame)
404-
{
405-
Amqp.Framing.Error errorFrame = closeFrame.Error;
406-
407-
// change the status for the connection and all the entities
408-
// to reconnecting and all the events are fired
409-
OnNewStatus(State.Reconnecting, Utils.ConvertError(errorFrame));
410-
ChangeEntitiesStatus(State.Reconnecting, Utils.ConvertError(errorFrame));
411-
412-
IBackOffDelayPolicy backOffDelayPolicy = _connectionSettings.Recovery.GetBackOffDelayPolicy();
413-
bool connected = false;
414-
// as first step we try to recover the connection
415-
// so the connected flag is false
416-
while (false == connected &&
417-
// we have to check if the backoff policy is active
418-
// the user may want to disable the backoff policy or
419-
// the backoff policy is not active due of some condition
420-
// for example: Reaching the maximum number of retries and avoid the forever loop
421-
backOffDelayPolicy.IsActive() &&
422-
// even we set the status to reconnecting up, we need to check if the connection is still in the
423-
// reconnecting status. The user may close the connection in the meanwhile
424-
State == State.Reconnecting)
425-
{
426-
try
427-
{
428-
int nextDelayMs = backOffDelayPolicy.Delay();
429-
430-
Trace.WriteLine(TraceLevel.Information,
431-
$"{ToString()} is trying Recovering connection in {nextDelayMs} milliseconds, " +
432-
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. ");
433-
434-
await Task.Delay(TimeSpan.FromMilliseconds(nextDelayMs))
435-
.ConfigureAwait(false);
436-
437-
await OpenConnectionAsync()
438-
.ConfigureAwait(false);
439-
440-
connected = true;
441-
}
442-
catch (Exception e)
443-
{
444-
// TODO this could / should be more visible to the user, perhaps?
445-
Trace.WriteLine(TraceLevel.Warning,
446-
$"{ToString()} Error trying to recover connection {e}");
447-
}
448-
}
449-
450-
backOffDelayPolicy.Reset();
451-
string connectionDescription = connected ? "recovered" : "not recovered";
452-
Trace.WriteLine(TraceLevel.Information,
453-
$"{ToString()} is {connectionDescription}");
454-
455-
if (false == connected)
456-
{
457-
var notRecoveredError = new Error(ConnectionNotRecoveredCode,
458-
$"{ConnectionNotRecoveredMessage}," +
459-
$"recover status: {_connectionSettings.Recovery}");
460-
DoClose(errorFrame, notRecoveredError);
461-
return;
462-
}
463-
464-
if (_connectionSettings.Recovery.IsTopologyActive())
465-
{
466-
Trace.WriteLine(TraceLevel.Information, $"{ToString()} Recovering topology");
467-
var visitor = new Visitor(_management);
468-
await _recordingTopologyListener.Accept(visitor)
469-
.ConfigureAwait(false);
470-
}
471-
472-
OnNewStatus(State.Open, null);
473-
474-
// after the connection is recovered we have to reconnect all the publishers and consumers
475-
try
476-
{
477-
await ReconnectEntitiesAsync()
478-
.ConfigureAwait(false);
479-
}
480-
catch (Exception e)
481-
{
482-
Trace.WriteLine(TraceLevel.Error, $"{ToString()} error trying to reconnect entities {e}");
483-
}
484-
}
485-
486308
/// <summary>
487309
/// Event handler for the connection closed event.
488310
/// In case the error is null means that the connection is closed by the user.
@@ -494,7 +316,6 @@ await ReconnectEntitiesAsync()
494316
/// and then kick off a task dedicated to recovery
495317
/// </summary>
496318
/// <returns></returns>
497-
/*
498319
private ClosedCallback BuildClosedCallback()
499320
{
500321
return async (sender, error) =>
@@ -511,6 +332,15 @@ await _semaphoreClose.WaitAsync()
511332
// close all the sessions, if the connection is closed the sessions are not valid anymore
512333
_nativePubSubSessions.ClearSessions();
513334

335+
void DoClose(Error? argError = null)
336+
{
337+
Error? err = argError ?? Utils.ConvertError(error);
338+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
339+
OnNewStatus(State.Closed, err);
340+
ChangeEntitiesStatus(State.Closed, err);
341+
_connectionCloseTaskCompletionSource.SetResult(true);
342+
}
343+
514344
if (error is null)
515345
{
516346
DoClose();
@@ -631,7 +461,6 @@ await ReconnectEntitiesAsync()
631461
}
632462
};
633463
}
634-
*/
635464

636465
private void ChangeEntitiesStatus(State state, Error? error)
637466
{

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ private enum PauseStatus
2323
private readonly MessageHandler _messageHandler;
2424
private readonly int _initialCredits;
2525
private readonly Map _filters;
26+
private readonly Guid _id = Guid.NewGuid();
2627

27-
private Guid _id = Guid.NewGuid();
2828
private ReceiverLink? _receiverLink;
2929

3030
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
@@ -47,25 +47,11 @@ public AmqpConsumer(AmqpConnection connection, string address,
4747

4848
public override async Task OpenAsync()
4949
{
50-
if (_receiverLink is not null)
51-
{
52-
if (false == _connection.Consumers.TryRemove(_id, out _))
53-
{
54-
// TODO error?
55-
}
56-
57-
_id = Guid.NewGuid();
58-
59-
if (false == _connection.Consumers.TryAdd(_id, this))
60-
{
61-
// TODO error?
62-
}
63-
}
6450
try
6551
{
6652
TaskCompletionSource<ReceiverLink> attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
6753

68-
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id.ToString(), _filters);
54+
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id, _filters);
6955

7056
void onAttached(ILink argLink, Attach argAttach)
7157
{

0 commit comments

Comments
 (0)