@@ -48,19 +48,21 @@ namespace RabbitMQ.Client.Unit
48
48
public class TestConnectionRecovery : IntegrationFixture
49
49
{
50
50
private readonly byte [ ] _messageBody ;
51
- private readonly ushort _totalMessageCount = 64 ;
52
- private string _queueName = "luke-test-queue" ;
51
+ private readonly ushort _totalMessageCount = 1024 ;
52
+ private readonly ushort _closeAtCount = 16 ;
53
+ private string _queueName ;
53
54
54
55
public TestConnectionRecovery ( )
55
56
{
56
57
var rnd = new Random ( ) ;
57
- _messageBody = new byte [ 1024 ] ;
58
+ _messageBody = new byte [ 4096 ] ;
58
59
rnd . NextBytes ( _messageBody ) ;
59
60
}
60
61
61
62
[ SetUp ]
62
63
public override void Init ( )
63
64
{
65
+ _queueName = $ "TestConnectionRecovery-queue-{ Guid . NewGuid ( ) } ";
64
66
Conn = CreateAutorecoveringConnection ( ) ;
65
67
Model = Conn . CreateModel ( ) ;
66
68
Model . QueueDelete ( _queueName ) ;
@@ -73,10 +75,10 @@ public void CleanUp()
73
75
}
74
76
75
77
[ Test ]
76
- public void LukeTest ( )
78
+ public void TestBasicAckAfterChannelRecovery ( )
77
79
{
78
80
var allMessagesSeenLatch = new ManualResetEventSlim ( false ) ;
79
- var cons = new LukeAckingBasicConsumer ( Model , _totalMessageCount , allMessagesSeenLatch ) ;
81
+ var cons = new AckingBasicConsumer ( Model , _totalMessageCount , allMessagesSeenLatch ) ;
80
82
81
83
string queueName = Model . QueueDeclare ( _queueName , false , false , false , null ) . QueueName ;
82
84
Assert . AreEqual ( queueName , _queueName ) ;
@@ -87,42 +89,56 @@ public void LukeTest()
87
89
ManualResetEventSlim sl = PrepareForShutdown ( Conn ) ;
88
90
ManualResetEventSlim rl = PrepareForRecovery ( Conn ) ;
89
91
90
- using ( IAutorecoveringConnection publishingConn = CreateAutorecoveringConnection ( ) )
91
- {
92
- using ( IModel publishingModel = publishingConn . CreateModel ( ) )
93
- {
94
- for ( ushort i = 0 ; i < _totalMessageCount ; i ++ )
95
- {
96
- if ( i == 16 )
97
- {
98
- CloseConnection ( Conn ) ;
99
- }
100
- publishingModel . BasicPublish ( string . Empty , queueName , null , _messageBody ) ;
101
- }
102
- }
103
- }
92
+ PublishMessagesWhileClosingConn ( queueName ) ;
104
93
105
- // System.Console.Error.WriteLine("@@@@@@@@ waiting for shutdown...");
106
94
Wait ( sl ) ;
107
- // System.Console.Error.WriteLine("@@@@@@@@ waiting for shutdown...DONE");
108
- // System.Console.Error.WriteLine("@@@@@@@@ waiting for recovery...");
109
95
Wait ( rl ) ;
110
- // System.Console.Error.WriteLine("@@@@@@@@ waiting for recovery...DONE");
111
- // System.Console.Error.WriteLine("@@@@@@@@ waiting for the rest of the messages...");
112
96
Wait ( allMessagesSeenLatch ) ;
113
- // System.Console.Error.WriteLine("@@@@@@@@ waiting for the rest of the messages...DONE");
114
97
}
115
98
116
- /*
117
99
[ Test ]
118
- public void TestBasicAckAfterChannelRecovery ()
100
+ public void TestBasicNackAfterChannelRecovery ( )
119
101
{
120
- var latch = new ManualResetEventSlim(false);
121
- var cons = new AckingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
102
+ var allMessagesSeenLatch = new ManualResetEventSlim ( false ) ;
103
+ var cons = new NackingBasicConsumer ( Model , _totalMessageCount , allMessagesSeenLatch ) ;
104
+
105
+ string queueName = Model . QueueDeclare ( _queueName , false , false , false , null ) . QueueName ;
106
+ Assert . AreEqual ( queueName , _queueName ) ;
107
+
108
+ Model . BasicQos ( 0 , 1 , false ) ;
109
+ string consumerTag = Model . BasicConsume ( queueName , false , cons ) ;
122
110
123
- TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
111
+ ManualResetEventSlim sl = PrepareForShutdown ( Conn ) ;
112
+ ManualResetEventSlim rl = PrepareForRecovery ( Conn ) ;
113
+
114
+ PublishMessagesWhileClosingConn ( queueName ) ;
115
+
116
+ Wait ( sl ) ;
117
+ Wait ( rl ) ;
118
+ Wait ( allMessagesSeenLatch ) ;
119
+ }
120
+
121
+ [ Test ]
122
+ public void TestBasicRejectAfterChannelRecovery ( )
123
+ {
124
+ var allMessagesSeenLatch = new ManualResetEventSlim ( false ) ;
125
+ var cons = new RejectingBasicConsumer ( Model , _totalMessageCount , allMessagesSeenLatch ) ;
126
+
127
+ string queueName = Model . QueueDeclare ( _queueName , false , false , false , null ) . QueueName ;
128
+ Assert . AreEqual ( queueName , _queueName ) ;
129
+
130
+ Model . BasicQos ( 0 , 1 , false ) ;
131
+ string consumerTag = Model . BasicConsume ( queueName , false , cons ) ;
132
+
133
+ ManualResetEventSlim sl = PrepareForShutdown ( Conn ) ;
134
+ ManualResetEventSlim rl = PrepareForRecovery ( Conn ) ;
135
+
136
+ PublishMessagesWhileClosingConn ( queueName ) ;
137
+
138
+ Wait ( sl ) ;
139
+ Wait ( rl ) ;
140
+ Wait ( allMessagesSeenLatch ) ;
124
141
}
125
- */
126
142
127
143
[ Test ]
128
144
public void TestBasicAckAfterBasicGetAndChannelRecovery ( )
@@ -131,7 +147,7 @@ public void TestBasicAckAfterBasicGetAndChannelRecovery()
131
147
Model . QueueDeclare ( q , false , false , false , null ) ;
132
148
// create an offset
133
149
IBasicProperties bp = Model . CreateBasicProperties ( ) ;
134
- Model . BasicPublish ( "" , q , bp , new byte [ ] { } ) ;
150
+ Model . BasicPublish ( "" , q , bp , _messageBody ) ;
135
151
Thread . Sleep ( 50 ) ;
136
152
BasicGetResult g = Model . BasicGet ( q , false ) ;
137
153
CloseAndWaitForRecovery ( ) ;
@@ -155,8 +171,7 @@ public void TestBasicAckEventHandlerRecovery()
155
171
CloseAndWaitForRecovery ( ) ;
156
172
Assert . IsTrue ( Model . IsOpen ) ;
157
173
158
- byte [ ] mb = RandomMessageBody ( ) ;
159
- WithTemporaryNonExclusiveQueue ( Model , ( m , q ) => m . BasicPublish ( "" , q , null , mb ) ) ;
174
+ WithTemporaryNonExclusiveQueue ( Model , ( m , q ) => m . BasicPublish ( "" , q , null , _messageBody ) ) ;
160
175
Wait ( latch ) ;
161
176
}
162
177
@@ -265,26 +280,6 @@ public void TestBasicModelRecoveryOnServerRestart()
265
280
Assert . IsTrue ( Model . IsOpen ) ;
266
281
}
267
282
268
- /*
269
- [Test]
270
- public void TestBasicNackAfterChannelRecovery()
271
- {
272
- var latch = new ManualResetEventSlim(false);
273
- var cons = new NackingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
274
-
275
- TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
276
- }
277
-
278
- [Test]
279
- public void TestBasicRejectAfterChannelRecovery()
280
- {
281
- var latch = new ManualResetEventSlim(false);
282
- var cons = new RejectingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
283
-
284
- TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
285
- }
286
- */
287
-
288
283
[ Test ]
289
284
public void TestBlockedListenersRecovery ( )
290
285
{
@@ -823,7 +818,7 @@ public void TestRecoverTopologyOnDisposedChannel()
823
818
var latch = new ManualResetEventSlim ( false ) ;
824
819
cons . Received += ( s , args ) => latch . Set ( ) ;
825
820
826
- Model . BasicPublish ( "" , q , null , ReadOnlyMemory < byte > . Empty ) ;
821
+ Model . BasicPublish ( "" , q , null , _messageBody ) ;
827
822
Wait ( latch ) ;
828
823
829
824
Model . QueueUnbind ( q , x , rk ) ;
@@ -861,7 +856,7 @@ public void TestPublishRpcRightAfterReconnect()
861
856
{
862
857
try
863
858
{
864
- Model . BasicPublish ( string . Empty , testQueueName , false , properties , ReadOnlyMemory < byte > . Empty ) ;
859
+ Model . BasicPublish ( string . Empty , testQueueName , false , properties , _messageBody ) ;
865
860
}
866
861
catch ( Exception e )
867
862
{
@@ -1014,8 +1009,7 @@ internal void AssertExchangeRecovery(IModel m, string x)
1014
1009
{
1015
1010
string rk = "routing-key" ;
1016
1011
m . QueueBind ( q , x , rk ) ;
1017
- byte [ ] mb = RandomMessageBody ( ) ;
1018
- m . BasicPublish ( x , rk , null , mb ) ;
1012
+ m . BasicPublish ( x , rk , null , _messageBody ) ;
1019
1013
1020
1014
Assert . IsTrue ( WaitForConfirms ( m ) ) ;
1021
1015
m . ExchangeDeclarePassive ( x ) ;
@@ -1033,8 +1027,7 @@ internal void AssertQueueRecovery(IModel m, string q, bool exclusive)
1033
1027
m . QueueDeclarePassive ( q ) ;
1034
1028
QueueDeclareOk ok1 = m . QueueDeclare ( q , false , exclusive , false , null ) ;
1035
1029
Assert . AreEqual ( ok1 . MessageCount , 0 ) ;
1036
- byte [ ] mb = RandomMessageBody ( ) ;
1037
- m . BasicPublish ( "" , q , null , mb ) ;
1030
+ m . BasicPublish ( "" , q , null , _messageBody ) ;
1038
1031
Assert . IsTrue ( WaitForConfirms ( m ) ) ;
1039
1032
QueueDeclareOk ok2 = m . QueueDeclare ( q , false , exclusive , false , null ) ;
1040
1033
Assert . AreEqual ( ok2 . MessageCount , 1 ) ;
@@ -1122,30 +1115,6 @@ internal void RestartServerAndWaitForRecovery(IAutorecoveringConnection conn)
1122
1115
Wait ( rl ) ;
1123
1116
}
1124
1117
1125
- /*
1126
- internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEventSlim latch)
1127
- {
1128
- string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
1129
- int n = 30;
1130
- Model.BasicQos(0, 1, false);
1131
- Model.BasicConsume(q, false, cons);
1132
-
1133
- AutorecoveringConnection publishingConn = CreateAutorecoveringConnection();
1134
- IModel publishingModel = publishingConn.CreateModel();
1135
-
1136
- byte[] mb = RandomMessageBody();
1137
- for (int i = 0; i < n; i++)
1138
- {
1139
- publishingModel.BasicPublish("", q, null, mb);
1140
- }
1141
-
1142
- Wait(latch, TimeSpan.FromSeconds(20));
1143
- Model.QueueDelete(q);
1144
- publishingModel.Close();
1145
- publishingConn.Close();
1146
- }
1147
- */
1148
-
1149
1118
internal void WaitForRecovery ( )
1150
1119
{
1151
1120
Wait ( PrepareForRecovery ( ( AutorecoveringConnection ) Conn ) ) ;
@@ -1166,46 +1135,28 @@ internal void WaitForShutdown(IConnection conn)
1166
1135
Wait ( PrepareForShutdown ( conn ) ) ;
1167
1136
}
1168
1137
1169
- public class LukeAckingBasicConsumer : DefaultBasicConsumer
1138
+ internal void PublishMessagesWhileClosingConn ( string queueName )
1170
1139
{
1171
- private readonly ManualResetEventSlim _allMessagesSeenLatch ;
1172
- private readonly ushort _totalMessageCount ;
1173
- private ushort _counter = 0 ;
1174
-
1175
- public LukeAckingBasicConsumer ( IModel model , ushort totalMessageCount , ManualResetEventSlim allMessagesSeenLatch )
1176
- : base ( model )
1177
- {
1178
- _totalMessageCount = totalMessageCount ;
1179
- _allMessagesSeenLatch = allMessagesSeenLatch ;
1180
- }
1181
-
1182
- public override void HandleBasicDeliver ( string consumerTag ,
1183
- ulong deliveryTag ,
1184
- bool redelivered ,
1185
- string exchange ,
1186
- string routingKey ,
1187
- IBasicProperties properties ,
1188
- ReadOnlyMemory < byte > body )
1140
+ using ( IAutorecoveringConnection publishingConn = CreateAutorecoveringConnection ( ) )
1189
1141
{
1190
- try
1191
- {
1192
- Model . BasicAck ( deliveryTag , false ) ;
1193
- }
1194
- finally
1142
+ using ( IModel publishingModel = publishingConn . CreateModel ( ) )
1195
1143
{
1196
- ++ _counter ;
1197
- if ( _counter >= _totalMessageCount )
1144
+ for ( ushort i = 0 ; i < _totalMessageCount ; i ++ )
1198
1145
{
1199
- _allMessagesSeenLatch . Set ( ) ;
1146
+ if ( i == _closeAtCount )
1147
+ {
1148
+ CloseConnection ( Conn ) ;
1149
+ }
1150
+ publishingModel . BasicPublish ( string . Empty , queueName , null , _messageBody ) ;
1200
1151
}
1201
1152
}
1202
1153
}
1203
1154
}
1204
1155
1205
- public class AckingBasicConsumer : TestBasicConsumer1
1156
+ public class AckingBasicConsumer : TestBasicConsumer
1206
1157
{
1207
- public AckingBasicConsumer ( IModel model , ManualResetEventSlim latch , Action fn )
1208
- : base ( model , latch , fn )
1158
+ public AckingBasicConsumer ( IModel model , ushort totalMessageCount , ManualResetEventSlim allMessagesSeenLatch )
1159
+ : base ( model , totalMessageCount , allMessagesSeenLatch )
1209
1160
{
1210
1161
}
1211
1162
@@ -1215,10 +1166,10 @@ public override void PostHandleDelivery(ulong deliveryTag)
1215
1166
}
1216
1167
}
1217
1168
1218
- public class NackingBasicConsumer : TestBasicConsumer1
1169
+ public class NackingBasicConsumer : TestBasicConsumer
1219
1170
{
1220
- public NackingBasicConsumer ( IModel model , ManualResetEventSlim latch , Action fn )
1221
- : base ( model , latch , fn )
1171
+ public NackingBasicConsumer ( IModel model , ushort totalMessageCount , ManualResetEventSlim allMessagesSeenLatch )
1172
+ : base ( model , totalMessageCount , allMessagesSeenLatch )
1222
1173
{
1223
1174
}
1224
1175
@@ -1228,10 +1179,10 @@ public override void PostHandleDelivery(ulong deliveryTag)
1228
1179
}
1229
1180
}
1230
1181
1231
- public class RejectingBasicConsumer : TestBasicConsumer1
1182
+ public class RejectingBasicConsumer : TestBasicConsumer
1232
1183
{
1233
- public RejectingBasicConsumer ( IModel model , ManualResetEventSlim latch , Action fn )
1234
- : base ( model , latch , fn )
1184
+ public RejectingBasicConsumer ( IModel model , ushort totalMessageCount , ManualResetEventSlim allMessagesSeenLatch )
1185
+ : base ( model , totalMessageCount , allMessagesSeenLatch )
1235
1186
{
1236
1187
}
1237
1188
@@ -1241,16 +1192,17 @@ public override void PostHandleDelivery(ulong deliveryTag)
1241
1192
}
1242
1193
}
1243
1194
1244
- public class TestBasicConsumer1 : DefaultBasicConsumer
1195
+ public class TestBasicConsumer : DefaultBasicConsumer
1245
1196
{
1246
- private readonly Action _action ;
1247
- private readonly ManualResetEventSlim _latch ;
1197
+ private readonly ManualResetEventSlim _allMessagesSeenLatch ;
1198
+ private readonly ushort _totalMessageCount ;
1199
+ private ushort _counter = 0 ;
1248
1200
1249
- public TestBasicConsumer1 ( IModel model , ManualResetEventSlim latch , Action fn )
1201
+ public TestBasicConsumer ( IModel model , ushort totalMessageCount , ManualResetEventSlim allMessagesSeenLatch )
1250
1202
: base ( model )
1251
1203
{
1252
- _latch = latch ;
1253
- _action = fn ;
1204
+ _totalMessageCount = totalMessageCount ;
1205
+ _allMessagesSeenLatch = allMessagesSeenLatch ;
1254
1206
}
1255
1207
1256
1208
public override void HandleBasicDeliver ( string consumerTag ,
@@ -1263,23 +1215,15 @@ public override void HandleBasicDeliver(string consumerTag,
1263
1215
{
1264
1216
try
1265
1217
{
1266
- if ( deliveryTag == 5 )
1267
- {
1268
- _action ( ) ;
1269
- }
1270
-
1271
- /*
1272
- * Note: wait for one more message to be delivered before
1273
- * setting this reset event
1274
- */
1275
- if ( deliveryTag == 6 )
1276
- {
1277
- _latch . Set ( ) ;
1278
- }
1218
+ PostHandleDelivery ( deliveryTag ) ;
1279
1219
}
1280
1220
finally
1281
1221
{
1282
- PostHandleDelivery ( deliveryTag ) ;
1222
+ ++ _counter ;
1223
+ if ( _counter >= _totalMessageCount )
1224
+ {
1225
+ _allMessagesSeenLatch . Set ( ) ;
1226
+ }
1283
1227
}
1284
1228
}
1285
1229
0 commit comments