Skip to content

Commit c8f0c34

Browse files
committed
Add an ad-hoc test to try and figure out what's up.
1 parent b29fd45 commit c8f0c34

File tree

2 files changed

+111
-19
lines changed

2 files changed

+111
-19
lines changed

projects/RabbitMQ.Client/client/impl/IncomingCommand.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,10 @@ public void Dispose()
2929
ArrayPool<byte>.Shared.Return(_rentedArray);
3030
}
3131
}
32+
33+
public override string ToString()
34+
{
35+
return $"IncomingCommand Method={Method.ProtocolMethodName}, Body.Length={Body.Length}";
36+
}
3237
}
3338
}

projects/Unit/TestConnectionRecovery.cs

Lines changed: 106 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,36 +44,78 @@
4444

4545
namespace RabbitMQ.Client.Unit
4646
{
47-
class DisposableConnection : IDisposable
47+
[TestFixture]
48+
public class TestConnectionRecovery : IntegrationFixture
4849
{
49-
public DisposableConnection(AutorecoveringConnection c)
50-
{
51-
Connection = c;
52-
}
50+
private readonly byte[] _messageBody;
51+
private readonly ushort _totalMessageCount = 64;
52+
private string _queueName = "luke-test-queue";
5353

54-
public AutorecoveringConnection Connection { get; private set; }
55-
56-
public void Dispose()
54+
public TestConnectionRecovery()
5755
{
58-
Connection.Close();
56+
var rnd = new Random();
57+
_messageBody = new byte[1024];
58+
rnd.NextBytes(_messageBody);
5959
}
60-
}
61-
[TestFixture]
62-
public class TestConnectionRecovery : IntegrationFixture
63-
{
60+
6461
[SetUp]
6562
public override void Init()
6663
{
6764
Conn = CreateAutorecoveringConnection();
6865
Model = Conn.CreateModel();
66+
Model.QueueDelete(_queueName);
6967
}
7068

7169
[TearDown]
7270
public void CleanUp()
7371
{
72+
Model.QueueDelete(_queueName);
73+
Model.Close();
7474
Conn.Close();
7575
}
7676

77+
[Test]
78+
public void LukeTest()
79+
{
80+
var allMessagesSeenLatch = new ManualResetEventSlim(false);
81+
var cons = new LukeAckingBasicConsumer(Model, _totalMessageCount, allMessagesSeenLatch);
82+
83+
string queueName = Model.QueueDeclare(_queueName, false, false, false, null).QueueName;
84+
Assert.AreEqual(queueName, _queueName);
85+
86+
Model.BasicQos(0, 1, false);
87+
string consumerTag = Model.BasicConsume(queueName, false, cons);
88+
89+
ManualResetEventSlim sl = PrepareForShutdown(Conn);
90+
ManualResetEventSlim rl = PrepareForRecovery(Conn);
91+
92+
using (IAutorecoveringConnection publishingConn = CreateAutorecoveringConnection())
93+
{
94+
using (IModel publishingModel = publishingConn.CreateModel())
95+
{
96+
for (ushort i = 0; i < _totalMessageCount; i++)
97+
{
98+
if (i == 16)
99+
{
100+
CloseConnection(Conn);
101+
}
102+
publishingModel.BasicPublish(string.Empty, queueName, null, _messageBody);
103+
}
104+
}
105+
}
106+
107+
// System.Console.Error.WriteLine("@@@@@@@@ waiting for shutdown...");
108+
Wait(sl);
109+
// System.Console.Error.WriteLine("@@@@@@@@ waiting for shutdown...DONE");
110+
// System.Console.Error.WriteLine("@@@@@@@@ waiting for recovery...");
111+
Wait(rl);
112+
// System.Console.Error.WriteLine("@@@@@@@@ waiting for recovery...DONE");
113+
// System.Console.Error.WriteLine("@@@@@@@@ waiting for the rest of the messages...");
114+
Wait(allMessagesSeenLatch);
115+
// System.Console.Error.WriteLine("@@@@@@@@ waiting for the rest of the messages...DONE");
116+
}
117+
118+
/*
77119
[Test]
78120
public void TestBasicAckAfterChannelRecovery()
79121
{
@@ -82,6 +124,7 @@ public void TestBasicAckAfterChannelRecovery()
82124
83125
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
84126
}
127+
*/
85128

86129
[Test]
87130
public void TestBasicAckAfterBasicGetAndChannelRecovery()
@@ -224,6 +267,7 @@ public void TestBasicModelRecoveryOnServerRestart()
224267
Assert.IsTrue(Model.IsOpen);
225268
}
226269

270+
/*
227271
[Test]
228272
public void TestBasicNackAfterChannelRecovery()
229273
{
@@ -241,6 +285,7 @@ public void TestBasicRejectAfterChannelRecovery()
241285
242286
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
243287
}
288+
*/
244289

245290
[Test]
246291
public void TestBlockedListenersRecovery()
@@ -1033,25 +1078,29 @@ internal void CloseAndWaitForRecovery(AutorecoveringConnection conn)
10331078
Wait(rl);
10341079
}
10351080

1036-
internal void CloseAndWaitForShutdown(AutorecoveringConnection conn)
1081+
internal void CloseAndWaitForShutdown(IAutorecoveringConnection conn)
10371082
{
10381083
ManualResetEventSlim sl = PrepareForShutdown(conn);
10391084
CloseConnection(conn);
10401085
Wait(sl);
10411086
}
10421087

1043-
internal ManualResetEventSlim PrepareForRecovery(AutorecoveringConnection conn)
1088+
internal ManualResetEventSlim PrepareForRecovery(IConnection conn)
10441089
{
10451090
var latch = new ManualResetEventSlim(false);
1046-
conn.RecoverySucceeded += (source, ea) => latch.Set();
1091+
1092+
IAutorecoveringConnection aconn = conn as IAutorecoveringConnection;
1093+
aconn.RecoverySucceeded += (source, ea) => latch.Set();
10471094

10481095
return latch;
10491096
}
10501097

10511098
internal ManualResetEventSlim PrepareForShutdown(IConnection conn)
10521099
{
10531100
var latch = new ManualResetEventSlim(false);
1054-
conn.ConnectionShutdown += (c, args) => latch.Set();
1101+
1102+
IAutorecoveringConnection aconn = conn as IAutorecoveringConnection;
1103+
aconn.ConnectionShutdown += (c, args) => latch.Set();
10551104

10561105
return latch;
10571106
}
@@ -1063,10 +1112,10 @@ protected override void ReleaseResources()
10631112

10641113
internal void RestartServerAndWaitForRecovery()
10651114
{
1066-
RestartServerAndWaitForRecovery((AutorecoveringConnection)Conn);
1115+
RestartServerAndWaitForRecovery((IAutorecoveringConnection)Conn);
10671116
}
10681117

1069-
internal void RestartServerAndWaitForRecovery(AutorecoveringConnection conn)
1118+
internal void RestartServerAndWaitForRecovery(IAutorecoveringConnection conn)
10701119
{
10711120
ManualResetEventSlim sl = PrepareForShutdown(conn);
10721121
ManualResetEventSlim rl = PrepareForRecovery(conn);
@@ -1075,6 +1124,7 @@ internal void RestartServerAndWaitForRecovery(AutorecoveringConnection conn)
10751124
Wait(rl);
10761125
}
10771126

1127+
/*
10781128
internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEventSlim latch)
10791129
{
10801130
string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
@@ -1096,6 +1146,7 @@ internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 con
10961146
publishingModel.Close();
10971147
publishingConn.Close();
10981148
}
1149+
*/
10991150

11001151
internal void WaitForRecovery()
11011152
{
@@ -1117,6 +1168,42 @@ internal void WaitForShutdown(IConnection conn)
11171168
Wait(PrepareForShutdown(conn));
11181169
}
11191170

1171+
public class LukeAckingBasicConsumer : DefaultBasicConsumer
1172+
{
1173+
private readonly ManualResetEventSlim _allMessagesSeenLatch;
1174+
private readonly ushort _totalMessageCount;
1175+
private ushort _counter = 0;
1176+
1177+
public LukeAckingBasicConsumer(IModel model, ushort totalMessageCount, ManualResetEventSlim allMessagesSeenLatch)
1178+
: base(model)
1179+
{
1180+
_totalMessageCount = totalMessageCount;
1181+
_allMessagesSeenLatch = allMessagesSeenLatch;
1182+
}
1183+
1184+
public override void HandleBasicDeliver(string consumerTag,
1185+
ulong deliveryTag,
1186+
bool redelivered,
1187+
string exchange,
1188+
string routingKey,
1189+
IBasicProperties properties,
1190+
ReadOnlyMemory<byte> body)
1191+
{
1192+
try
1193+
{
1194+
Model.BasicAck(deliveryTag, false);
1195+
}
1196+
finally
1197+
{
1198+
++_counter;
1199+
if (_counter >= _totalMessageCount)
1200+
{
1201+
_allMessagesSeenLatch.Set();
1202+
}
1203+
}
1204+
}
1205+
}
1206+
11201207
public class AckingBasicConsumer : TestBasicConsumer1
11211208
{
11221209
public AckingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)

0 commit comments

Comments
 (0)