Skip to content

Commit 9361417

Browse files
authored
Implement pre-subscription hook (#72)
* Implement pre-subscription hook --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent c1e4f5d commit 9361417

File tree

6 files changed

+333
-96
lines changed

6 files changed

+333
-96
lines changed

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
56
using System.Threading;
67
using System.Threading.Tasks;
78

@@ -24,25 +25,39 @@ public interface IConsumerBuilder
2425

2526
IConsumerBuilder InitialCredits(int initialCredits);
2627

28+
/// <summary>
29+
/// SubscriptionListener interface callback to add behavior before a subscription is created.
30+
/// This callback is meant for stream consumers:
31+
/// it can be used to dynamically set the offset the consumer attaches to in the stream.
32+
/// It is called when the consumer is first created and when the client has to re-subscribe
33+
/// (e.g. after a disconnection).
34+
/// </summary>
35+
/// <param name="listenerContext"> Contains the listenerContext, see <see cref="ListenerContext"/> </param>
36+
/// <returns></returns>
37+
IConsumerBuilder SubscriptionListener(Action<ListenerContext> listenerContext);
38+
2739
IStreamOptions Stream();
2840

2941
Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default);
3042

43+
3144
public interface IStreamOptions
3245
{
3346
IStreamOptions Offset(long offset);
34-
35-
// IStreamOptions offset(Instant timestamp);
36-
3747
IStreamOptions Offset(StreamOffsetSpecification specification);
38-
39-
IStreamOptions Offset(string interval);
40-
4148
IStreamOptions FilterValues(string[] values);
42-
4349
IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered);
44-
4550
IConsumerBuilder Builder();
4651
}
52+
53+
54+
/// <summary>
55+
/// ListenerContext is a helper class that holds the contexts for the listener
56+
/// </summary>
57+
/// <param name="StreamOptions"> Stream Options that the user can change during the SubscriptionListener </param>
58+
public record ListenerContext(IStreamOptions StreamOptions)
59+
{
60+
public IStreamOptions StreamOptions { get; } = StreamOptions;
61+
}
4762
}
4863
}

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public interface IQueueSpecification : IEntityInfoSpecification<IQueueInfo>
6767

6868
IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes);
6969

70+
// TODO: Add more tests for SingleActiveConsumer
7071
IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);
7172

7273
IQueueSpecification Expires(TimeSpan expiration);

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,18 @@ private enum PauseStatus
2121
PAUSED,
2222
}
2323

24-
private readonly AmqpConnection _connection;
25-
private readonly string _address;
26-
private readonly MessageHandler _messageHandler;
27-
private readonly int _initialCredits;
28-
private readonly Map _filters;
2924
private readonly Guid _id = Guid.NewGuid();
3025

3126
private ReceiverLink? _receiverLink;
3227

3328
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
3429
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
30+
private readonly ConsumerConfiguration _configuration;
3531

36-
public AmqpConsumer(AmqpConnection connection, string address,
37-
MessageHandler messageHandler, int initialCredits, Map filters)
32+
public AmqpConsumer(ConsumerConfiguration configuration)
3833
{
39-
_connection = connection;
40-
_address = address;
41-
_messageHandler = messageHandler;
42-
_initialCredits = initialCredits;
43-
_filters = filters;
44-
45-
if (false == _connection.Consumers.TryAdd(_id, this))
34+
_configuration = configuration;
35+
if (false == _configuration.Connection.Consumers.TryAdd(_id, this))
4636
{
4737
// TODO error?
4838
}
@@ -52,9 +42,20 @@ public override async Task OpenAsync()
5242
{
5343
try
5444
{
55-
TaskCompletionSource<ReceiverLink> attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
45+
TaskCompletionSource<ReceiverLink> attachCompletedTcs =
46+
new(TaskCreationOptions.RunContinuationsAsynchronously);
47+
48+
// this is an event to get the filters to the listener context
49+
// it _must_ be here because in case of reconnect the original filters could be not valid anymore
50+
// so the function must be called every time the consumer is opened normally or by reconnection
51+
// if ListenerContext is null the function will do nothing
52+
// ListenerContext will override only the filters the selected filters.
53+
_configuration.ListenerContext?.Invoke(
54+
new IConsumerBuilder.ListenerContext(new ListenerStreamOptions(_configuration.Filters)));
5655

57-
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id, _filters);
56+
57+
Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id,
58+
_configuration.Filters);
5859

5960
void onAttached(ILink argLink, Attach argAttach)
6061
{
@@ -74,7 +75,7 @@ void onAttached(ILink argLink, Attach argAttach)
7475
ReceiverLink? tmpReceiverLink = null;
7576
Task receiverLinkTask = Task.Run(async () =>
7677
{
77-
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
78+
Session session = await _configuration.Connection._nativePubSubSessions.GetOrCreateSessionAsync()
7879
.ConfigureAwait(false);
7980
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
8081
});
@@ -89,7 +90,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
8990
.ConfigureAwait(false);
9091

9192
System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
92-
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_receiverLink, tmpReceiverLink));
93+
System.Diagnostics.Debug.Assert(object.ReferenceEquals(_receiverLink, tmpReceiverLink));
9394

9495
if (_receiverLink is null)
9596
{
@@ -103,7 +104,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
103104
}
104105
else
105106
{
106-
_receiverLink.SetCredit(_initialCredits);
107+
_receiverLink.SetCredit(_configuration.InitialCredits);
107108

108109
// TODO save / cancel task
109110
_ = Task.Run(ProcessMessages);
@@ -150,7 +151,10 @@ private async Task ProcessMessages()
150151

151152
// TODO catch exceptions thrown by handlers,
152153
// then call exception handler?
153-
await _messageHandler(context, amqpMessage).ConfigureAwait(false);
154+
if (_configuration.Handler != null)
155+
{
156+
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
157+
}
154158
}
155159
}
156160
catch (Exception e)
@@ -173,20 +177,24 @@ public void Pause()
173177
if (_receiverLink is null)
174178
{
175179
// TODO create "internal bug" exception type?
176-
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
180+
throw new InvalidOperationException(
181+
"_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
177182
}
178183

179-
if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
180-
(int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
184+
if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(
185+
ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
186+
(int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
181187
{
182188
_receiverLink.SetCredit(credit: 0);
183189

184-
if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
185-
(int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
190+
if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(
191+
ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
192+
(int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
186193
{
187194
_pauseStatus = PauseStatus.UNPAUSED;
188195
// TODO create "internal bug" exception type?
189-
throw new InvalidOperationException("error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
196+
throw new InvalidOperationException(
197+
"error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
190198
}
191199
}
192200
else
@@ -197,24 +205,23 @@ public void Pause()
197205

198206
public long UnsettledMessageCount
199207
{
200-
get
201-
{
202-
return _unsettledMessageCounter.Get();
203-
}
208+
get { return _unsettledMessageCounter.Get(); }
204209
}
205210

206211
public void Unpause()
207212
{
208213
if (_receiverLink is null)
209214
{
210215
// TODO create "internal bug" exception type?
211-
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
216+
throw new InvalidOperationException(
217+
"_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
212218
}
213219

214-
if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
215-
(int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
220+
if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(
221+
ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
222+
(int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
216223
{
217-
_receiverLink.SetCredit(credit: _initialCredits);
224+
_receiverLink.SetCredit(credit: _configuration.InitialCredits);
218225
}
219226
else
220227
{
@@ -240,19 +247,20 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
240247
}
241248
catch (Exception ex)
242249
{
243-
Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway", ex);
250+
Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway",
251+
ex);
244252
}
245253

246254
_receiverLink = null;
247255
OnNewStatus(State.Closed, null);
248-
_connection.Consumers.TryRemove(_id, out _);
256+
_configuration.Connection.Consumers.TryRemove(_id, out _);
249257
}
250258

251259
public override string ToString()
252260
{
253-
return $"Consumer{{Address='{_address}', " +
261+
return $"Consumer{{Address='{_configuration.Address}', " +
254262
$"id={_id}, " +
255-
$"Connection='{_connection}', " +
263+
$"Connection='{_configuration.Connection}', " +
256264
$"State='{State}'}}";
257265
}
258266
}

0 commit comments

Comments
 (0)