Skip to content

Commit b96375e

Browse files
committed
Implement AMQP filter expressions
* Start by refactoring a couple tests that use `ManualResetEvent` for coordination. * Correctly set stream filter AMQP types in the filters map. * Comment-out extra logging * Replace `ManualResetEvent` with `AsyncManualResetEvent` * Add `IStreamFilterOptions` * Port code documentation from Java client * Add more docs from Java client. * Add `IStreamFilterOptions` member functions. * Begin porting `filterExpressionApplicationProperties` test from the Java client. * Use `pivotalrabbitmq/rabbitmq:main` because it supports AMQP 1.0 filters * Add overloads for specific types for MessageId and CorrelationId * Just settle on `object` for Message ID and Correlation ID * Implement the rest of the message properties in `IMessage`. * Rename `ApplicationProperties` to `Properties` to match Java client. * `StreamConsumerFilterExpressionApplicationProperties` test runs successfully. * Finish `StreamConsumerFilterExpressionApplicationProperties` test. * Port `streamFiltering` test from Java client. * Bump testing Windows versions * Add `Semver` package to Tests project to allow parsing broker version. * Skip stream filtering tests (on properties) if not at least RabbitMQ 4.1.0 * Change `IConnection` API to use `IEnumerable` instead of `ReadOnlyCollection` * Add some messaging for when the StreamFilter test comparison fails. * Fix the `StreamFilter` test. * Validate message annotation key to ensure they start with `x-` * Validate the string overload for specifying a stream offset. * Port `StreamConsumerOptionsOffsetInterval` test from Java client. * Port `StreamConsumerOptionsOffsetNext` from Java client. * Port `StreamConsumerOptionsOffsetLong` from Java client. * Working on `FilterExpressionProperties` * Finish port of `FilterExpressionProperties` test. * Link operations Accept, Requeue, Discard can't be async * Continue working on `FilterExpressionPropertiesAndApplicationProperties` test * Remove `Thread.Sleep` * Finish `FilterExpressionPropertiesAndApplicationProperties` test. * Remove dupicated code around validating message annotation key * Move code out of `Utils` that is only used for tests * Finish porting last two tests from `SourceFiltersTest.java` * Remove `AmqpConnection` from `ConsumerConfiguration`. * Add TODO to detect RMQ version when stream filters are used. * Add TLS callback fot the tests * Add arm build for the docker image * Parse broker version and set filter capabilities based on it. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7cfdcbb commit b96375e

40 files changed

+2136
-449
lines changed

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
2323

2424
if [[ ! -v GITHUB_ACTIONS ]]
2525
then

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

11-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"
11+
if [[ $3 == 'arm' ]]
12+
then
13+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
14+
else
15+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
16+
fi
1217

1318

1419
readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'

.ci/windows/versions.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"erlang": "27.0.1",
3-
"rabbitmq": "4.0.2"
2+
"erlang": "27.1.2",
3+
"rabbitmq": "4.0.3"
44
}

Directory.Packages.props

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
<PropertyGroup>
33
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
44
</PropertyGroup>
5-
65
<ItemGroup>
76
<!-- RabbitMQ.Amqp.Client -->
87
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
8+
<!-- HAClient -->
9+
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />
910
<!-- Tests -->
1011
<PackageVersion Include="xunit" Version="2.9.0" />
1112
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
@@ -14,7 +15,6 @@
1415
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.0" />
1516
<PackageVersion Include="EasyNetQ.Management.Client" Version="3.0.0" />
1617
</ItemGroup>
17-
1818
<ItemGroup Condition="$(TargetFramework)=='netstandard2.0'">
1919
<!--
2020
Note: do NOT upgrade these dependencies unless necessary
@@ -25,19 +25,15 @@
2525
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
2626
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
2727
</ItemGroup>
28-
2928
<ItemGroup Condition="'$(TargetFramework)'=='net6.0'">
3029
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
3130
</ItemGroup>
32-
3331
<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
3432
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
3533
</ItemGroup>
36-
3734
<ItemGroup Condition="'$(TargetFrameworkIdentifier)'=='.NETFramework'">
3835
<GlobalPackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
3936
</ItemGroup>
40-
4137
<ItemGroup Condition="'$(IsPackable)'=='true'">
4238
<GlobalPackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" />
4339
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6-
using System.Collections.ObjectModel;
6+
using System.Collections.Generic;
77

88
namespace RabbitMQ.AMQP.Client
99
{
@@ -30,9 +30,11 @@ public interface IConnection : ILifeCycle
3030

3131
IRpcClientBuilder RpcClientBuilder();
3232

33-
public ReadOnlyCollection<IPublisher> GetPublishers();
33+
public IReadOnlyDictionary<string, object> Properties { get; }
3434

35-
public ReadOnlyCollection<IConsumer> GetConsumers();
35+
public IEnumerable<IPublisher> Publishers { get; }
36+
37+
public IEnumerable<IConsumer> Consumers { get; }
3638

3739
public long Id { get; set; }
3840
}

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public ConsumerException(string message) : base(message)
1515
}
1616
}
1717

18+
// TODO cancellation token
1819
public delegate Task MessageHandler(IContext context, IMessage message);
1920

2021
public interface IConsumer : ILifeCycle
@@ -32,14 +33,14 @@ public interface IContext
3233
/// This means the message has been processed and the broker can delete it.
3334
///
3435
/// </summary>
35-
Task AcceptAsync();
36+
void Accept();
3637

3738
///<summary>
3839
/// Discard the message (AMQP 1.0 <code>rejected</code> outcome).
3940
///This means the message cannot be processed because it is invalid, the broker can drop it
4041
/// or dead-letter it if it is configured.
4142
///</summary>
42-
Task DiscardAsync();
43+
void Discard();
4344

4445
///<summary>
4546
///Discard the message with annotations to combine with the existing message annotations.
@@ -57,15 +58,16 @@ public interface IContext
5758
///
5859
/// The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
5960
///</summary>
60-
Task DiscardAsync(Dictionary<string, object> annotations);
61+
void Discard(Dictionary<string, object> annotations);
62+
6163
///<summary>
6264
///Requeue the message (AMQP 1.0 <code>released</code> outcome).
6365
///
6466
///This means the message has not been processed and the broker can requeue it and deliver it
6567
/// to the same or a different consumer.
6668
///
6769
/// </summary>
68-
Task RequeueAsync();
70+
void Requeue();
6971

7072
///<summary>
7173
///Requeue the message with annotations to combine with the existing message annotations.
@@ -86,6 +88,6 @@ public interface IContext
8688
///
8789
///The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
8890
///</summary>
89-
Task RequeueAsync(Dictionary<string, object> annotations);
91+
void Requeue(Dictionary<string, object> annotations);
9092
}
9193
}

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 164 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public interface IConsumerBuilder
3333
/// (e.g. after a disconnection).
3434
/// </summary>
3535
/// <param name="listenerContext"> Contains the listenerContext, see <see cref="ListenerContext"/> </param>
36-
/// <returns></returns>
36+
/// <returns><see cref="IConsumerBuilder"/>The consumer builder.</returns>
3737
IConsumerBuilder SubscriptionListener(Action<ListenerContext> listenerContext);
3838

3939
IStreamOptions Stream();
@@ -42,20 +42,179 @@ public interface IConsumerBuilder
4242

4343
public interface IStreamOptions
4444
{
45+
/// <summary>The offset from which to start consuming.</summary>
46+
/// <param name="offset">the offset</param>
47+
/// <returns><see cref="IStreamOptions"/></returns>
4548
IStreamOptions Offset(long offset);
49+
50+
/// <summary>
51+
/// <para>A point in time from which to start consuming.</para>
52+
/// <para>Be aware consumers can receive messages published a bit before the specified timestamp.</para>
53+
/// </summary>
54+
/// <param name="timestamp">the timestamp</param>
55+
/// <returns><see cref="IStreamOptions"/></returns>
56+
IStreamOptions Offset(DateTime timestamp);
57+
58+
/// <summary>The offset from which to start consuming.</summary>
59+
/// <param name="specification">the offset specification</param>
60+
/// <returns><see cref="IStreamOptions"/></returns>
61+
/// <see cref="StreamOffsetSpecification"/>
4662
IStreamOptions Offset(StreamOffsetSpecification specification);
47-
IStreamOptions FilterValues(string[] values);
63+
64+
/// <summary>
65+
/// <para>The offset from which to start consuming as an interval string value.</para>
66+
/// <para>Valid units are Y, M, D, h, m, s. Examples: <code>7D</code> (7 days), <code>12h</code> (12 hours).</para>
67+
/// </summary>
68+
/// <param name="interval">the interval</param>
69+
/// <returns><see cref="IStreamOptions"/></returns>
70+
/// <see href="https://www.rabbitmq.com/docs/streams#retention">Interval Syntax</see>
71+
IStreamOptions Offset(string interval);
72+
73+
/// <summary>
74+
/// <para>Filter values for stream filtering.</para>
75+
/// <para>This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be used together.</para>
76+
/// </summary>
77+
/// <param name="values">filter values</param>
78+
/// <returns><see cref="IStreamOptions"/></returns>
79+
/// <see href="https://www.rabbitmq.com/docs/streams#filtering">Stream Filtering</see>
80+
/// <see cref="Filter"/>
81+
IStreamOptions FilterValues(params string[] values);
82+
83+
/// <summary>
84+
/// <para>Whether messages without a filter value should be sent.</para>
85+
/// <para>Default is <code>false</code> (messages without a filter value are not sent).</para>
86+
/// <para>This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be used together.</para>
87+
/// </summary>
88+
/// <param name="matchUnfiltered"><c>true </c>to send messages without a filter value</param>
89+
/// <returns><see cref="IStreamOptions"/></returns>
90+
/// @see #filter()
91+
/// <see cref="Filter"/>
4892
IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered);
93+
94+
/// <summary>
95+
/// <para>Options for AMQP filter expressions.</para>
96+
/// <para>Requires RabbitMQ 4.1 or more.</para>
97+
/// <para>This a different filtering mechanism from stream filtering. Both mechanisms can be used together.</para>
98+
/// </summary>
99+
/// <returns><see cref="IStreamFilterOptions"/></returns>
100+
/// <see cref="FilterValues"/>
101+
/// <see cref="FilterMatchUnfiltered(bool)"/>
102+
IStreamFilterOptions Filter();
103+
104+
/// <summary>
105+
/// Return the consumer builder.
106+
/// </summary>
107+
/// <returns><see cref="IConsumerBuilder"/></returns>
49108
IConsumerBuilder Builder();
50109
}
51110

111+
/// <summary>
112+
/// <para>Filter options for support of AMQP filter expressions.</para>
113+
/// <para>AMQP filter expressions are supported only with streams. This a different filtering mechanism from stream filtering.
114+
/// Both mechanisms can be used together.
115+
/// Requires RabbitMQ 4.1 or more.</para>
116+
/// </summary>
117+
/// <see href="https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227">AMQP Filter Expressions</see>
118+
public interface IStreamFilterOptions
119+
{
120+
/// <summary>Filter on message ID.</summary>
121+
/// <param name="id">message ID</param>
122+
/// <returns><see cref="IStreamFilterOptions"/></returns>
123+
IStreamFilterOptions MessageId(object id);
124+
125+
/// <summary>Filter on user ID.</summary>
126+
/// <param name="userId">correlation ID</param>
127+
/// <returns><see cref="IStreamFilterOptions"/></returns>
128+
IStreamFilterOptions UserId(byte[] userId);
129+
130+
/// <summary>Filter on to field.</summary>
131+
/// <param name="to">correlation ID</param>
132+
/// <returns><see cref="IStreamFilterOptions"/></returns>
133+
IStreamFilterOptions To(string to);
134+
135+
/// <summary>Filter on subject field.</summary>
136+
/// <param name="subject">correlation ID</param>
137+
/// <returns><see cref="IStreamFilterOptions"/></returns>
138+
IStreamFilterOptions Subject(string subject);
139+
140+
/// <summary>Filter on reply-to field.</summary>
141+
/// <param name="replyTo">correlation ID</param>
142+
/// <returns><see cref="IStreamFilterOptions"/></returns>
143+
IStreamFilterOptions ReplyTo(string replyTo);
144+
145+
/// <summary>Filter on correlation ID.</summary>
146+
/// <param name="correlationId">correlation ID</param>
147+
/// <returns><see cref="IStreamFilterOptions"/></returns>
148+
IStreamFilterOptions CorrelationId(object correlationId);
149+
150+
/// <summary>Filter on content-type field.</summary>
151+
/// <param name="contentType">correlation ID</param>
152+
/// <returns><see cref="IStreamFilterOptions"/></returns>
153+
IStreamFilterOptions ContentType(string contentType);
154+
155+
/// <summary>Filter on content-encoding field.</summary>
156+
/// <param name="contentEncoding">correlation ID</param>
157+
/// <returns><see cref="IStreamFilterOptions"/></returns>
158+
IStreamFilterOptions ContentEncoding(string contentEncoding);
159+
160+
/// <summary>Filter on absolute expiry time field.</summary>
161+
/// <param name="absoluteExpiryTime">correlation ID</param>
162+
/// <returns><see cref="IStreamFilterOptions"/></returns>
163+
IStreamFilterOptions AbsoluteExpiryTime(DateTime absoluteExpiryTime);
164+
165+
/// <summary>Filter on creation time field.</summary>
166+
/// <param name="creationTime">correlation ID</param>
167+
/// <returns><see cref="IStreamFilterOptions"/></returns>
168+
IStreamFilterOptions CreationTime(DateTime creationTime);
169+
170+
/// <summary>Filter on group ID.</summary>
171+
/// <param name="groupId">correlation ID</param>
172+
/// <returns><see cref="IStreamFilterOptions"/></returns>
173+
IStreamFilterOptions GroupId(string groupId);
174+
175+
/// <summary>Filter on group sequence.</summary>
176+
/// <param name="groupSequence">correlation ID</param>
177+
/// <returns><see cref="IStreamFilterOptions"/></returns>
178+
IStreamFilterOptions GroupSequence(uint groupSequence);
179+
180+
/// <summary>Filter on reply-to group.</summary>
181+
/// <param name="groupId">correlation ID</param>
182+
/// <returns><see cref="IStreamFilterOptions"/></returns>
183+
IStreamFilterOptions ReplyToGroupId(string groupId);
184+
185+
/// <summary>Filter on an application property.</summary>
186+
/// <param name="key">application property key</param>
187+
/// <param name="value">application property value</param>
188+
/// <returns><see cref="IStreamFilterOptions"/></returns>
189+
IStreamFilterOptions Property(string key, object value);
190+
191+
/// <summary>Filter on an application property as a <see cref="Amqp.Types.Symbol"/></summary>
192+
/// <param name="key">application property key</param>
193+
/// <param name="value">application property value</param>
194+
/// <returns><see cref="IStreamFilterOptions"/></returns>
195+
IStreamFilterOptions PropertySymbol(string key, string value);
196+
197+
/// <summary>
198+
/// Return the stream options.
199+
/// </summary>
200+
/// <returns><see cref="IStreamOptions"/></returns>
201+
IStreamOptions Stream();
202+
}
203+
52204
/// <summary>
53205
/// ListenerContext is a helper class that holds the contexts for the listener
54206
/// </summary>
55-
/// <param name="StreamOptions"> Stream Options that the user can change during the SubscriptionListener </param>
56-
public record ListenerContext(IStreamOptions StreamOptions)
207+
public class ListenerContext
57208
{
58-
public IStreamOptions StreamOptions { get; } = StreamOptions;
209+
private readonly IStreamOptions _streamOptions;
210+
211+
/// <param name="streamOptions"> Stream Options that the user can change during the SubscriptionListener </param>
212+
public ListenerContext(IStreamOptions streamOptions)
213+
{
214+
_streamOptions = streamOptions;
215+
}
216+
217+
public IStreamOptions StreamOptions => _streamOptions;
59218
}
60219
}
61220
}

0 commit comments

Comments
 (0)