Skip to content

Port more tests from rabbitmq-amqp-java-client #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
<!-- Tests -->
<PackageVersion Include="AltCover" Version="8.8.165" />
<PackageVersion Include="AltCover" Version="8.9.3" />
<PackageVersion Include="xunit" Version="2.9.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageVersion Include="Xunit.SkippableFact" Version="1.4.13" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.0" />
<PackageVersion Include="EasyNetQ.Management.Client" Version="3.0.0" />
</ItemGroup>
<ItemGroup Label=".NET 6 Specific" Condition="'$(TargetFramework)' == 'net6.0'">
Expand All @@ -31,4 +31,4 @@
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="MinVer" Version="5.0.0" />
</ItemGroup>
</Project>
</Project>
2 changes: 2 additions & 0 deletions RabbitMQ.AMQP.Client/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ namespace RabbitMQ.AMQP.Client;
public interface IMessage
{
// TODO: Complete the IMessage interface with all the properties

// TODO does this depend on NativeMessage.BodySection?
public object Body();

// properties
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ private async Task ProcessMessages()

while (_receiverLink is { LinkState: LinkState.Attached })
{
TimeSpan timeout = TimeSpan.FromSeconds(60); // TODO configurable
// TODO the timeout waiting for messages should be configurable
TimeSpan timeout = TimeSpan.FromSeconds(60);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
if (nativeMessage is null)
{
Expand Down Expand Up @@ -157,6 +158,8 @@ private async Task ProcessMessages()
}

Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
// TODO this is where a Listener should get a closed event
// See the ConsumerShouldBeClosedWhenQueueIsDeleted test
}

Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed.");
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task<IQueueInfo> GetQueueInfoAsync(string queueName,
return new DefaultQueueInfo((Map)response.Body);
}

public IQueueSpecification Queue(QueueSpec spec)
internal IQueueSpecification Queue(QueueSpec spec)
{
return Queue().Name(spec.Name)
.AutoDelete(spec.AutoDelete)
Expand Down Expand Up @@ -410,7 +410,7 @@ internal void CheckResponse(Message sentMessage, int[] expectedResponseCodes, Me
switch (responseCode)
{
case Code409:
throw new PreconditionFailedException($"{receivedMessage.Body}");
throw new PreconditionFailedException($"{receivedMessage.Body}, response code: {responseCode}");
}

// Check if the correlationId is the same as the messageId
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ private void EnsureAnnotations()
NativeMessage.MessageAnnotations ??= new MessageAnnotations();
}


public object Body()
{
// TODO do we need to do anything with NativeMessage.BodySection?
return NativeMessage.Body;
}

Expand Down
11 changes: 8 additions & 3 deletions RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,19 @@ void OutcomeCallback(ILink sender, Message message, Outcome outcome, object stat
// Note: sometimes `message` is null 🤔
// System.Diagnostics.Debug.Assert(Object.ReferenceEquals(nativeMessage, message));

OutcomeState publishState = OutcomeState.Accepted;
// TODO what about other outcomes, like Released?
PublishOutcome publishOutcome;
if (outcome is Rejected rejectedOutcome)
{
publishState = OutcomeState.Failed;
OutcomeState publishState = OutcomeState.Failed;
publishOutcome = new PublishOutcome(publishState, Utils.ConvertError(rejectedOutcome.Error));
}
else
{
OutcomeState publishState = OutcomeState.Accepted;
publishOutcome = new PublishOutcome(publishState, null);
}

var publishOutcome = new PublishOutcome(publishState, null);
messagePublishedTcs.SetResult(publishOutcome);
}

Expand Down
8 changes: 1 addition & 7 deletions RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,7 @@ public IQueueSpecification Arguments(Dictionary<object, object> arguments)

public Dictionary<object, object> Arguments()
{
var result = new Dictionary<object, object>();
foreach ((object? key, object? value) in _arguments)
{
result[key] = value;
}

return result;
return _arguments;
}

public IQueueSpecification Type(QueueType type)
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/Impl/RecordingTopologyListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ await visitor.VisitQueuesAsync(_queueSpecifications.Values)
}
}

// TODO this could probably be made internal
public class QueueSpec(IQueueSpecification specification)
{
public string Name { get; init; } = specification.Name();
Expand Down
1 change: 0 additions & 1 deletion RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpManagement.Exchange(string! name) -> RabbitMQ.AMQP
RabbitMQ.AMQP.Client.Impl.AmqpManagement.GetQueueInfoAsync(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IQueueInfo!>!
RabbitMQ.AMQP.Client.Impl.AmqpManagement.GetQueueInfoAsync(string! queueName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IQueueInfo!>!
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(RabbitMQ.AMQP.Client.Impl.QueueSpec! spec) -> RabbitMQ.AMQP.Client.IQueueSpecification!
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(string! name) -> RabbitMQ.AMQP.Client.IQueueSpecification!
RabbitMQ.AMQP.Client.Impl.AmqpManagement.TopologyListener() -> RabbitMQ.AMQP.Client.ITopologyListener!
RabbitMQ.AMQP.Client.Impl.AmqpMessage
Expand Down
156 changes: 113 additions & 43 deletions Tests/AmqpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
Expand All @@ -16,20 +17,26 @@ namespace Tests;

public class AmqpTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
{
[Fact]
public async Task QueueInfoTest()
private readonly byte[] _messageBody = Encoding.UTF8.GetBytes("hello");

[Theory]
[InlineData(QueueType.CLASSIC, "classic")]
[InlineData(QueueType.QUORUM, "quorum")]
public async Task QueueInfoTest(QueueType expectedQueueType, string expectedQueueTypeName)
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

IQueueInfo declaredQueueInfo = await _management.Queue(_queueName).Quorum().Queue().DeclareAsync();
IQueueSpecification queueSpecification = _management.Queue(_queueName).Type(expectedQueueType);

IQueueInfo declaredQueueInfo = await queueSpecification.DeclareAsync();
IQueueInfo retrievedQueueInfo = await _management.GetQueueInfoAsync(_queueName);

Assert.Equal(_queueName, declaredQueueInfo.Name());
Assert.Equal(_queueName, retrievedQueueInfo.Name());

Assert.Equal(QueueType.QUORUM, declaredQueueInfo.Type());
Assert.Equal(QueueType.QUORUM, retrievedQueueInfo.Type());
Assert.Equal(expectedQueueType, declaredQueueInfo.Type());
Assert.Equal(expectedQueueType, retrievedQueueInfo.Type());

Assert.True(declaredQueueInfo.Durable());
Assert.True(retrievedQueueInfo.Durable());
Expand All @@ -48,48 +55,44 @@ public async Task QueueInfoTest()

Dictionary<string, object> declaredArgs = declaredQueueInfo.Arguments();
Dictionary<string, object> retrievedArgs = retrievedQueueInfo.Arguments();

Assert.True(declaredArgs.ContainsKey("x-queue-type"));
Assert.True(retrievedArgs.ContainsKey("x-queue-type"));
Assert.Equal(declaredArgs["x-queue-type"], "quorum");
Assert.Equal(retrievedArgs["x-queue-type"], "quorum");
Assert.Equal(declaredArgs["x-queue-type"], expectedQueueTypeName);
Assert.Equal(retrievedArgs["x-queue-type"], expectedQueueTypeName);
}

[Theory]
[InlineData("foobar")]
[InlineData("фообар")]
public async Task QueueDeclareDeletePublishConsume(string subject)
[InlineData("foobar", QueueType.CLASSIC)]
[InlineData("foobar", QueueType.QUORUM)]
[InlineData("фообар", QueueType.CLASSIC)]
[InlineData("фообар", QueueType.QUORUM)]
public async Task QueueDeclareDeletePublishConsume(string subject, QueueType expectedQueueType)
{
byte[] messageBody = Encoding.UTF8.GetBytes("hello");
const int messageCount = 100;

Assert.NotNull(_connection);
Assert.NotNull(_management);

// IQueueInfo declaredQueueInfo = await _management.Queue().Name(_queueName).Quorum().Queue().Declare();
IQueueInfo declaredQueueInfo = await _management.Queue().Name(_queueName).Classic().Queue().DeclareAsync();
Assert.Equal(_queueName, declaredQueueInfo.Name());

IPublisherBuilder publisherBuilder = _connection.PublisherBuilder();
IPublisher publisher = await publisherBuilder.Queue(declaredQueueInfo.Name()).BuildAsync();

var publishTasks = new List<Task<PublishResult>>();
for (int i = 0; i < messageCount; i++)
IQueueSpecification? queueSpecification = null;
switch (expectedQueueType)
{
Guid messageId = Guid.NewGuid();

IMessage message = new AmqpMessage(messageBody);
message.MessageId(messageId.ToString());
message.Subject(subject);
publishTasks.Add(publisher.PublishAsync(message));
case QueueType.CLASSIC:
queueSpecification = _management.Queue().Name(_queueName).Classic().Queue();
break;
case QueueType.QUORUM:
queueSpecification = _management.Queue().Name(_queueName).Quorum().Queue();
break;
default:
Assert.Fail();
break;
}

await WhenAllComplete(publishTasks);
IQueueInfo declaredQueueInfo = await queueSpecification.DeclareAsync();
Assert.Equal(_queueName, declaredQueueInfo.Name());

foreach (Task<PublishResult> pt in publishTasks)
{
PublishResult pr = await pt;
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
}
await PublishWithSubjectAsync(queueSpecification, messageCount, subject: subject);

IQueueInfo retrievedQueueInfo0 = await _management.GetQueueInfoAsync(_queueName);
Assert.Equal(_queueName, retrievedQueueInfo0.Name());
Expand All @@ -99,20 +102,30 @@ public async Task QueueDeclareDeletePublishConsume(string subject)
long receivedMessageCount = 0;
var allMessagesReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
string? receivedSubject = null;
var messageIds = new ConcurrentBag<int>();
async Task MessageHandler(IContext ctx, IMessage msg)
{
receivedSubject = msg.Subject();
await ctx.AcceptAsync();
if (Interlocked.Increment(ref receivedMessageCount) == messageCount)
try
{
allMessagesReceivedTcs.SetResult();
receivedSubject = msg.Subject();
messageIds.Add(int.Parse(msg.MessageId()));
await ctx.AcceptAsync();
if (Interlocked.Increment(ref receivedMessageCount) == messageCount)
{
allMessagesReceivedTcs.SetResult();
}
}
catch (Exception ex)
{
allMessagesReceivedTcs.SetException(ex);
}
}

IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder();
IConsumer consumer = await consumerBuilder.Queue(_queueName).MessageHandler(MessageHandler).BuildAsync();

await WhenTaskCompletes(allMessagesReceivedTcs.Task);
Assert.Equal(messageCount, messageIds.Count);

Assert.NotNull(receivedSubject);
Assert.Equal(subject, receivedSubject);
Expand All @@ -121,8 +134,6 @@ async Task MessageHandler(IContext ctx, IMessage msg)
Assert.Equal((uint)1, retrievedQueueInfo1.ConsumerCount());
Assert.Equal((uint)0, retrievedQueueInfo1.MessageCount());

await publisher.CloseAsync();
publisher.Dispose();
await consumer.CloseAsync();
consumer.Dispose();
}
Expand All @@ -136,8 +147,6 @@ async Task MessageHandler(IContext ctx, IMessage msg)
[InlineData("фоо!бар", true)]
public async Task BindingTest(string prefix, bool addBindingArgments)
{
byte[] messageBody = Encoding.UTF8.GetBytes("hello");

Assert.NotNull(_connection);
Assert.NotNull(_management);

Expand Down Expand Up @@ -177,7 +186,7 @@ public async Task BindingTest(string prefix, bool addBindingArgments)
IPublisher publisher1 = await publisherBuilder1.Exchange(ex1spec).Key(rkStr).BuildAsync();
IPublisher publisher2 = await publisherBuilder2.Exchange(ex2spec).BuildAsync();

IMessage message = new AmqpMessage(messageBody);
IMessage message = new AmqpMessage(_messageBody);

Task<PublishResult> publish1Task = publisher1.PublishAsync(message);
Task<PublishResult> publish2Task = publisher2.PublishAsync(message);
Expand All @@ -194,10 +203,17 @@ public async Task BindingTest(string prefix, bool addBindingArgments)
var allMessagesReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
async Task MessageHandler(IContext ctx, IMessage msg)
{
await ctx.AcceptAsync();
if (Interlocked.Increment(ref receivedMessageCount) == expectedMessageCount)
try
{
allMessagesReceivedTcs.SetResult();
await ctx.AcceptAsync();
if (Interlocked.Increment(ref receivedMessageCount) == expectedMessageCount)
{
allMessagesReceivedTcs.SetResult();
}
}
catch (Exception ex)
{
allMessagesReceivedTcs.SetException(ex);
}
}

Expand All @@ -222,4 +238,58 @@ async Task MessageHandler(IContext ctx, IMessage msg)
await ex2spec.DeleteAsync();
// Note: DisposeAsync will delete the queue
}

[Fact]
public async Task SameTypeMessagesInQueue()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

IQueueSpecification queueSpecification = _management.Queue(_queueName).Exclusive(true);
IQueueInfo declaredQueueInfo = await queueSpecification.DeclareAsync();

var messageBodies = new ConcurrentBag<string>();
const int expectedMessageCount = 2;
long receivedMessageCount = 0;
var allMessagesReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
async Task MessageHandler(IContext ctx, IMessage msg)
{
try
{
await ctx.AcceptAsync();
messageBodies.Add(Encoding.UTF8.GetString((byte[])msg.Body()));
if (Interlocked.Increment(ref receivedMessageCount) == expectedMessageCount)
{
allMessagesReceivedTcs.SetResult();
}
}
catch (Exception ex)
{
allMessagesReceivedTcs.SetException(ex);
}
}

IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder();
IConsumer consumer = await consumerBuilder.Queue(queueSpecification).MessageHandler(MessageHandler).BuildAsync();

IPublisherBuilder publisherBuilder = _connection.PublisherBuilder();
IPublisher publisher = await publisherBuilder.Queue(queueSpecification).BuildAsync();

IMessage message1 = new AmqpMessage(_messageBody);
IMessage message2 = new AmqpMessage(Encoding.UTF8.GetBytes("world"));

Task<PublishResult> publish1Task = publisher.PublishAsync(message1);
Task<PublishResult> publish2Task = publisher.PublishAsync(message2);
await WhenAllComplete([publish1Task, publish2Task]);

PublishResult publish1Result = await publish1Task;
Assert.Equal(OutcomeState.Accepted, publish1Result.Outcome.State);
PublishResult publish2Result = await publish2Task;
Assert.Equal(OutcomeState.Accepted, publish2Result.Outcome.State);

await WhenTaskCompletes(allMessagesReceivedTcs.Task);

Assert.Contains("hello", messageBodies);
Assert.Contains("world", messageBodies);
}
}
Loading