Skip to content

Commit d9681d5

Browse files
Gsantomaggiolukebakken
authored andcommitted
Consumer handler messages async
- delivery context async - refactor logs Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 89b5478 commit d9681d5

File tree

14 files changed

+157
-102
lines changed

14 files changed

+157
-102
lines changed

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace RabbitMQ.AMQP.Client;
44

55
public class ConsumerException(string message) : Exception(message);
6-
public delegate void MessageHandler(IContext context, IMessage message);
6+
public delegate Task MessageHandler(IContext context, IMessage message);
77

88
public interface IConsumer : ILifeCycle
99
{
@@ -21,9 +21,9 @@ public interface IMessageHandler
2121

2222
public interface IContext
2323
{
24-
void Accept();
24+
Task Accept();
2525

26-
void Discard();
26+
Task Discard();
2727

28-
void Requeue();
28+
Task Requeue();
2929
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -216,15 +216,15 @@ [new Symbol("connection_name")] = _connectionSettings.ConnectionName,
216216
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
217217
}
218218

219-
void onOpened(Amqp.IConnection connection, Open open1)
219+
void OnOpened(Amqp.IConnection connection, Open open1)
220220
{
221-
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");
221+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is open");
222222
OnNewStatus(State.Open, null);
223223
}
224224

225225
var cf = new ConnectionFactory();
226226

227-
if (_connectionSettings.UseSsl && _connectionSettings.TlsSettings is not null)
227+
if (_connectionSettings is { UseSsl: true, TlsSettings: not null })
228228
{
229229
cf.SSL.Protocols = _connectionSettings.TlsSettings.Protocols;
230230
cf.SSL.CheckCertificateRevocation = _connectionSettings.TlsSettings.CheckCertificateRevocation;
@@ -254,19 +254,19 @@ void onOpened(Amqp.IConnection connection, Open open1)
254254

255255
try
256256
{
257-
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: onOpened)
257+
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: OnOpened)
258258
.ConfigureAwait(false);
259259
}
260260
catch (Exception ex)
261261
{
262262
throw new ConnectionException(
263-
$"Connection failed. Info: {ToString()}", ex);
263+
$"{ToString()} connection failed.", ex);
264264
}
265265

266266
if (_nativeConnection.IsClosed)
267267
{
268268
throw new ConnectionException(
269-
$"Connection failed. Info: {ToString()}, error: {_nativeConnection.Error}");
269+
$"{ToString()} connection failed., error: {_nativeConnection.Error}");
270270
}
271271

272272
await _management.OpenAsync()
@@ -276,8 +276,8 @@ await _management.OpenAsync()
276276
}
277277
catch (AmqpException e)
278278
{
279-
Trace.WriteLine(TraceLevel.Error, $"Error trying to connect. Info: {ToString()}, error: {e}");
280-
throw new ConnectionException($"Error trying to connect. Info: {ToString()}, error: {e}");
279+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} - Error trying to connect, error: {e}");
280+
throw new ConnectionException($"{ToString()} - Error trying to connect., error: {e}");
281281
}
282282
finally
283283
{
@@ -310,8 +310,8 @@ await _semaphoreClose.WaitAsync()
310310
if (error != null)
311311
{
312312
// we assume here that the connection is closed unexpectedly, since the error is not null
313-
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
314-
$"Info: {ToString()}");
313+
Trace.WriteLine(TraceLevel.Warning, $"{ToString()} is closed unexpectedly. "
314+
);
315315

316316
// we have to check if the recovery is active.
317317
// The user may want to disable the recovery mechanism
@@ -349,9 +349,8 @@ await Task.Run(async () =>
349349
int nextDelayMs = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
350350

351351
Trace.WriteLine(TraceLevel.Information,
352-
$"Trying Recovering connection in {nextDelayMs} milliseconds, " +
353-
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. " +
354-
$"Info: {ToString()})");
352+
$"{ToString()} is trying Recovering connection in {nextDelayMs} milliseconds, " +
353+
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. ");
355354

356355
await Task.Delay(TimeSpan.FromMilliseconds(nextDelayMs))
357356
.ConfigureAwait(false);
@@ -364,18 +363,18 @@ await OpenConnectionAsync()
364363
catch (Exception e)
365364
{
366365
Trace.WriteLine(TraceLevel.Warning,
367-
$"Error trying to recover connection {e}. Info: {this}");
366+
$"{ToString()} Error trying to recover connection {e}");
368367
}
369368
}
370369

371370
_connectionSettings.Recovery.GetBackOffDelayPolicy().Reset();
372371
string connectionDescription = connected ? "recovered" : "not recovered";
373372
Trace.WriteLine(TraceLevel.Information,
374-
$"Connection {connectionDescription}. Info: {ToString()}");
373+
$"{ToString()} is {connectionDescription}");
375374

376375
if (!connected)
377376
{
378-
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
377+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} connection is closed");
379378
OnNewStatus(State.Closed,
380379
new Error(ConnectionNotRecoveredCode,
381380
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
@@ -388,7 +387,7 @@ await OpenConnectionAsync()
388387

389388
if (_connectionSettings.Recovery.IsTopologyActive())
390389
{
391-
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
390+
Trace.WriteLine(TraceLevel.Information, $"{ToString()} Recovering topology");
392391
var visitor = new Visitor(_management);
393392
await _recordingTopologyListener.Accept(visitor)
394393
.ConfigureAwait(false);
@@ -403,15 +402,15 @@ await _recordingTopologyListener.Accept(visitor)
403402
}
404403
catch (Exception e)
405404
{
406-
Trace.WriteLine(TraceLevel.Error, $"Error trying to reconnect entities {e}. Info: {this}");
405+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} error trying to reconnect entities {e}");
407406
}
408407
}).ConfigureAwait(false);
409408

410409
return;
411410
}
412411

413412

414-
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
413+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
415414
OnNewStatus(State.Closed, Utils.ConvertError(error));
416415
}
417416
finally

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,60 @@ await receiverLinkTask.WaitAsync(waitSpan)
7777
}
7878
else
7979
{
80-
// TODO: Check the performance during the download messages
81-
// The publisher is faster than the consumer
82-
_receiverLink.Start(_initialCredits, OnReceiverLinkMessage);
80+
// TODO save / cancel task
81+
_ = Task.Run(ProcessMessages);
8382

8483
await base.OpenAsync()
8584
.ConfigureAwait(false);
8685
}
86+
8787
}
8888
catch (Exception e)
8989
{
9090
throw new ConsumerException($"{ToString()} Failed to create receiver link, {e}");
9191
}
9292
}
9393

94-
private void OnReceiverLinkMessage(IReceiverLink link, Message message)
94+
private async Task ProcessMessages()
9595
{
96-
_unsettledMessageCounter.Increment();
97-
IContext context = new DeliveryContext(link, message, _unsettledMessageCounter);
98-
_messageHandler(context, new AmqpMessage(message));
96+
try
97+
{
98+
if (_receiverLink == null)
99+
{
100+
return;
101+
}
102+
103+
_receiverLink.SetCredit(_initialCredits);
104+
while (_receiverLink is { LinkState: LinkState.Attached })
105+
{
106+
TimeSpan timeout = TimeSpan.FromSeconds(59);
107+
Message? message = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
108+
if (message == null)
109+
{
110+
// this is not a problem, it is just a timeout.
111+
// the timeout is set to 60 seconds.
112+
// For the moment I'd trace it at some point we can remove it
113+
Trace.WriteLine(TraceLevel.Verbose,
114+
$"{ToString()}: Timeout {timeout.Seconds} s.. waiting for message.");
115+
continue;
116+
}
117+
118+
IContext context = new DeliveryContext(_receiverLink, message, _unsettledMessageCounter);
119+
await _messageHandler(context,
120+
new AmqpMessage(message)).ConfigureAwait(false);
121+
}
122+
}
123+
catch (Exception e)
124+
{
125+
if (State == State.Closing)
126+
{
127+
return;
128+
}
129+
130+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
131+
}
132+
133+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed.");
99134
}
100135

101136
private string Id { get; } = Guid.NewGuid().ToString();
@@ -176,6 +211,9 @@ await _receiverLink.CloseAsync()
176211

177212
public override string ToString()
178213
{
179-
return $"Consumer{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
214+
return $"Consumer{{Address='{_address}', " +
215+
$"id={Id}, " +
216+
$"ConnectionName='{_connection}', " +
217+
$"State='{State}'}}";
180218
}
181219
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public IConsumerBuilder Queue(string queue)
1616
}
1717

1818

19-
private MessageHandler _handler = (message, context) => { };
19+
private MessageHandler? _handler;
2020

2121
public IConsumerBuilder MessageHandler(MessageHandler handler)
2222
{
@@ -38,6 +38,11 @@ public IConsumerBuilder.IStreamOptions Stream()
3838

3939
public async Task<IConsumer> BuildAsync(CancellationToken cancellationToken = default)
4040
{
41+
if (_handler is null)
42+
{
43+
throw new ConsumerException("Message handler is not set");
44+
}
45+
4146
string address = new AddressBuilder().Queue(_queue).Address();
4247

4348
AmqpConsumer consumer = new(connection, address, _handler, _initialCredits, _filters);

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ await EnsureReceiverLinkAsync()
124124

125125
_managementSession.Closed += (sender, error) =>
126126
{
127-
if (State != State.Closed)
127+
if (State != State.Closed && error != null)
128128
{
129129
Trace.WriteLine(TraceLevel.Warning, $"Management session closed " +
130130
$"with error: {Utils.ConvertError(error)} " +
@@ -151,11 +151,15 @@ private async Task ProcessResponses()
151151
continue;
152152
}
153153

154-
using (Message msg = await _receiverLink.ReceiveAsync().ConfigureAwait(false))
154+
TimeSpan timeout = TimeSpan.FromSeconds(59);
155+
using (Message msg = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false))
155156
{
156157
if (msg == null)
157158
{
158-
Trace.WriteLine(TraceLevel.Warning, "Received null message");
159+
// this is not a problem, it is just a timeout.
160+
// the timeout is set to 60 seconds.
161+
// For the moment I'd trace it at some point we can remove it
162+
Trace.WriteLine(TraceLevel.Verbose, $"Management:Timeout {timeout.Seconds} s.. waiting for message.");
159163
continue;
160164
}
161165

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,19 @@ namespace RabbitMQ.AMQP.Client.Impl;
88

99
public class DeliveryContext(IReceiverLink link, Message message, UnsettledMessageCounter unsettledMessageCounter) : IContext
1010
{
11-
public void Accept()
11+
public Task Accept()
1212
{
1313
if (link.IsClosed)
1414
{
1515
throw new ConsumerException("Link is closed");
1616
}
17-
1817
link.Accept(message);
1918
unsettledMessageCounter.Decrement();
19+
message.Dispose();
20+
return Task.CompletedTask;
2021
}
2122

22-
public void Discard()
23+
public Task Discard()
2324
{
2425
if (link.IsClosed)
2526
{
@@ -28,16 +29,20 @@ public void Discard()
2829

2930
link.Reject(message);
3031
unsettledMessageCounter.Decrement();
32+
message.Dispose();
33+
return Task.CompletedTask;
3134
}
3235

33-
public void Requeue()
36+
public Task Requeue()
3437
{
35-
if (!link.IsClosed)
38+
if (link.IsClosed)
3639
{
3740
throw new ConsumerException("Link is closed");
3841
}
3942

4043
link.Release(message);
4144
unsettledMessageCounter.Decrement();
45+
message.Dispose();
46+
return Task.CompletedTask;
4247
}
4348
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.Messag
111111
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(string! queue) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
112112
RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
113113
RabbitMQ.AMQP.Client.IContext
114-
RabbitMQ.AMQP.Client.IContext.Accept() -> void
115-
RabbitMQ.AMQP.Client.IContext.Discard() -> void
116-
RabbitMQ.AMQP.Client.IContext.Requeue() -> void
114+
RabbitMQ.AMQP.Client.IContext.Accept() -> System.Threading.Tasks.Task!
115+
RabbitMQ.AMQP.Client.IContext.Discard() -> System.Threading.Tasks.Task!
116+
RabbitMQ.AMQP.Client.IContext.Requeue() -> System.Threading.Tasks.Task!
117117
RabbitMQ.AMQP.Client.IEntityDeclaration
118118
RabbitMQ.AMQP.Client.IEntityDeclaration.Declare() -> System.Threading.Tasks.Task!
119119
RabbitMQ.AMQP.Client.IEntityInfo
@@ -385,10 +385,10 @@ RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.Offset(long offset) -> RabbitMQ.A
385385
RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
386386
RabbitMQ.AMQP.Client.Impl.DefaultStreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
387387
RabbitMQ.AMQP.Client.Impl.DeliveryContext
388-
RabbitMQ.AMQP.Client.Impl.DeliveryContext.Accept() -> void
388+
RabbitMQ.AMQP.Client.Impl.DeliveryContext.Accept() -> System.Threading.Tasks.Task!
389389
RabbitMQ.AMQP.Client.Impl.DeliveryContext.DeliveryContext(Amqp.IReceiverLink! link, Amqp.Message! message, RabbitMQ.AMQP.Client.Impl.UnsettledMessageCounter! unsettledMessageCounter) -> void
390-
RabbitMQ.AMQP.Client.Impl.DeliveryContext.Discard() -> void
391-
RabbitMQ.AMQP.Client.Impl.DeliveryContext.Requeue() -> void
390+
RabbitMQ.AMQP.Client.Impl.DeliveryContext.Discard() -> System.Threading.Tasks.Task!
391+
RabbitMQ.AMQP.Client.Impl.DeliveryContext.Requeue() -> System.Threading.Tasks.Task!
392392
RabbitMQ.AMQP.Client.Impl.DeliveryMode
393393
RabbitMQ.AMQP.Client.Impl.DeliveryMode.AtLeastOnce = 1 -> RabbitMQ.AMQP.Client.Impl.DeliveryMode
394394
RabbitMQ.AMQP.Client.Impl.DeliveryMode.AtMostOnce = 0 -> RabbitMQ.AMQP.Client.Impl.DeliveryMode

Tests/AmqpTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,10 @@ public async Task QueueDeclareDeletePublishConsume(string subject)
133133
long receivedMessageCount = 0;
134134
var allMessagesReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
135135
string? receivedSubject = null;
136-
void MessageHandler(IContext ctx, IMessage msg)
136+
async Task MessageHandler(IContext ctx, IMessage msg)
137137
{
138138
receivedSubject = msg.Subject();
139-
ctx.Accept();
139+
await ctx.Accept();
140140
if (Interlocked.Increment(ref receivedMessageCount) == messageCount)
141141
{
142142
allMessagesReceivedTcs.SetResult();
@@ -215,9 +215,9 @@ public async Task BindingTest(string prefix, bool addBindingArgments)
215215
const int expectedMessageCount = 2;
216216
long receivedMessageCount = 0;
217217
var allMessagesReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
218-
void MessageHandler(IContext ctx, IMessage msg)
218+
async Task MessageHandler(IContext ctx, IMessage msg)
219219
{
220-
ctx.Accept();
220+
await ctx.Accept();
221221
if (Interlocked.Increment(ref receivedMessageCount) == expectedMessageCount)
222222
{
223223
allMessagesReceivedTcs.SetResult();

0 commit comments

Comments
 (0)