Skip to content

Implement AMQP filter expressions #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/cluster/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}

readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"

if [[ ! -v GITHUB_ACTIONS ]]
then
Expand Down
7 changes: 6 additions & 1 deletion .ci/ubuntu/one-node/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"

readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"
if [[ $3 == 'arm' ]]
then
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
else
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
fi


readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'
Expand Down
4 changes: 2 additions & 2 deletions .ci/windows/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "27.0.1",
"rabbitmq": "4.0.2"
"erlang": "27.1.2",
"rabbitmq": "4.0.3"
}
8 changes: 2 additions & 6 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>

<ItemGroup>
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
<!-- HAClient -->
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />
<!-- Tests -->
<PackageVersion Include="xunit" Version="2.9.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
Expand All @@ -14,7 +15,6 @@
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.0" />
<PackageVersion Include="EasyNetQ.Management.Client" Version="3.0.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework)=='netstandard2.0'">
<!--
Note: do NOT upgrade these dependencies unless necessary
Expand All @@ -25,19 +25,15 @@
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='net6.0'">
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)'=='.NETFramework'">
<GlobalPackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
</ItemGroup>

<ItemGroup Condition="'$(IsPackable)'=='true'">
<GlobalPackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" />
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
Expand Down
8 changes: 5 additions & 3 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.ObjectModel;
using System.Collections.Generic;

namespace RabbitMQ.AMQP.Client
{
Expand All @@ -30,9 +30,11 @@ public interface IConnection : ILifeCycle

IRpcClientBuilder RpcClientBuilder();

public ReadOnlyCollection<IPublisher> GetPublishers();
public IReadOnlyDictionary<string, object> Properties { get; }

public ReadOnlyCollection<IConsumer> GetConsumers();
public IEnumerable<IPublisher> Publishers { get; }

public IEnumerable<IConsumer> Consumers { get; }

public long Id { get; set; }
}
Expand Down
12 changes: 7 additions & 5 deletions RabbitMQ.AMQP.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public ConsumerException(string message) : base(message)
}
}

// TODO cancellation token
public delegate Task MessageHandler(IContext context, IMessage message);

public interface IConsumer : ILifeCycle
Expand All @@ -32,14 +33,14 @@ public interface IContext
/// This means the message has been processed and the broker can delete it.
///
/// </summary>
Task AcceptAsync();
void Accept();

///<summary>
/// Discard the message (AMQP 1.0 <code>rejected</code> outcome).
///This means the message cannot be processed because it is invalid, the broker can drop it
/// or dead-letter it if it is configured.
///</summary>
Task DiscardAsync();
void Discard();

///<summary>
///Discard the message with annotations to combine with the existing message annotations.
Expand All @@ -57,15 +58,16 @@ public interface IContext
///
/// The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
///</summary>
Task DiscardAsync(Dictionary<string, object> annotations);
void Discard(Dictionary<string, object> annotations);

///<summary>
///Requeue the message (AMQP 1.0 <code>released</code> outcome).
///
///This means the message has not been processed and the broker can requeue it and deliver it
/// to the same or a different consumer.
///
/// </summary>
Task RequeueAsync();
void Requeue();

///<summary>
///Requeue the message with annotations to combine with the existing message annotations.
Expand All @@ -86,6 +88,6 @@ public interface IContext
///
///The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
///</summary>
Task RequeueAsync(Dictionary<string, object> annotations);
void Requeue(Dictionary<string, object> annotations);
}
}
169 changes: 164 additions & 5 deletions RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface IConsumerBuilder
/// (e.g. after a disconnection).
/// </summary>
/// <param name="listenerContext"> Contains the listenerContext, see <see cref="ListenerContext"/> </param>
/// <returns></returns>
/// <returns><see cref="IConsumerBuilder"/>The consumer builder.</returns>
IConsumerBuilder SubscriptionListener(Action<ListenerContext> listenerContext);

IStreamOptions Stream();
Expand All @@ -42,20 +42,179 @@ public interface IConsumerBuilder

public interface IStreamOptions
{
/// <summary>The offset from which to start consuming.</summary>
/// <param name="offset">the offset</param>
/// <returns><see cref="IStreamOptions"/></returns>
IStreamOptions Offset(long offset);

/// <summary>
/// <para>A point in time from which to start consuming.</para>
/// <para>Be aware consumers can receive messages published a bit before the specified timestamp.</para>
/// </summary>
/// <param name="timestamp">the timestamp</param>
/// <returns><see cref="IStreamOptions"/></returns>
IStreamOptions Offset(DateTime timestamp);

/// <summary>The offset from which to start consuming.</summary>
/// <param name="specification">the offset specification</param>
/// <returns><see cref="IStreamOptions"/></returns>
/// <see cref="StreamOffsetSpecification"/>
IStreamOptions Offset(StreamOffsetSpecification specification);
IStreamOptions FilterValues(string[] values);

/// <summary>
/// <para>The offset from which to start consuming as an interval string value.</para>
/// <para>Valid units are Y, M, D, h, m, s. Examples: <code>7D</code> (7 days), <code>12h</code> (12 hours).</para>
/// </summary>
/// <param name="interval">the interval</param>
/// <returns><see cref="IStreamOptions"/></returns>
/// <see href="https://www.rabbitmq.com/docs/streams#retention">Interval Syntax</see>
IStreamOptions Offset(string interval);

/// <summary>
/// <para>Filter values for stream filtering.</para>
/// <para>This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be used together.</para>
/// </summary>
/// <param name="values">filter values</param>
/// <returns><see cref="IStreamOptions"/></returns>
/// <see href="https://www.rabbitmq.com/docs/streams#filtering">Stream Filtering</see>
/// <see cref="Filter"/>
IStreamOptions FilterValues(params string[] values);

/// <summary>
/// <para>Whether messages without a filter value should be sent.</para>
/// <para>Default is <code>false</code> (messages without a filter value are not sent).</para>
/// <para>This a different filtering mechanism from AMQP filter expressions. Both mechanisms can be used together.</para>
/// </summary>
/// <param name="matchUnfiltered"><c>true </c>to send messages without a filter value</param>
/// <returns><see cref="IStreamOptions"/></returns>
/// @see #filter()
/// <see cref="Filter"/>
IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered);

/// <summary>
/// <para>Options for AMQP filter expressions.</para>
/// <para>Requires RabbitMQ 4.1 or more.</para>
/// <para>This a different filtering mechanism from stream filtering. Both mechanisms can be used together.</para>
/// </summary>
/// <returns><see cref="IStreamFilterOptions"/></returns>
/// <see cref="FilterValues"/>
/// <see cref="FilterMatchUnfiltered(bool)"/>
IStreamFilterOptions Filter();

/// <summary>
/// Return the consumer builder.
/// </summary>
/// <returns><see cref="IConsumerBuilder"/></returns>
IConsumerBuilder Builder();
}

/// <summary>
/// <para>Filter options for support of AMQP filter expressions.</para>
/// <para>AMQP filter expressions are supported only with streams. This a different filtering mechanism from stream filtering.
/// Both mechanisms can be used together.
/// Requires RabbitMQ 4.1 or more.</para>
/// </summary>
/// <see href="https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227">AMQP Filter Expressions</see>
public interface IStreamFilterOptions
{
/// <summary>Filter on message ID.</summary>
/// <param name="id">message ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions MessageId(object id);

/// <summary>Filter on user ID.</summary>
/// <param name="userId">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions UserId(byte[] userId);

/// <summary>Filter on to field.</summary>
/// <param name="to">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions To(string to);

/// <summary>Filter on subject field.</summary>
/// <param name="subject">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions Subject(string subject);

/// <summary>Filter on reply-to field.</summary>
/// <param name="replyTo">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions ReplyTo(string replyTo);

/// <summary>Filter on correlation ID.</summary>
/// <param name="correlationId">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions CorrelationId(object correlationId);

/// <summary>Filter on content-type field.</summary>
/// <param name="contentType">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions ContentType(string contentType);

/// <summary>Filter on content-encoding field.</summary>
/// <param name="contentEncoding">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions ContentEncoding(string contentEncoding);

/// <summary>Filter on absolute expiry time field.</summary>
/// <param name="absoluteExpiryTime">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions AbsoluteExpiryTime(DateTime absoluteExpiryTime);

/// <summary>Filter on creation time field.</summary>
/// <param name="creationTime">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions CreationTime(DateTime creationTime);

/// <summary>Filter on group ID.</summary>
/// <param name="groupId">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions GroupId(string groupId);

/// <summary>Filter on group sequence.</summary>
/// <param name="groupSequence">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions GroupSequence(uint groupSequence);

/// <summary>Filter on reply-to group.</summary>
/// <param name="groupId">correlation ID</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions ReplyToGroupId(string groupId);

/// <summary>Filter on an application property.</summary>
/// <param name="key">application property key</param>
/// <param name="value">application property value</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions Property(string key, object value);

/// <summary>Filter on an application property as a <see cref="Amqp.Types.Symbol"/></summary>
/// <param name="key">application property key</param>
/// <param name="value">application property value</param>
/// <returns><see cref="IStreamFilterOptions"/></returns>
IStreamFilterOptions PropertySymbol(string key, string value);

/// <summary>
/// Return the stream options.
/// </summary>
/// <returns><see cref="IStreamOptions"/></returns>
IStreamOptions Stream();
}

/// <summary>
/// ListenerContext is a helper class that holds the contexts for the listener
/// </summary>
/// <param name="StreamOptions"> Stream Options that the user can change during the SubscriptionListener </param>
public record ListenerContext(IStreamOptions StreamOptions)
public class ListenerContext
{
public IStreamOptions StreamOptions { get; } = StreamOptions;
private readonly IStreamOptions _streamOptions;

/// <param name="streamOptions"> Stream Options that the user can change during the SubscriptionListener </param>
public ListenerContext(IStreamOptions streamOptions)
{
_streamOptions = streamOptions;
}

public IStreamOptions StreamOptions => _streamOptions;
}
}
}
Loading
Loading