Skip to content

Commit 4364927

Browse files
Gsantomaggiolukebakken
authored andcommitted
Consumer handler messages async
- delivery context async - refactor logs Signed-off-by: Gabriele Santomaggio <[email protected]> * Rename `ConnectionName` to `ContainerId` to match AMQP 1.0 spec * Fix `PauseShouldStopMessageArrivalUnpauseShouldResumeIt` test and add some more assertions to it. * Get the credits right for consumers. * Remove some unnecessary output * Adjust tests to use `queueName` variable
1 parent 89b5478 commit 4364927

23 files changed

+391
-286
lines changed

.ci/ubuntu/rabbitmq.conf

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ loopback_users = none
33
log.console = true
44
log.console.level = debug
55
log.file = /var/log/rabbitmq/rabbitmq.log
6-
log.file.level = info
7-
# log.connection.level = warning
8-
# log.channel.level = warning
6+
log.file.level = debug
97
log.exchange = false
108

119
listeners.tcp.default = 5672

RabbitMQ.AMQP.Client/IConnectionSettings.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,13 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
1212
string? User { get; }
1313
string? Password { get; }
1414
string Scheme { get; }
15-
string ConnectionName { get; }
15+
string ContainerId { get; }
1616
string Path { get; }
1717
bool UseSsl { get; }
1818
uint MaxFrameSize { get; }
1919
SaslMechanism SaslMechanism { get; }
2020
ITlsSettings? TlsSettings { get; }
21-
2221
IRecoveryConfiguration Recovery { get; }
23-
2422
}
2523

2624
/// <summary>

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,18 @@
1-
using Amqp.Listener;
2-
31
namespace RabbitMQ.AMQP.Client;
42

53
public class ConsumerException(string message) : Exception(message);
6-
public delegate void MessageHandler(IContext context, IMessage message);
4+
public delegate Task MessageHandler(IContext context, IMessage message);
75

86
public interface IConsumer : ILifeCycle
97
{
108
void Pause();
11-
129
void Unpause();
13-
1410
long UnsettledMessageCount { get; }
1511
}
1612

17-
public interface IMessageHandler
18-
{
19-
void Handle(Context context, IMessage message);
20-
}
21-
2213
public interface IContext
2314
{
24-
void Accept();
25-
26-
void Discard();
27-
28-
void Requeue();
15+
Task AcceptAsync();
16+
Task DiscardAsync();
17+
Task RequeueAsync();
2918
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,10 @@ await _semaphoreOpen.WaitAsync()
204204
var open = new Open
205205
{
206206
HostName = $"vhost:{_connectionSettings.VirtualHost}",
207+
ContainerId = _connectionSettings.ContainerId,
207208
Properties = new Fields()
208209
{
209-
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
210+
[new Symbol("connection_name")] = _connectionSettings.ContainerId,
210211
}
211212
};
212213

@@ -216,15 +217,15 @@ [new Symbol("connection_name")] = _connectionSettings.ConnectionName,
216217
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
217218
}
218219

219-
void onOpened(Amqp.IConnection connection, Open open1)
220+
void OnOpened(Amqp.IConnection connection, Open open1)
220221
{
221-
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");
222+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is open");
222223
OnNewStatus(State.Open, null);
223224
}
224225

225226
var cf = new ConnectionFactory();
226227

227-
if (_connectionSettings.UseSsl && _connectionSettings.TlsSettings is not null)
228+
if (_connectionSettings is { UseSsl: true, TlsSettings: not null })
228229
{
229230
cf.SSL.Protocols = _connectionSettings.TlsSettings.Protocols;
230231
cf.SSL.CheckCertificateRevocation = _connectionSettings.TlsSettings.CheckCertificateRevocation;
@@ -254,19 +255,19 @@ void onOpened(Amqp.IConnection connection, Open open1)
254255

255256
try
256257
{
257-
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: onOpened)
258+
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: OnOpened)
258259
.ConfigureAwait(false);
259260
}
260261
catch (Exception ex)
261262
{
262263
throw new ConnectionException(
263-
$"Connection failed. Info: {ToString()}", ex);
264+
$"{ToString()} connection failed.", ex);
264265
}
265266

266267
if (_nativeConnection.IsClosed)
267268
{
268269
throw new ConnectionException(
269-
$"Connection failed. Info: {ToString()}, error: {_nativeConnection.Error}");
270+
$"{ToString()} connection failed., error: {_nativeConnection.Error}");
270271
}
271272

272273
await _management.OpenAsync()
@@ -276,8 +277,8 @@ await _management.OpenAsync()
276277
}
277278
catch (AmqpException e)
278279
{
279-
Trace.WriteLine(TraceLevel.Error, $"Error trying to connect. Info: {ToString()}, error: {e}");
280-
throw new ConnectionException($"Error trying to connect. Info: {ToString()}, error: {e}");
280+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} - Error trying to connect, error: {e}");
281+
throw new ConnectionException($"{ToString()} - Error trying to connect., error: {e}");
281282
}
282283
finally
283284
{
@@ -310,8 +311,8 @@ await _semaphoreClose.WaitAsync()
310311
if (error != null)
311312
{
312313
// we assume here that the connection is closed unexpectedly, since the error is not null
313-
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
314-
$"Info: {ToString()}");
314+
Trace.WriteLine(TraceLevel.Warning, $"{ToString()} is closed unexpectedly. "
315+
);
315316

316317
// we have to check if the recovery is active.
317318
// The user may want to disable the recovery mechanism
@@ -349,9 +350,8 @@ await Task.Run(async () =>
349350
int nextDelayMs = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
350351

351352
Trace.WriteLine(TraceLevel.Information,
352-
$"Trying Recovering connection in {nextDelayMs} milliseconds, " +
353-
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. " +
354-
$"Info: {ToString()})");
353+
$"{ToString()} is trying Recovering connection in {nextDelayMs} milliseconds, " +
354+
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. ");
355355

356356
await Task.Delay(TimeSpan.FromMilliseconds(nextDelayMs))
357357
.ConfigureAwait(false);
@@ -364,18 +364,18 @@ await OpenConnectionAsync()
364364
catch (Exception e)
365365
{
366366
Trace.WriteLine(TraceLevel.Warning,
367-
$"Error trying to recover connection {e}. Info: {this}");
367+
$"{ToString()} Error trying to recover connection {e}");
368368
}
369369
}
370370

371371
_connectionSettings.Recovery.GetBackOffDelayPolicy().Reset();
372372
string connectionDescription = connected ? "recovered" : "not recovered";
373373
Trace.WriteLine(TraceLevel.Information,
374-
$"Connection {connectionDescription}. Info: {ToString()}");
374+
$"{ToString()} is {connectionDescription}");
375375

376376
if (!connected)
377377
{
378-
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
378+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} connection is closed");
379379
OnNewStatus(State.Closed,
380380
new Error(ConnectionNotRecoveredCode,
381381
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
@@ -388,7 +388,7 @@ await OpenConnectionAsync()
388388

389389
if (_connectionSettings.Recovery.IsTopologyActive())
390390
{
391-
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
391+
Trace.WriteLine(TraceLevel.Information, $"{ToString()} Recovering topology");
392392
var visitor = new Visitor(_management);
393393
await _recordingTopologyListener.Accept(visitor)
394394
.ConfigureAwait(false);
@@ -403,15 +403,14 @@ await _recordingTopologyListener.Accept(visitor)
403403
}
404404
catch (Exception e)
405405
{
406-
Trace.WriteLine(TraceLevel.Error, $"Error trying to reconnect entities {e}. Info: {this}");
406+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} error trying to reconnect entities {e}");
407407
}
408408
}).ConfigureAwait(false);
409409

410410
return;
411411
}
412412

413-
414-
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
413+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
415414
OnNewStatus(State.Closed, Utils.ConvertError(error));
416415
}
417416
finally

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 73 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,29 +42,43 @@ public override async Task OpenAsync()
4242
{
4343
try
4444
{
45-
TaskCompletionSource attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
45+
TaskCompletionSource<ReceiverLink> attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
4646

4747
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, Id, _filters);
4848

4949
void onAttached(ILink argLink, Attach argAttach)
5050
{
51-
attachCompletedTcs.SetResult();
51+
if (argLink is ReceiverLink link)
52+
{
53+
attachCompletedTcs.SetResult(link);
54+
}
55+
else
56+
{
57+
// TODO create "internal bug" exception type?
58+
var ex = new InvalidOperationException(
59+
"invalid link in onAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
60+
attachCompletedTcs.SetException(ex);
61+
}
5262
}
5363

64+
ReceiverLink? tmpReceiverLink = null;
5465
Task receiverLinkTask = Task.Run(() =>
5566
{
56-
_receiverLink = new ReceiverLink(_connection._nativePubSubSessions.GetOrCreateSession(), Id, attach, onAttached);
67+
tmpReceiverLink = new ReceiverLink(_connection._nativePubSubSessions.GetOrCreateSession(), Id, attach, onAttached);
5768
});
5869

5970
// TODO configurable timeout
6071
TimeSpan waitSpan = TimeSpan.FromSeconds(5);
6172

62-
await attachCompletedTcs.Task.WaitAsync(waitSpan)
73+
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
6374
.ConfigureAwait(false);
6475

6576
await receiverLinkTask.WaitAsync(waitSpan)
6677
.ConfigureAwait(false);
6778

79+
System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
80+
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_receiverLink, tmpReceiverLink));
81+
6882
if (_receiverLink is null)
6983
{
7084
throw new ConsumerException($"{ToString()} Failed to create receiver link (null was returned)");
@@ -77,9 +91,10 @@ await receiverLinkTask.WaitAsync(waitSpan)
7791
}
7892
else
7993
{
80-
// TODO: Check the performance during the download messages
81-
// The publisher is faster than the consumer
82-
_receiverLink.Start(_initialCredits, OnReceiverLinkMessage);
94+
_receiverLink.SetCredit(_initialCredits);
95+
96+
// TODO save / cancel task
97+
_ = Task.Run(ProcessMessages);
8398

8499
await base.OpenAsync()
85100
.ConfigureAwait(false);
@@ -91,11 +106,51 @@ await base.OpenAsync()
91106
}
92107
}
93108

94-
private void OnReceiverLinkMessage(IReceiverLink link, Message message)
109+
private async Task ProcessMessages()
95110
{
96-
_unsettledMessageCounter.Increment();
97-
IContext context = new DeliveryContext(link, message, _unsettledMessageCounter);
98-
_messageHandler(context, new AmqpMessage(message));
111+
try
112+
{
113+
if (_receiverLink is null)
114+
{
115+
// TODO is this a serious error?
116+
return;
117+
}
118+
119+
while (_receiverLink is { LinkState: LinkState.Attached })
120+
{
121+
TimeSpan timeout = TimeSpan.FromSeconds(60); // TODO configurable
122+
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
123+
if (nativeMessage is null)
124+
{
125+
// this is not a problem, it is just a timeout.
126+
// the timeout is set to 60 seconds.
127+
// For the moment I'd trace it at some point we can remove it
128+
Trace.WriteLine(TraceLevel.Verbose,
129+
$"{ToString()}: Timeout {timeout.Seconds} s.. waiting for message.");
130+
continue;
131+
}
132+
133+
_unsettledMessageCounter.Increment();
134+
135+
IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
136+
var amqpMessage = new AmqpMessage(nativeMessage);
137+
138+
// TODO catch exceptions thrown by handlers,
139+
// then call exception handler?
140+
await _messageHandler(context, amqpMessage).ConfigureAwait(false);
141+
}
142+
}
143+
catch (Exception e)
144+
{
145+
if (State == State.Closing)
146+
{
147+
return;
148+
}
149+
150+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
151+
}
152+
153+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed.");
99154
}
100155

101156
private string Id { get; } = Guid.NewGuid().ToString();
@@ -111,7 +166,7 @@ public void Pause()
111166
if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
112167
(int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
113168
{
114-
_receiverLink.SetCredit(credit: 0, autoRestore: false);
169+
_receiverLink.SetCredit(credit: 0);
115170

116171
if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
117172
(int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
@@ -163,8 +218,8 @@ public override async Task CloseAsync()
163218

164219
OnNewStatus(State.Closing, null);
165220

166-
// TODO timeout
167-
await _receiverLink.CloseAsync()
221+
// TODO global timeout for closing, other async actions?
222+
await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
168223
.ConfigureAwait(false);
169224

170225
_receiverLink = null;
@@ -176,6 +231,9 @@ await _receiverLink.CloseAsync()
176231

177232
public override string ToString()
178233
{
179-
return $"Consumer{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
234+
return $"Consumer{{Address='{_address}', " +
235+
$"id={Id}, " +
236+
$"Connection='{_connection}', " +
237+
$"State='{State}'}}";
180238
}
181239
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public IConsumerBuilder Queue(string queue)
1616
}
1717

1818

19-
private MessageHandler _handler = (message, context) => { };
19+
private MessageHandler? _handler;
2020

2121
public IConsumerBuilder MessageHandler(MessageHandler handler)
2222
{
@@ -38,6 +38,11 @@ public IConsumerBuilder.IStreamOptions Stream()
3838

3939
public async Task<IConsumer> BuildAsync(CancellationToken cancellationToken = default)
4040
{
41+
if (_handler is null)
42+
{
43+
throw new ConsumerException("Message handler is not set");
44+
}
45+
4146
string address = new AddressBuilder().Queue(_queue).Address();
4247

4348
AmqpConsumer consumer = new(connection, address, _handler, _initialCredits, _filters);

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ await EnsureReceiverLinkAsync()
124124

125125
_managementSession.Closed += (sender, error) =>
126126
{
127-
if (State != State.Closed)
127+
if (State != State.Closed && error != null)
128128
{
129129
Trace.WriteLine(TraceLevel.Warning, $"Management session closed " +
130130
$"with error: {Utils.ConvertError(error)} " +
@@ -151,11 +151,15 @@ private async Task ProcessResponses()
151151
continue;
152152
}
153153

154-
using (Message msg = await _receiverLink.ReceiveAsync().ConfigureAwait(false))
154+
TimeSpan timeout = TimeSpan.FromSeconds(59);
155+
using (Message msg = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false))
155156
{
156157
if (msg == null)
157158
{
158-
Trace.WriteLine(TraceLevel.Warning, "Received null message");
159+
// this is not a problem, it is just a timeout.
160+
// the timeout is set to 60 seconds.
161+
// For the moment I'd trace it at some point we can remove it
162+
Trace.WriteLine(TraceLevel.Verbose, $"Management:Timeout {timeout.Seconds} s.. waiting for message.");
159163
continue;
160164
}
161165

0 commit comments

Comments
 (0)