Skip to content

Commit 38bd629

Browse files
committed
Fix #1140 by assigning the new Model to _innerChannel at the correct place.
1 parent 389819a commit 38bd629

File tree

3 files changed

+19
-9
lines changed

3 files changed

+19
-9
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,13 @@ private bool TryPerformAutomaticRecovery()
141141
// 2. Recover queues
142142
// 3. Recover bindings
143143
// 4. Recover consumers
144-
using var recoveryChannel = _innerConnection.CreateModel();
145-
RecoverExchanges(recoveryChannel);
146-
RecoverQueues(recoveryChannel);
147-
RecoverBindings(recoveryChannel);
144+
using (var recoveryChannel = _innerConnection.CreateModel())
145+
{
146+
RecoverExchanges(recoveryChannel);
147+
RecoverQueues(recoveryChannel);
148+
RecoverBindings(recoveryChannel);
149+
}
150+
148151
}
149152
RecoverModelsAndItsConsumers();
150153
}

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,19 @@ internal void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverCo
166166
newChannel.TxSelect();
167167
}
168168

169+
/*
170+
* https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1140
171+
* If this assignment is not done before recovering consumers, there is a good
172+
* chance that an invalid Model will be used to handle a basic.deliver frame,
173+
* with the resulting basic.ack never getting sent out.
174+
*/
175+
_innerChannel = newChannel;
176+
169177
if (recoverConsumers)
170178
{
171179
_connection.RecoverConsumers(this, newChannel);
172180
}
173181

174-
_innerChannel = newChannel;
175182
_innerChannel.RunRecoveryEventHandlers(this);
176183
}
177184

projects/Unit/TestConnectionRecovery.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public override void Dispose()
9090

9191
}
9292

93-
[Fact(Skip="TODO flaky")]
93+
[Fact]
9494
public void TestBasicAckAfterChannelRecovery()
9595
{
9696
var allMessagesSeenLatch = new ManualResetEventSlim(false);
@@ -112,7 +112,7 @@ public void TestBasicAckAfterChannelRecovery()
112112
Wait(allMessagesSeenLatch);
113113
}
114114

115-
[Fact(Skip="TODO flaky")]
115+
[Fact]
116116
public void TestBasicNackAfterChannelRecovery()
117117
{
118118
var allMessagesSeenLatch = new ManualResetEventSlim(false);
@@ -134,7 +134,7 @@ public void TestBasicNackAfterChannelRecovery()
134134
Wait(allMessagesSeenLatch);
135135
}
136136

137-
[Fact(Skip="TODO flaky")]
137+
[Fact]
138138
public void TestBasicRejectAfterChannelRecovery()
139139
{
140140
var allMessagesSeenLatch = new ManualResetEventSlim(false);
@@ -858,7 +858,7 @@ public void TestPublishRpcRightAfterReconnect()
858858
{
859859

860860
CloseAndWaitForRecovery();
861-
Thread.Sleep(100);
861+
Thread.Sleep(500);
862862
}
863863
finally
864864
{

0 commit comments

Comments
 (0)