Skip to content

Commit 1977f8b

Browse files
committed
* Fix AmqpPublisher's PublishAsync method to be actually async.
1 parent 1de60a7 commit 1977f8b

File tree

13 files changed

+160
-45
lines changed

13 files changed

+160
-45
lines changed

RabbitMQ.AMQP.Client/IPublisher.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ public class OutcomeDescriptor(ulong code, string description, OutcomeState stat
2121

2222
public interface IPublisher : ILifeCycle
2323
{
24-
// TODO this should be named PublishAsync
25-
Task Publish(IMessage message,
26-
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack
24+
Task PublishAsync(IMessage message,
25+
OutcomeDescriptorCallback outcomeCallback, CancellationToken cancellationToken = default);
2726
}

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ internal void CheckResponse(Message sentMessage, int[] expectedResponseCodes, Me
389389
switch (responseCode)
390390
{
391391
case Code409:
392-
throw new PreconditionFailedException($"Precondition Fail. Message: {receivedMessage.Body}");
392+
throw new PreconditionFailedException($"{receivedMessage.Body}");
393393
}
394394

395395
// Check if the correlationId is the same as the messageId

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Diagnostics;
2-
using Amqp;
1+
using Amqp;
32
using Amqp.Framing;
43
using Trace = Amqp.Trace;
54
using TraceLevel = Amqp.TraceLevel;
@@ -10,7 +9,9 @@ public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
109
{
1110
private SenderLink? _senderLink = null;
1211

13-
private readonly ManualResetEvent _pausePublishing = new(true);
12+
// TODO should be a semaphore, and disposed
13+
private readonly ManualResetEventSlim _pausePublishing = new(true);
14+
1415
private readonly AmqpConnection _connection;
1516
private readonly TimeSpan _timeout;
1617
private readonly string _address;
@@ -116,54 +117,77 @@ private void MaybeBackPressure()
116117

117118
private int _currentInFlight = 0;
118119

119-
public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
120+
public async Task PublishAsync(IMessage message,
121+
OutcomeDescriptorCallback outcomeDescriptorCallback,
122+
CancellationToken cancellationToken = default)
120123
{
121124
ThrowIfClosed();
125+
126+
var publishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
127+
128+
if (_senderLink is null)
129+
{
130+
// TODO create "internal bug" exception type?
131+
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
132+
}
133+
122134
try
123135
{
124-
_pausePublishing.WaitOne(_timeout);
136+
// TODO use semaphore
137+
_pausePublishing.Wait(_timeout);
125138
Interlocked.Increment(ref _currentInFlight);
139+
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
126140

127-
var nMessage = ((AmqpMessage)message).NativeMessage;
128-
_senderLink?.Send(nMessage,
129-
(sender, outMessage, outcome, state) =>
141+
Task publishTask = Task.Run(() =>
142+
{
143+
// TODO timeout / cancellation
144+
// TODO LRB I removed the nativeMessage == outMessage check because outMessage came in as NULL
145+
// which I didn't think possible 🤔
146+
void OutcomeCallback(ILink sender, Message outMessage, Outcome outcome, object state)
130147
{
131-
Interlocked.Decrement(ref _currentInFlight);
132-
MaybeBackPressure();
133-
134-
if (outMessage == nMessage &&
135-
outMessage.GetEstimatedMessageSize() == nMessage.GetEstimatedMessageSize())
148+
try
136149
{
150+
Interlocked.Decrement(ref _currentInFlight);
151+
MaybeBackPressure();
152+
137153
if (outcome is Rejected rejected)
138154
{
139-
outcomeCallback(message, new OutcomeDescriptor(rejected.Descriptor.Code,
155+
outcomeDescriptorCallback(message, new OutcomeDescriptor(rejected.Descriptor.Code,
140156
rejected.Descriptor.ToString(),
141157
OutcomeState.Failed, Utils.ConvertError(rejected?.Error)));
158+
142159
}
143160
else
144161
{
145-
outcomeCallback(message, new OutcomeDescriptor(outcome.Descriptor.Code,
162+
outcomeDescriptorCallback(message, new OutcomeDescriptor(outcome.Descriptor.Code,
146163
outcome.Descriptor.ToString(),
147164
OutcomeState.Accepted, null));
148165
}
166+
167+
publishedTcs.SetResult();
149168
}
150-
else
169+
finally
151170
{
152-
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Message not sent. Killing the process.");
153-
Process.GetCurrentProcess().Kill();
171+
// is it correct to dispose the message here?
172+
// maybe we should expose a method to dispose the message
173+
nativeMessage.Dispose();
154174
}
175+
}
176+
177+
_senderLink.Send(nativeMessage, OutcomeCallback, this);
178+
}, cancellationToken);
155179

156-
// is it correct to dispose the message here?
157-
// maybe we should expose a method to dispose the message
158-
nMessage.Dispose();
159-
}, this);
180+
// TODO timeouts / cancellation for both of these
181+
await publishTask
182+
.ConfigureAwait(false);
183+
184+
await publishedTcs.Task
185+
.ConfigureAwait(false);
160186
}
161187
catch (Exception e)
162188
{
163189
throw new PublisherException($"{ToString()} Failed to publish message, {e}");
164190
}
165-
166-
return Task.CompletedTask;
167191
}
168192

169193

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException
265265
RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException.AmqpNotOpenException(string! message) -> void
266266
RabbitMQ.AMQP.Client.Impl.AmqpPublisher
267267
RabbitMQ.AMQP.Client.Impl.AmqpPublisher.AmqpPublisher(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string! address, System.TimeSpan timeout, int maxInFlight) -> void
268-
RabbitMQ.AMQP.Client.Impl.AmqpPublisher.Publish(RabbitMQ.AMQP.Client.IMessage! message, RabbitMQ.AMQP.Client.OutcomeDescriptorCallback! outcomeCallback) -> System.Threading.Tasks.Task!
268+
RabbitMQ.AMQP.Client.Impl.AmqpPublisher.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, RabbitMQ.AMQP.Client.OutcomeDescriptorCallback! outcomeDescriptorCallback, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
269269
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder
270270
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.AmqpPublisherBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
271271
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.BuildAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IPublisher!>!
@@ -447,7 +447,7 @@ RabbitMQ.AMQP.Client.Impl.Utils
447447
RabbitMQ.AMQP.Client.InvalidAddressException
448448
RabbitMQ.AMQP.Client.InvalidAddressException.InvalidAddressException(string! message) -> void
449449
RabbitMQ.AMQP.Client.IPublisher
450-
RabbitMQ.AMQP.Client.IPublisher.Publish(RabbitMQ.AMQP.Client.IMessage! message, RabbitMQ.AMQP.Client.OutcomeDescriptorCallback! outcomeCallback) -> System.Threading.Tasks.Task!
450+
RabbitMQ.AMQP.Client.IPublisher.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, RabbitMQ.AMQP.Client.OutcomeDescriptorCallback! outcomeCallback, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
451451
RabbitMQ.AMQP.Client.IPublisherBuilder
452452
RabbitMQ.AMQP.Client.IPublisherBuilder.BuildAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IPublisher!>!
453453
RabbitMQ.AMQP.Client.IPublisherBuilder.MaxInflightMessages(int maxInFlight) -> RabbitMQ.AMQP.Client.IPublisherBuilder!

Tests/AmqpTests.cs

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public async Task QueueDeclareDeletePublishConsume(string subject)
117117
message.MessageId(messageId.ToString());
118118
message.Subject(subject);
119119

120-
publishTasks.Add(publisher.Publish(message, (IMessage msg, OutcomeDescriptor outcome) =>
120+
publishTasks.Add(publisher.PublishAsync(message, (IMessage msg, OutcomeDescriptor outcome) =>
121121
{
122122
if (outcome.State == OutcomeState.Accepted)
123123
{
@@ -167,4 +167,96 @@ void MessageHandler(IContext ctx, IMessage msg)
167167
await publisher.CloseAsync();
168168
await consumer.CloseAsync();
169169
}
170+
171+
[Theory]
172+
[InlineData("foo", false)]
173+
[InlineData("foo", true)]
174+
[InlineData("фообар", true)]
175+
[InlineData("фообар", false)]
176+
[InlineData("фоо!бар", false)]
177+
[InlineData("фоо!бар", true)]
178+
public async Task BindingTest(string prefix, bool addBindingArgments)
179+
{
180+
byte[] messageBody = Encoding.UTF8.GetBytes("hello");
181+
182+
Assert.NotNull(_connection);
183+
Assert.NotNull(_management);
184+
185+
string now = Now;
186+
string e1 = $"{prefix}-e1-{_testDisplayName}-{now}";
187+
string e2 = $"{prefix}-e2-{_testDisplayName}-{now}";
188+
string rk = $"{prefix}-foo-{now}";
189+
190+
Dictionary<string, object> bindingArguments = new();
191+
if (addBindingArgments)
192+
{
193+
bindingArguments.Add("foo", prefix + "-bar");
194+
}
195+
196+
await _management.Exchange().Name(e1).Type(ExchangeType.DIRECT).Declare();
197+
await _management.Exchange().Name(e2).Type(ExchangeType.FANOUT).Declare();
198+
await _management.Queue().Name(_queueName).Type(QueueType.CLASSIC).Declare();
199+
200+
IBindingSpecification e1e2Binding = _management.Binding().SourceExchange(e1).DestinationExchange(e2).Key(rk).Arguments(bindingArguments);
201+
IBindingSpecification e2qBinding = _management.Binding().SourceExchange(e2).DestinationQueue(_queueName).Arguments(bindingArguments);
202+
203+
await e1e2Binding.Bind();
204+
await e2qBinding.Bind();
205+
206+
IPublisherBuilder publisherBuilder1 = _connection.PublisherBuilder();
207+
IPublisherBuilder publisherBuilder2 = _connection.PublisherBuilder();
208+
209+
IPublisher publisher1 = await publisherBuilder1.Exchange(e1).Key(rk).BuildAsync();
210+
IPublisher publisher2 = await publisherBuilder2.Exchange(e2).BuildAsync();
211+
212+
IMessage message = new AmqpMessage(messageBody);
213+
214+
const int expectedMessageCount = 2;
215+
int publishCount = 0;
216+
var allMessagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
217+
void PublishCallback(IMessage msg, OutcomeDescriptor outcome)
218+
{
219+
if (outcome.State == OutcomeState.Accepted)
220+
{
221+
if (Interlocked.Increment(ref publishCount) == expectedMessageCount)
222+
{
223+
allMessagesPublishedTcs.SetResult();
224+
}
225+
}
226+
}
227+
228+
Task publish1Task = publisher1.PublishAsync(message, PublishCallback);
229+
Task publish2Task = publisher2.PublishAsync(message, PublishCallback);
230+
231+
await WhenAllComplete([publish1Task, publish2Task]);
232+
await WhenTaskCompletes(allMessagesPublishedTcs.Task);
233+
234+
long receivedMessageCount = 0;
235+
var allMessagesReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
236+
void MessageHandler(IContext ctx, IMessage msg)
237+
{
238+
ctx.Accept();
239+
if (Interlocked.Increment(ref receivedMessageCount) == expectedMessageCount)
240+
{
241+
allMessagesReceivedTcs.SetResult();
242+
}
243+
}
244+
245+
IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder();
246+
IConsumer consumer = await consumerBuilder.Queue(_queueName).MessageHandler(MessageHandler).BuildAsync();
247+
248+
await WhenTaskCompletes(allMessagesReceivedTcs.Task);
249+
250+
await publisher1.CloseAsync();
251+
await publisher2.CloseAsync();
252+
await consumer.CloseAsync();
253+
254+
// TODO these fail with 400
255+
// await e1e2Binding.Unbind();
256+
// await e2qBinding.Unbind();
257+
258+
await _management.ExchangeDeletion().Delete(e2);
259+
await _management.ExchangeDeletion().Delete(e1);
260+
// Note: DisposeAsync will delete the queue
261+
}
170262
}

Tests/ConnectionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public async Task ThrowAmqpClosedExceptionWhenItemIsClosed()
105105
await publisher.CloseAsync();
106106

107107
await Assert.ThrowsAsync<AmqpNotOpenException>(async () =>
108-
await publisher.Publish(new AmqpMessage("Hello wold!"), (message, descriptor) =>
108+
await publisher.PublishAsync(new AmqpMessage("Hello wold!"), (message, descriptor) =>
109109
{
110110
// it doest matter
111111
}));

Tests/Consumer/BasicConsumerTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public async Task SimpleConsumeMessage()
2121

2222
IPublisher publisher = await connection.PublisherBuilder().Queue("SimpleConsumeMessage").BuildAsync();
2323

24-
await publisher.Publish(new AmqpMessage("Hello world!"),
24+
await publisher.PublishAsync(new AmqpMessage("Hello world!"),
2525
(_, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); });
2626

2727
TaskCompletionSource<IMessage> tcs = new();
@@ -59,7 +59,7 @@ public async Task ConsumerReQueueMessage()
5959

6060
IPublisher publisher = await connection.PublisherBuilder().Queue("ConsumerReQueueMessage").BuildAsync();
6161

62-
await publisher.Publish(new AmqpMessage("Hello world!"),
62+
await publisher.PublishAsync(new AmqpMessage("Hello world!"),
6363
(_, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); });
6464

6565
TaskCompletionSource<int> tcs = new();
@@ -104,7 +104,7 @@ public async Task ConsumerRejectOnlySomeMessage()
104104

105105
for (int i = 0; i < 500; i++)
106106
{
107-
await publisher.Publish(new AmqpMessage($"message_{i}"),
107+
await publisher.PublishAsync(new AmqpMessage($"message_{i}"),
108108
(_, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); });
109109
}
110110

@@ -304,7 +304,7 @@ private static async Task Publish(IConnection connection, string queue, int numb
304304
message.Annotation("x-stream-filter-value", filter);
305305
}
306306

307-
await publisher.Publish(message,
307+
await publisher.PublishAsync(message,
308308
(_, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); });
309309
}
310310
}

Tests/Consumer/ConsumerPauseTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public async Task PauseShouldStopMessageArrivalUnpauseShouldResumeIt()
3939
{
4040
int idx = i;
4141
IMessage message = new AmqpMessage($"message_{i}");
42-
publishTasks.Add(publisher.Publish(message,
42+
publishTasks.Add(publisher.PublishAsync(message,
4343
(message, descriptor) =>
4444
{
4545
Assert.Equal(OutcomeState.Accepted, descriptor.State);

Tests/PublisherConsumerRecoveryTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ await connection.Management().Queue()
178178
int messagesConfirmed = 0;
179179
for (int i = 0; i < 10; i++)
180180
{
181-
await publisher.Publish(new AmqpMessage("Hello World"),
181+
await publisher.PublishAsync(new AmqpMessage("Hello World"),
182182
(message, descriptor) =>
183183
{
184184
Assert.Equal(OutcomeState.Accepted, descriptor.State);
@@ -196,7 +196,7 @@ await publisher.Publish(new AmqpMessage("Hello World"),
196196

197197
for (int i = 0; i < 10; i++)
198198
{
199-
await publisher.Publish(new AmqpMessage("Hello World"),
199+
await publisher.PublishAsync(new AmqpMessage("Hello World"),
200200
(message, descriptor) =>
201201
{
202202
Interlocked.Increment(ref messagesConfirmed);

Tests/PublisherTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public async Task SendAMessageToAQueue()
4949

5050
IPublisher publisher = await connection.PublisherBuilder().Queue("queue_to_send").BuildAsync();
5151

52-
await publisher.Publish(new AmqpMessage("Hello wold!"),
52+
await publisher.PublishAsync(new AmqpMessage("Hello wold!"),
5353
(message, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); });
5454

5555
await SystemUtils.WaitUntilQueueMessageCount("queue_to_send", 1);
@@ -75,7 +75,7 @@ public async Task ValidatePublishersCount()
7575
{
7676
IPublisher publisher = await connection.PublisherBuilder().Queue("queue_publishers_count").BuildAsync();
7777

78-
await publisher.Publish(new AmqpMessage("Hello wold!"),
78+
await publisher.PublishAsync(new AmqpMessage("Hello wold!"),
7979
(message, descriptor) =>
8080
{
8181
Assert.Equal(OutcomeState.Accepted, descriptor.State);
@@ -110,7 +110,7 @@ await management.Binding().SourceExchange("exchange_to_send").DestinationQueue("
110110

111111
IPublisher publisher = await connection.PublisherBuilder().Exchange("exchange_to_send").Key("key").BuildAsync();
112112

113-
await publisher.Publish(new AmqpMessage("Hello wold!"),
113+
await publisher.PublishAsync(new AmqpMessage("Hello wold!"),
114114
(message, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); });
115115

116116
await SystemUtils.WaitUntilQueueMessageCount("queue_to_send_1", 1);

docs/Examples/GettingStarted/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
const int total = 10;
4040
for (int i = 0; i < total; i++)
4141
{
42-
await publisher.Publish(
42+
await publisher.PublishAsync(
4343
new AmqpMessage($"Hello World_{i}"),
4444
(message, descriptor) =>
4545
{

docs/Examples/HAClient/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
try
8787
{
8888
pausePublishing.WaitOne();
89-
await publisher.Publish(
89+
await publisher.PublishAsync(
9090
new AmqpMessage($"Hello World_{i}"),
9191
(message, descriptor) =>
9292
{

docs/Examples/PerformancesTest/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
Console.WriteLine($"Sending Time: {endp - start} - messages {i}");
6161
}
6262

63-
await publisher.Publish(
63+
await publisher.PublishAsync(
6464
new AmqpMessage(new byte[10]),
6565
(message, descriptor) =>
6666
{

0 commit comments

Comments
 (0)