Skip to content

Commit cbe6a8c

Browse files
Merge pull request #1305 from rabbitmq/recoving-consumer-event
Port #1304 - Add event for recovering consumer
2 parents 85c53be + e4974f6 commit cbe6a8c

File tree

7 files changed

+146
-1
lines changed

7 files changed

+146
-1
lines changed

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,18 @@ public interface IConnection : INetworkConnection, IDisposable
192192
/// </remarks>
193193
event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery;
194194

195+
/// <summary>
196+
/// Raised when a consumer is about to be recovered. This event raises when topology recovery
197+
/// is enabled, and just before the consumer is recovered. This allows applications to update
198+
/// the consumer arguments before the consumer is recovered. It could be particularly useful
199+
/// when consuming from a stream queue, as it allows to update the consumer offset argument
200+
/// just before the consumer is recovered.
201+
/// </summary>
202+
/// <remarks>
203+
/// This event will never fire for connections that disable automatic recovery.
204+
/// </remarks>
205+
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
206+
195207
event EventHandler<EventArgs> ConnectionUnblocked;
196208

197209
/// <summary>
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2023 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2023 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System.Collections.Generic;
33+
34+
namespace RabbitMQ.Client.Events
35+
{
36+
/// <summary>
37+
/// Event related to consumer recovery, during automatic recovery.
38+
/// </summary>
39+
public class RecoveringConsumerEventArgs
40+
{
41+
/// <summary>
42+
/// Constructs an event containing the consumer arguments and consumer
43+
/// tag of the consumer this event relates to.
44+
/// </summary>
45+
/// <param name="consumerTag">Consumer arguments of the consumer for this event</param>
46+
/// <param name="consumerArguments">Consumer tag of the consumer for this event</param>
47+
public RecoveringConsumerEventArgs(string consumerTag, IDictionary<string, object> consumerArguments)
48+
{
49+
ConsumerTag = consumerTag;
50+
ConsumerArguments = consumerArguments;
51+
}
52+
53+
/// <summary>
54+
/// Access the consumer arguments of the consumer this event relates to.
55+
/// </summary>
56+
public string ConsumerTag { get; }
57+
58+
/// <summary>
59+
/// Access the consumer tag of the consumer this event relates to.
60+
/// </summary>
61+
public IDictionary<string, object> ConsumerArguments { get; }
62+
}
63+
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ internal void RecoverConsumers(AutorecoveringModel channelToRecover, IModel chan
288288
continue;
289289
}
290290

291+
_consumerAboutToBeRecovered.Invoke(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments));
292+
291293
var oldTag = consumer.ConsumerTag;
292294
try
293295
{

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChang
129129
}
130130
private EventingWrapper<QueueNameChangedAfterRecoveryEventArgs> _queueNameChangeAfterRecoveryWrapper;
131131

132+
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
133+
{
134+
add => _consumerAboutToBeRecovered.AddHandler(value);
135+
remove => _consumerAboutToBeRecovered.RemoveHandler(value);
136+
}
137+
private EventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecovered;
138+
132139
public string ClientProvidedName => _config.ClientProvidedName;
133140

134141
public ushort ChannelMax => InnerConnection.ChannelMax;

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,13 @@ public event EventHandler<EventArgs> ConnectionUnblocked
148148
}
149149
private EventingWrapper<EventArgs> _connectionUnblockedWrapper;
150150

151+
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
152+
{
153+
add => _consumerAboutToBeRecovered.AddHandler(value);
154+
remove => _consumerAboutToBeRecovered.RemoveHandler(value);
155+
}
156+
private EventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecovered;
157+
151158
public event EventHandler<ShutdownEventArgs> ConnectionShutdown
152159
{
153160
add

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
1+
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
22
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Unit, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
33
namespace RabbitMQ.Client
44
{
@@ -391,6 +391,7 @@ namespace RabbitMQ.Client
391391
event System.EventHandler<System.EventArgs> ConnectionUnblocked;
392392
event System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
393393
event System.EventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery;
394+
event System.EventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs> RecoveringConsumer;
394395
event System.EventHandler<System.EventArgs> RecoverySucceeded;
395396
void Close(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort);
396397
RabbitMQ.Client.IModel CreateModel();
@@ -795,6 +796,12 @@ namespace RabbitMQ.Client.Events
795796
public string NameAfter { get; }
796797
public string NameBefore { get; }
797798
}
799+
public class RecoveringConsumerEventArgs
800+
{
801+
public RecoveringConsumerEventArgs(string consumerTag, System.Collections.Generic.IDictionary<string, object> consumerArguments) { }
802+
public System.Collections.Generic.IDictionary<string, object> ConsumerArguments { get; }
803+
public string ConsumerTag { get; }
804+
}
798805
}
799806
namespace RabbitMQ.Client.Exceptions
800807
{

projects/Unit/TestConnectionRecovery.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,53 @@ public void TestRecoveryEventHandlersOnModel()
703703
Assert.True(counter >= 3);
704704
}
705705

706+
[Theory]
707+
[InlineData(1)]
708+
[InlineData(3)]
709+
public void TestRecoveringConsumerHandlerOnConnection(int iterations)
710+
{
711+
string q = _model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
712+
var cons = new EventingBasicConsumer(_model);
713+
_model.BasicConsume(q, true, cons);
714+
715+
int counter = 0;
716+
((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => Interlocked.Increment(ref counter);
717+
718+
for (int i = 0; i < iterations; i++)
719+
{
720+
CloseAndWaitForRecovery();
721+
}
722+
723+
Assert.Equal(iterations, counter);
724+
}
725+
726+
[Fact]
727+
public void TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePassedDown()
728+
{
729+
var myArgs = new Dictionary<string, object> { { "first-argument", "some-value" } };
730+
string q = _model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
731+
var cons = new EventingBasicConsumer(_model);
732+
string expectedCTag = _model.BasicConsume(cons, q, arguments: myArgs);
733+
734+
bool ctagMatches = false;
735+
bool consumerArgumentMatches = false;
736+
((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) =>
737+
{
738+
// We cannot assert here because NUnit throws when an assertion fails. This exception is caught and
739+
// passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick
740+
// and assert in the test function.
741+
ctagMatches = args.ConsumerTag == expectedCTag;
742+
consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value";
743+
args.ConsumerArguments["first-argument"] = "event-handler-set-this-value";
744+
};
745+
746+
CloseAndWaitForRecovery();
747+
Assert.True(ctagMatches, "expected consumer tag to match");
748+
Assert.True(consumerArgumentMatches, "expected consumer arguments to match");
749+
string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary<string, object>);
750+
Assert.Equal("event-handler-set-this-value", actualVal);
751+
}
752+
706753
[Fact]
707754
public void TestRecoveryWithTopologyDisabled()
708755
{

0 commit comments

Comments
 (0)