Skip to content

Commit 0b673f0

Browse files
committed
Fix flaky test
Sometimes waiting 2 seconds is not long enough: https://ci.rabbitmq.com/teams/main/pipelines/dotnet/jobs/test/builds/240 refactor Fix WorkPool to execute all jobs in the Work queue in a loop iteration
1 parent 4577442 commit 0b673f0

File tree

3 files changed

+5
-5
lines changed

3 files changed

+5
-5
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ class WorkPool
4343
readonly ConcurrentQueue<Work> _workQueue;
4444
readonly CancellationTokenSource _tokenSource;
4545
readonly ModelBase _model;
46-
CancellationTokenRegistration _tokenRegistration;
46+
readonly CancellationTokenRegistration _tokenRegistration;
4747
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
48-
private Task _task;
4948

5049
public WorkPool(ModelBase model)
5150
{
@@ -57,7 +56,7 @@ public WorkPool(ModelBase model)
5756

5857
public void Start()
5958
{
60-
_task = Task.Run(Loop, CancellationToken.None);
59+
Task.Run(Loop, CancellationToken.None);
6160
}
6261

6362
public void Enqueue(Work work)
@@ -80,7 +79,7 @@ async Task Loop()
8079
// Swallowing the task cancellation in case we are stopping work.
8180
}
8281

83-
if (!_tokenSource.IsCancellationRequested && _workQueue.TryDequeue(out Work work))
82+
while (_tokenSource.IsCancellationRequested == false && _workQueue.TryDequeue(out Work work))
8483
{
8584
await work.Execute(_model).ConfigureAwait(false);
8685
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,9 @@ public void FinishClose()
336336
public void HandleCommand(ISession session, Command cmd)
337337
{
338338
if (!DispatchAsynchronous(cmd))// Was asynchronous. Already processed. No need to process further.
339+
{
339340
_continuationQueue.Next().HandleCommand(cmd);
341+
}
340342
}
341343

342344
public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] body)

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ public InboundFrame ReadFrame()
202202
return RabbitMQ.Client.Impl.InboundFrame.ReadFrom(_reader);
203203
}
204204

205-
private static readonly byte[] s_amqp = Encoding.ASCII.GetBytes("AMQP");
206205
public void SendHeader()
207206
{
208207
byte[] headerBytes = new byte[8];

0 commit comments

Comments
 (0)