Skip to content

Commit 0579247

Browse files
Merge pull request #1304 from rabbitmq/recovering-consumer-6.x
Add event for recovering consumer
2 parents afe911c + 93cef57 commit 0579247

File tree

5 files changed

+134
-0
lines changed

5 files changed

+134
-0
lines changed

projects/RabbitMQ.Client/client/api/IAutorecoveringConnection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,6 @@ public interface IAutorecoveringConnection : IConnection
4545

4646
event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
4747
event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery;
48+
event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
4849
}
4950
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2023 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2023 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Generic;
34+
35+
namespace RabbitMQ.Client.Events
36+
{
37+
/// <summary>
38+
/// Event related to consumer recovery, during automatic recovery.
39+
/// </summary>
40+
public class RecoveringConsumerEventArgs : EventArgs
41+
{
42+
/// <summary>
43+
/// Constructs an event containing the consumer arguments and consumer
44+
/// tag of the consumer this event relates to.
45+
/// </summary>
46+
/// <param name="consumerArguments">Consumer arguments of the consumer for this event</param>
47+
/// <param name="consumerTag">Consumer tag of the consumer for this event</param>
48+
public RecoveringConsumerEventArgs(IDictionary<string, object> consumerArguments, string consumerTag)
49+
{
50+
ConsumerArguments = consumerArguments;
51+
ConsumerTag = consumerTag;
52+
}
53+
54+
/// <summary>
55+
/// Access the consumer arguments of the consumer this event relates to.
56+
/// </summary>
57+
public IDictionary<string, object> ConsumerArguments { get; }
58+
59+
/// <summary>
60+
/// Access the consumer tag of the consumer this event relates to.
61+
/// </summary>
62+
public string ConsumerTag { get; }
63+
}
64+
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ public event EventHandler<EventArgs> ConnectionUnblocked
198198
}
199199
}
200200

201+
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
201202
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
202203
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery;
203204

@@ -1099,6 +1100,20 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe
10991100
string tag = pair.Key;
11001101
try
11011102
{
1103+
foreach (EventHandler<RecoveringConsumerEventArgs> eh in RecoveringConsumer?.GetInvocationList() ?? Array.Empty<Delegate>())
1104+
{
1105+
try
1106+
{
1107+
var eventArgs = new RecoveringConsumerEventArgs(cons.Arguments, cons.ConsumerTag);
1108+
eh(this, eventArgs);
1109+
}
1110+
catch (Exception e)
1111+
{
1112+
var args = new CallbackExceptionEventArgs(e) { Detail = { ["context"] = "OnBeforeRecoveringConsumer" } };
1113+
_delegate.OnCallbackException(args);
1114+
}
1115+
}
1116+
11021117
string newTag = cons.Recover(channelToUse);
11031118
lock (_recordedConsumers)
11041119
{

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ namespace RabbitMQ.Client
243243
event System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
244244
event System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
245245
event System.EventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery;
246+
event System.EventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs> RecoveringConsumer;
246247
event System.EventHandler<System.EventArgs> RecoverySucceeded;
247248
}
248249
public interface IBasicConsumer
@@ -689,6 +690,12 @@ namespace RabbitMQ.Client.Events
689690
public string NameAfter { get; }
690691
public string NameBefore { get; }
691692
}
693+
public class RecoveringConsumerEventArgs : System.EventArgs
694+
{
695+
public RecoveringConsumerEventArgs(System.Collections.Generic.IDictionary<string, object> consumerArguments, string consumerTag) { }
696+
public System.Collections.Generic.IDictionary<string, object> ConsumerArguments { get; }
697+
public string ConsumerTag { get; }
698+
}
692699
public class RecoveryExceptionEventArgs : RabbitMQ.Client.Events.BaseExceptionEventArgs
693700
{
694701
public RecoveryExceptionEventArgs(System.Exception e) { }

projects/Unit/TestConnectionRecovery.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections;
3334
using System.Collections.Generic;
3435
using System.Threading;
3536

@@ -688,6 +689,52 @@ public void TestRecoveryEventHandlersOnConnection()
688689
Assert.IsTrue(counter >= 3);
689690
}
690691

692+
[Test]
693+
public void TestRecoveringConsumerHandlerOnConnection()
694+
{
695+
string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
696+
var cons = new EventingBasicConsumer(Model);
697+
Model.BasicConsume(q, true, cons);
698+
699+
int counter = 0;
700+
((AutorecoveringConnection)Conn).RecoveringConsumer += (sender, args) => Interlocked.Increment(ref counter);
701+
702+
CloseAndWaitForRecovery();
703+
Assert.IsTrue(Conn.IsOpen, "expected connection to be open");
704+
Assert.AreEqual(1, counter);
705+
706+
CloseAndWaitForRecovery();
707+
CloseAndWaitForRecovery();
708+
Assert.IsTrue(Conn.IsOpen, "expected connection to be open");
709+
Assert.AreEqual(3, counter);
710+
}
711+
712+
[Test]
713+
public void TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePassedDown()
714+
{
715+
var myArgs = new Dictionary<string, object> { { "first-argument", "some-value" } };
716+
string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
717+
var cons = new EventingBasicConsumer(Model);
718+
string expectedCTag = Model.BasicConsume(cons, q, arguments: myArgs);
719+
720+
bool ctagMatches = false;
721+
bool consumerArgumentMatches = false;
722+
((AutorecoveringConnection)Conn).RecoveringConsumer += (sender, args) =>
723+
{
724+
// We cannot assert here because NUnit throws when an assertion fails. This exception is caught and
725+
// passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick
726+
// and assert in the test function.
727+
ctagMatches = args.ConsumerTag == expectedCTag;
728+
consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value";
729+
args.ConsumerArguments["first-argument"] = "event-handler-set-this-value";
730+
};
731+
732+
CloseAndWaitForRecovery();
733+
Assert.That(ctagMatches, Is.True, "expected consumer tag to match");
734+
Assert.That(consumerArgumentMatches, Is.True, "expected consumer arguments to match");
735+
Assert.That(myArgs, Does.ContainKey("first-argument").WithValue("event-handler-set-this-value"));
736+
}
737+
691738
[Test]
692739
public void TestRecoveryEventHandlersOnModel()
693740
{

0 commit comments

Comments
 (0)