Skip to content

Commit 53ec279

Browse files
committed
Merge pull request #42 from rabbitmq/rabbitmq-amqp-dotnet-client-37-followup
Implement `UnsettledMessageCount` in `IConsumer`
1 parent 0da227f commit 53ec279

19 files changed

+622
-223
lines changed

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ public interface IConsumer : ILifeCycle
99
{
1010
void Pause();
1111

12-
long UnsettledMessageCount();
13-
1412
void Unpause();
13+
14+
long UnsettledMessageCount { get; }
1515
}
1616

1717
public interface IMessageHandler

RabbitMQ.AMQP.Client/IPublisher.cs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,39 @@ public enum OutcomeState
88
Failed,
99
}
1010

11-
public class OutcomeDescriptor(ulong code, string description, OutcomeState state, Error? error)
11+
public class PublishOutcome
1212
{
13-
public OutcomeState State { get; internal set; } = state;
14-
public ulong Code { get; internal set; } = code;
15-
public string Description { get; internal set; } = description;
13+
private readonly OutcomeState _state;
14+
private readonly Error? _error;
1615

17-
public Error? Error { get; internal set; } = error;
16+
public PublishOutcome(OutcomeState state, Error? error)
17+
{
18+
_state = state;
19+
_error = error;
20+
}
21+
22+
public OutcomeState State => _state;
23+
public Error? Error => _error;
1824
}
1925

20-
public delegate void OutcomeDescriptorCallback(IMessage message, OutcomeDescriptor outcomeDescriptor);
26+
// public delegate void OutcomeDescriptorCallback(IMessage message, PublishOutcome publishOutcome);
27+
28+
public class PublishResult
29+
{
30+
private IMessage _message;
31+
private PublishOutcome _outcome;
32+
33+
public PublishResult(IMessage message, PublishOutcome outcome)
34+
{
35+
_message = message;
36+
_outcome = outcome;
37+
}
38+
39+
public IMessage Message => _message;
40+
public PublishOutcome Outcome => _outcome;
41+
}
2142

2243
public interface IPublisher : ILifeCycle
2344
{
24-
// TODO this should be named PublishAsync
25-
Task Publish(IMessage message,
26-
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack
45+
Task<PublishResult> PublishAsync(IMessage message, CancellationToken cancellationToken = default);
2746
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
15
using System.Collections.Concurrent;
26
using System.Collections.ObjectModel;
37
using Amqp;
@@ -343,18 +347,19 @@ await Task.Run(async () =>
343347
{
344348
try
345349
{
346-
int next = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
350+
int nextDelayMs = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
347351

348352
Trace.WriteLine(TraceLevel.Information,
349-
$"Trying Recovering connection in {next} milliseconds, " +
353+
$"Trying Recovering connection in {nextDelayMs} milliseconds, " +
350354
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. " +
351355
$"Info: {ToString()})");
352356

353-
await Task.Delay(TimeSpan.FromMilliseconds(next))
357+
await Task.Delay(TimeSpan.FromMilliseconds(nextDelayMs))
354358
.ConfigureAwait(false);
355359

356360
await OpenConnectionAsync()
357361
.ConfigureAwait(false);
362+
358363
connected = true;
359364
}
360365
catch (Exception e)

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
15
using System.Runtime.CompilerServices;
26
using Amqp;
37
using Amqp.Framing;
@@ -7,20 +11,21 @@ namespace RabbitMQ.AMQP.Client.Impl;
711

812
public class AmqpConsumer : AbstractReconnectLifeCycle, IConsumer
913
{
10-
private enum PauseStatus
14+
private enum PauseStatus : byte
1115
{
12-
UNPAUSED,
13-
PAUSING,
14-
PAUSED
16+
UNPAUSED = 0,
17+
PAUSING = 1,
18+
PAUSED = 2
1519
}
1620

1721
private readonly AmqpConnection _connection;
18-
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
1922
private readonly string _address;
2023
private readonly MessageHandler _messageHandler;
2124
private readonly int _initialCredits;
2225
private readonly Map _filters;
2326
private ReceiverLink? _receiverLink;
27+
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
28+
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
2429

2530
public AmqpConsumer(AmqpConnection connection, string address,
2631
MessageHandler messageHandler, int initialCredits, Map filters)
@@ -88,7 +93,8 @@ await base.OpenAsync()
8893

8994
private void OnReceiverLinkMessage(IReceiverLink link, Message message)
9095
{
91-
IContext context = new DeliveryContext(link, message);
96+
_unsettledMessageCounter.Increment();
97+
IContext context = new DeliveryContext(link, message, _unsettledMessageCounter);
9298
_messageHandler(context, new AmqpMessage(message));
9399
}
94100

@@ -121,9 +127,12 @@ public void Pause()
121127
}
122128
}
123129

124-
public long UnsettledMessageCount()
130+
public long UnsettledMessageCount
125131
{
126-
throw new NotImplementedException();
132+
get
133+
{
134+
return _unsettledMessageCounter.Get();
135+
}
127136
}
128137

129138
public void Unpause()

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ internal void CheckResponse(Message sentMessage, int[] expectedResponseCodes, Me
389389
switch (responseCode)
390390
{
391391
case Code409:
392-
throw new PreconditionFailedException($"Precondition Fail. Message: {receivedMessage.Body}");
392+
throw new PreconditionFailedException($"{receivedMessage.Body}");
393393
}
394394

395395
// Check if the correlationId is the same as the messageId
Lines changed: 79 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System.Diagnostics;
1+
using System.Diagnostics;
22
using Amqp;
33
using Amqp.Framing;
44
using Trace = Amqp.Trace;
@@ -8,14 +8,13 @@ namespace RabbitMQ.AMQP.Client.Impl;
88

99
public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
1010
{
11-
private SenderLink? _senderLink = null;
12-
13-
private readonly ManualResetEvent _pausePublishing = new(true);
1411
private readonly AmqpConnection _connection;
1512
private readonly TimeSpan _timeout;
1613
private readonly string _address;
1714
private readonly int _maxInFlight;
1815

16+
private SenderLink? _senderLink = null;
17+
1918
public AmqpPublisher(AmqpConnection connection, string address, TimeSpan timeout, int maxInFlight)
2019
{
2120
_address = address;
@@ -75,33 +74,6 @@ await base.OpenAsync()
7574

7675
private string Id { get; } = Guid.NewGuid().ToString();
7776

78-
private void PausePublishing()
79-
{
80-
_pausePublishing.Reset();
81-
}
82-
83-
private void MaybeResumePublishing()
84-
{
85-
if (State is State.Open)
86-
{
87-
// Can be resumed only if the publisher is open and the in-flight messages are less than the max allowed
88-
// In case the publisher is closed, closing or reconnecting, the publishing will be paused
89-
_pausePublishing.Set();
90-
}
91-
}
92-
93-
private void MaybeBackPressure()
94-
{
95-
if (_currentInFlight >= _maxInFlight)
96-
{
97-
PausePublishing();
98-
}
99-
else
100-
{
101-
MaybeResumePublishing();
102-
}
103-
}
104-
10577
// TODO: Consider implementing this method with the send method
10678
// a way to send a batch of messages
10779

@@ -114,59 +86,37 @@ private void MaybeBackPressure()
11486
// return batch;
11587
// }
11688

117-
private int _currentInFlight = 0;
118-
119-
public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
89+
public async Task<PublishResult> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
12090
{
12191
ThrowIfClosed();
92+
93+
if (_senderLink is null)
94+
{
95+
// TODO create "internal bug" exception type?
96+
throw new InvalidOperationException("_senderLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
97+
}
98+
12299
try
123100
{
124-
_pausePublishing.WaitOne(_timeout);
125-
Interlocked.Increment(ref _currentInFlight);
126-
127-
var nMessage = ((AmqpMessage)message).NativeMessage;
128-
_senderLink?.Send(nMessage,
129-
(sender, outMessage, outcome, state) =>
130-
{
131-
Interlocked.Decrement(ref _currentInFlight);
132-
MaybeBackPressure();
133-
134-
if (outMessage == nMessage &&
135-
outMessage.GetEstimatedMessageSize() == nMessage.GetEstimatedMessageSize())
136-
{
137-
if (outcome is Rejected rejected)
138-
{
139-
outcomeCallback(message, new OutcomeDescriptor(rejected.Descriptor.Code,
140-
rejected.Descriptor.ToString(),
141-
OutcomeState.Failed, Utils.ConvertError(rejected?.Error)));
142-
}
143-
else
144-
{
145-
outcomeCallback(message, new OutcomeDescriptor(outcome.Descriptor.Code,
146-
outcome.Descriptor.ToString(),
147-
OutcomeState.Accepted, null));
148-
}
149-
}
150-
else
151-
{
152-
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Message not sent. Killing the process.");
153-
Process.GetCurrentProcess().Kill();
154-
}
155-
156-
// is it correct to dispose the message here?
157-
// maybe we should expose a method to dispose the message
158-
nMessage.Dispose();
159-
}, this);
101+
Debug.Assert(false == _senderLink.IsClosed);
102+
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
103+
await _senderLink.SendAsync(nativeMessage)
104+
.ConfigureAwait(false);
105+
106+
var publishOutcome = new PublishOutcome(OutcomeState.Accepted, null);
107+
return new PublishResult(message, publishOutcome);
108+
}
109+
catch (AmqpException ex)
110+
{
111+
var publishOutcome = new PublishOutcome(OutcomeState.Failed, Utils.ConvertError(ex.Error));
112+
return new PublishResult(message, publishOutcome);
160113
}
161114
catch (Exception e)
162115
{
163116
throw new PublisherException($"{ToString()} Failed to publish message, {e}");
164117
}
165-
166-
return Task.CompletedTask;
167118
}
168119

169-
170120
public override async Task CloseAsync()
171121
{
172122
if (State == State.Closed)
@@ -183,8 +133,6 @@ public override async Task CloseAsync()
183133
await _senderLink.CloseAsync()
184134
.ConfigureAwait(false);
185135
}
186-
187-
_pausePublishing.Dispose();
188136
}
189137
catch (Exception e)
190138
{
@@ -195,9 +143,64 @@ await _senderLink.CloseAsync()
195143
_connection.Publishers.TryRemove(Id, out _);
196144
}
197145

198-
199146
public override string ToString()
200147
{
201148
return $"Publisher{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
202149
}
203150
}
151+
152+
/*
153+
Task.Run(() =>
154+
{
155+
// TODO timeout / cancellation
156+
// TODO LRB I removed the nativeMessage == outMessage check because outMessage came in as NULL
157+
// which I didn't think possible 🤔
158+
void OutcomeCallback(ILink sender, Message outMessage, Outcome outcome, object state)
159+
{
160+
try
161+
{
162+
Interlocked.Decrement(ref _currentInFlight);
163+
MaybeBackPressure();
164+
165+
PublishOutcome publishOutcome;
166+
if (outcome is Rejected rejected)
167+
{
168+
publishOutcome = new PublishOutcome(rejected.Descriptor.Code, rejected.Descriptor.ToString(),
169+
OutcomeState.Failed, Utils.ConvertError(rejected?.Error));
170+
171+
}
172+
else
173+
{
174+
publishOutcome = new PublishOutcome(outcome.Descriptor.Code, outcome.Descriptor.ToString(),
175+
OutcomeState.Accepted, null);
176+
}
177+
178+
var result = new PublishResult(message, publishOutcome);
179+
publishedTcs.SetResult(result);
180+
}
181+
catch (Exception ex)
182+
{
183+
if (false == publishedTcs.TrySetException(ex))
184+
{
185+
// TODO log it at debug level?
186+
}
187+
}
188+
finally
189+
{
190+
// is it correct to dispose the message here?
191+
// maybe we should expose a method to dispose the message
192+
nativeMessage.Dispose();
193+
}
194+
}
195+
196+
try
197+
{
198+
Debug.Assert(false == _senderLink.IsClosed);
199+
_senderLink.Send(nativeMessage, OutcomeCallback, this);
200+
}
201+
catch (Exception ex)
202+
{
203+
publishedTcs.SetException(ex);
204+
}
205+
}, cancellationToken);
206+
*/

0 commit comments

Comments
 (0)