Skip to content

Implement pre-subscription hook #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -24,25 +25,39 @@ public interface IConsumerBuilder

IConsumerBuilder InitialCredits(int initialCredits);

/// <summary>
/// SubscriptionListener interface callback to add behavior before a subscription is created.
/// This callback is meant for stream consumers:
/// it can be used to dynamically set the offset the consumer attaches to in the stream.
/// It is called when the consumer is first created and when the client has to re-subscribe
/// (e.g. after a disconnection).
/// </summary>
/// <param name="listenerContext"> Contains the listenerContext, see <see cref="ListenerContext"/> </param>
/// <returns></returns>
IConsumerBuilder SubscriptionListener(Action<ListenerContext> listenerContext);

IStreamOptions Stream();

Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default);


public interface IStreamOptions
{
IStreamOptions Offset(long offset);

// IStreamOptions offset(Instant timestamp);

IStreamOptions Offset(StreamOffsetSpecification specification);

IStreamOptions Offset(string interval);

IStreamOptions FilterValues(string[] values);

IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered);

IConsumerBuilder Builder();
}


/// <summary>
/// ListenerContext is a helper class that holds the contexts for the listener
/// </summary>
/// <param name="StreamOptions"> Stream Options that the user can change during the SubscriptionListener </param>
public record ListenerContext(IStreamOptions StreamOptions)
{
public IStreamOptions StreamOptions { get; } = StreamOptions;
}
}
}
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public interface IQueueSpecification : IEntityInfoSpecification<IQueueInfo>

IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes);

// TODO: Add more tests for SingleActiveConsumer
IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);

IQueueSpecification Expires(TimeSpan expiration);
Expand Down
84 changes: 46 additions & 38 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,18 @@ private enum PauseStatus
PAUSED,
}

private readonly AmqpConnection _connection;
private readonly string _address;
private readonly MessageHandler _messageHandler;
private readonly int _initialCredits;
private readonly Map _filters;
private readonly Guid _id = Guid.NewGuid();

private ReceiverLink? _receiverLink;

private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
private readonly ConsumerConfiguration _configuration;

public AmqpConsumer(AmqpConnection connection, string address,
MessageHandler messageHandler, int initialCredits, Map filters)
public AmqpConsumer(ConsumerConfiguration configuration)
{
_connection = connection;
_address = address;
_messageHandler = messageHandler;
_initialCredits = initialCredits;
_filters = filters;

if (false == _connection.Consumers.TryAdd(_id, this))
_configuration = configuration;
if (false == _configuration.Connection.Consumers.TryAdd(_id, this))
{
// TODO error?
}
Expand All @@ -52,9 +42,20 @@ public override async Task OpenAsync()
{
try
{
TaskCompletionSource<ReceiverLink> attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<ReceiverLink> attachCompletedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);

// this is an event to get the filters to the listener context
// it _must_ be here because in case of reconnect the original filters could be not valid anymore
// so the function must be called every time the consumer is opened normally or by reconnection
// if ListenerContext is null the function will do nothing
// ListenerContext will override only the filters the selected filters.
_configuration.ListenerContext?.Invoke(
new IConsumerBuilder.ListenerContext(new ListenerStreamOptions(_configuration.Filters)));

Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id, _filters);

Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id,
_configuration.Filters);

void onAttached(ILink argLink, Attach argAttach)
{
Expand All @@ -74,7 +75,7 @@ void onAttached(ILink argLink, Attach argAttach)
ReceiverLink? tmpReceiverLink = null;
Task receiverLinkTask = Task.Run(async () =>
{
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
Session session = await _configuration.Connection._nativePubSubSessions.GetOrCreateSessionAsync()
.ConfigureAwait(false);
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
});
Expand All @@ -89,7 +90,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
.ConfigureAwait(false);

System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_receiverLink, tmpReceiverLink));
System.Diagnostics.Debug.Assert(object.ReferenceEquals(_receiverLink, tmpReceiverLink));

if (_receiverLink is null)
{
Expand All @@ -103,7 +104,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
}
else
{
_receiverLink.SetCredit(_initialCredits);
_receiverLink.SetCredit(_configuration.InitialCredits);

// TODO save / cancel task
_ = Task.Run(ProcessMessages);
Expand Down Expand Up @@ -150,7 +151,10 @@ private async Task ProcessMessages()

// TODO catch exceptions thrown by handlers,
// then call exception handler?
await _messageHandler(context, amqpMessage).ConfigureAwait(false);
if (_configuration.Handler != null)
{
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
}
}
}
catch (Exception e)
Expand All @@ -173,20 +177,24 @@ public void Pause()
if (_receiverLink is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
throw new InvalidOperationException(
"_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}

if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(
ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
{
_receiverLink.SetCredit(credit: 0);

if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(
ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
{
_pauseStatus = PauseStatus.UNPAUSED;
// TODO create "internal bug" exception type?
throw new InvalidOperationException("error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
throw new InvalidOperationException(
"error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
}
else
Expand All @@ -197,24 +205,23 @@ public void Pause()

public long UnsettledMessageCount
{
get
{
return _unsettledMessageCounter.Get();
}
get { return _unsettledMessageCounter.Get(); }
}

public void Unpause()
{
if (_receiverLink is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
throw new InvalidOperationException(
"_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}

if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(
ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
{
_receiverLink.SetCredit(credit: _initialCredits);
_receiverLink.SetCredit(credit: _configuration.InitialCredits);
}
else
{
Expand All @@ -240,19 +247,20 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
}
catch (Exception ex)
{
Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway", ex);
Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway",
ex);
}

_receiverLink = null;
OnNewStatus(State.Closed, null);
_connection.Consumers.TryRemove(_id, out _);
_configuration.Connection.Consumers.TryRemove(_id, out _);
}

public override string ToString()
{
return $"Consumer{{Address='{_address}', " +
return $"Consumer{{Address='{_configuration.Address}', " +
$"id={_id}, " +
$"Connection='{_connection}', " +
$"Connection='{_configuration.Connection}', " +
$"State='{State}'}}";
}
}
Expand Down
Loading