Skip to content

Commit 677b359

Browse files
committed
* Use separate publish and consume connections.
1 parent 36e1531 commit 677b359

File tree

1 file changed

+49
-39
lines changed

1 file changed

+49
-39
lines changed

projects/Test/Integration/TestFloodPublishing.cs

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -163,29 +163,34 @@ public async Task TestMultithreadFloodPublishing()
163163
Task pub = Task.Run(async () =>
164164
{
165165
bool stop = false;
166-
using (IChannel pubCh = await _conn.CreateChannelAsync())
166+
using (IConnection publishConnection = await _connFactory.CreateConnectionAsync())
167167
{
168-
await pubCh.ConfirmSelectAsync();
169-
170-
pubCh.ChannelShutdown += (o, ea) =>
168+
using (IChannel publishChannel = await publishConnection.CreateChannelAsync())
171169
{
172-
HandleChannelShutdown(pubCh, ea, (args) =>
170+
await publishChannel.ConfirmSelectAsync();
171+
172+
publishChannel.ChannelShutdown += (o, ea) =>
173173
{
174-
if (args.Initiator == ShutdownInitiator.Peer)
174+
HandleChannelShutdown(publishChannel, ea, (args) =>
175175
{
176-
stop = true;
177-
tcs.TrySetResult(false);
178-
}
179-
});
180-
};
176+
if (args.Initiator == ShutdownInitiator.Peer)
177+
{
178+
stop = true;
179+
tcs.TrySetResult(false);
180+
}
181+
});
182+
};
183+
184+
for (int i = 0; i < publishCount && false == stop; i++)
185+
{
186+
await publishChannel.BasicPublishAsync(string.Empty, queueName, sendBody, true);
187+
}
181188

182-
for (int i = 0; i < publishCount && false == stop; i++)
183-
{
184-
await pubCh.BasicPublishAsync(string.Empty, queueName, sendBody, true);
189+
await publishChannel.WaitForConfirmsOrDieAsync();
190+
await publishChannel.CloseAsync();
185191
}
186192

187-
await pubCh.WaitForConfirmsOrDieAsync();
188-
await pubCh.CloseAsync();
193+
await publishConnection.CloseAsync();
189194
}
190195
});
191196

@@ -197,37 +202,42 @@ public async Task TestMultithreadFloodPublishing()
197202

198203
try
199204
{
200-
using (IChannel consumeCh = await _conn.CreateChannelAsync())
205+
using (IConnection consumeConnection = await _connFactory.CreateConnectionAsync())
201206
{
202-
consumeCh.ChannelShutdown += (o, ea) =>
207+
using (IChannel consumeChannel = await consumeConnection.CreateChannelAsync())
203208
{
204-
HandleChannelShutdown(consumeCh, ea, (args) =>
209+
consumeChannel.ChannelShutdown += (o, ea) =>
205210
{
206-
if (args.Initiator == ShutdownInitiator.Peer)
211+
HandleChannelShutdown(consumeChannel, ea, (args) =>
207212
{
208-
tcs.TrySetResult(false);
213+
if (args.Initiator == ShutdownInitiator.Peer)
214+
{
215+
tcs.TrySetResult(false);
216+
}
217+
});
218+
};
219+
220+
var consumer = new AsyncEventingBasicConsumer(consumeChannel);
221+
consumer.Received += async (o, a) =>
222+
{
223+
string receivedMessage = _encoding.GetString(a.Body.ToArray());
224+
Assert.Equal(message, receivedMessage);
225+
if (Interlocked.Increment(ref receivedCount) == publishCount)
226+
{
227+
tcs.SetResult(true);
209228
}
210-
});
211-
};
229+
await Task.Yield();
230+
};
212231

213-
var consumer = new AsyncEventingBasicConsumer(consumeCh);
214-
consumer.Received += async (o, a) =>
215-
{
216-
string receivedMessage = _encoding.GetString(a.Body.ToArray());
217-
Assert.Equal(message, receivedMessage);
218-
if (Interlocked.Increment(ref receivedCount) == publishCount)
219-
{
220-
tcs.SetResult(true);
221-
}
222-
await Task.Yield();
223-
};
232+
await consumeChannel.BasicConsumeAsync(queue: queueName, autoAck: true,
233+
consumerTag: string.Empty, noLocal: false, exclusive: false,
234+
arguments: null, consumer: consumer);
224235

225-
await consumeCh.BasicConsumeAsync(queue: queueName, autoAck: true,
226-
consumerTag: string.Empty, noLocal: false, exclusive: false,
227-
arguments: null, consumer: consumer);
236+
Assert.True(await tcs.Task);
237+
await consumeChannel.CloseAsync();
238+
}
228239

229-
Assert.True(await tcs.Task);
230-
await consumeCh.CloseAsync();
240+
await consumeConnection.CloseAsync();
231241
}
232242

233243
await pub;

0 commit comments

Comments
 (0)