Skip to content

Commit b5d507f

Browse files
Stefán J. Sigurðarsonlukebakken
authored andcommitted
Code cleanups. Removing unneeded dependency.
1 parent ddc6b17 commit b5d507f

File tree

3 files changed

+21
-134
lines changed

3 files changed

+21
-134
lines changed

projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@
5050
</PropertyGroup>
5151

5252
<ItemGroup Condition="$('TargetFramework') == 'net462'">
53-
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
54-
</ItemGroup>
55-
5653
<ItemGroup Condition=" '$(TargetFramework)' == 'net452' ">
5754
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
5855
</ItemGroup>
Lines changed: 10 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
11
using RabbitMQ.Client.Impl;
22

3-
using System;
43
using System.Collections.Concurrent;
54
using System.Threading;
6-
#if CORECLR
7-
using System.Threading.Channels;
8-
#endif
95
using System.Threading.Tasks;
106

117
namespace RabbitMQ.Client
128
{
139
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1410
{
15-
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
11+
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
1612

17-
public void Schedule<TWork>(ModelBase model, TWork work)
18-
where TWork : Work
13+
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
1914
{
2015
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
2116
}
@@ -29,78 +24,32 @@ private WorkPool StartNewWorkPool(IModel model)
2924

3025
public void Stop(IModel model)
3126
{
32-
WorkPool workPool;
33-
if (workPools.TryRemove(model, out workPool))
27+
if (workPools.TryRemove(model, out WorkPool workPool))
3428
{
3529
workPool.Stop();
3630
}
3731
}
3832

3933
public void Stop()
4034
{
41-
foreach (var model in workPools.Keys)
35+
foreach (IModel model in workPools.Keys)
4236
{
4337
Stop(model);
4438
}
4539
}
4640

47-
#if CORECLR
48-
class WorkPool
49-
{
50-
readonly Channel<Work> actionChannel = Channel.CreateUnbounded<Work>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
51-
readonly ModelBase model;
52-
Task worker;
53-
54-
public WorkPool(ModelBase model)
55-
{
56-
this.model = model;
57-
}
58-
59-
public void Start()
60-
{
61-
worker = Task.Run(Loop);
62-
}
63-
64-
public void Enqueue(Work action)
65-
{
66-
actionChannel.Writer.TryWrite(action);
67-
}
68-
69-
async Task Loop()
70-
{
71-
while (await actionChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
72-
{
73-
while (actionChannel.Reader.TryRead(out Work action))
74-
{
75-
await action.Execute(model).ConfigureAwait(false);
76-
}
77-
}
78-
}
79-
80-
public void Stop()
81-
{
82-
actionChannel.Writer.Complete();
83-
worker.Wait();
84-
}
85-
}
86-
}
87-
}
88-
#else
8941
class WorkPool
9042
{
9143
readonly ConcurrentQueue<Work> workQueue;
92-
readonly TimeSpan waitTime;
9344
readonly CancellationTokenSource tokenSource;
9445
readonly ModelBase model;
95-
TaskCompletionSource<bool> messageArrived;
46+
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
9647
private Task task;
9748

9849
public WorkPool(ModelBase model)
9950
{
10051
this.model = model;
10152
workQueue = new ConcurrentQueue<Work>();
102-
messageArrived = new TaskCompletionSource<bool>();
103-
waitTime = TimeSpan.FromMilliseconds(100);
10453
tokenSource = new CancellationTokenSource();
10554
}
10655

@@ -112,30 +61,26 @@ public void Start()
11261
public void Enqueue(Work work)
11362
{
11463
workQueue.Enqueue(work);
115-
messageArrived.TrySetResult(true);
64+
semaphore.Release();
11665
}
11766

11867
async Task Loop()
11968
{
12069
while (tokenSource.IsCancellationRequested == false)
12170
{
122-
while (workQueue.TryDequeue(out Work work))
71+
await semaphore.WaitAsync().ConfigureAwait(false);
72+
if (workQueue.TryDequeue(out Work work))
12373
{
12474
await work.Execute(model).ConfigureAwait(false);
12575
}
126-
127-
await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
128-
messageArrived.TrySetResult(true);
129-
messageArrived = new TaskCompletionSource<bool>();
13076
}
13177
}
13278

133-
public Task Stop()
79+
public void Stop()
13480
{
13581
tokenSource.Cancel();
136-
return task;
82+
task.Wait();
13783
}
13884
}
13985
}
14086
}
141-
#endif
Lines changed: 11 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Threading;
4-
#if CORECLR
5-
using System.Threading.Channels;
6-
#endif
74
using System.Threading.Tasks;
85

96
namespace RabbitMQ.Client
107
{
118
public class ConsumerWorkService
129
{
13-
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
10+
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
1411

1512
public void AddWork(IModel model, Action fn)
1613
{
17-
// two step approach is taken, as TryGetValue does not aquire locks
18-
// if this fails, GetOrAdd is called, which takes a lock
1914
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(fn);
2015
}
2116

@@ -28,80 +23,30 @@ private WorkPool StartNewWorkPool(IModel model)
2823

2924
public void StopWork(IModel model)
3025
{
31-
WorkPool workPool;
32-
if (workPools.TryRemove(model, out workPool))
26+
if (workPools.TryRemove(model, out WorkPool workPool))
3327
{
3428
workPool.Stop();
3529
}
3630
}
3731

3832
public void StopWork()
3933
{
40-
foreach (var model in workPools.Keys)
34+
foreach (IModel model in workPools.Keys)
4135
{
4236
StopWork(model);
4337
}
4438
}
4539

46-
#if CORECLR
47-
class WorkPool
48-
{
49-
readonly Channel<Action> actionChannel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
50-
Task worker;
51-
52-
public WorkPool(IModel model)
53-
{
54-
}
55-
56-
public void Start()
57-
{
58-
worker = Task.Run(Loop);
59-
}
60-
61-
public void Enqueue(Action action)
62-
{
63-
actionChannel.Writer.TryWrite(action);
64-
}
65-
66-
async Task Loop()
67-
{
68-
while (await actionChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
69-
{
70-
while (actionChannel.Reader.TryRead(out Action action))
71-
{
72-
try
73-
{
74-
action();
75-
}
76-
catch (Exception)
77-
{
78-
}
79-
}
80-
}
81-
}
82-
83-
public void Stop()
84-
{
85-
actionChannel.Writer.Complete();
86-
worker.Wait();
87-
}
88-
}
89-
}
90-
}
91-
#else
9240
class WorkPool
9341
{
9442
readonly ConcurrentQueue<Action> actions;
95-
readonly AutoResetEvent messageArrived;
96-
readonly TimeSpan waitTime;
43+
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
9744
readonly CancellationTokenSource tokenSource;
98-
Task worker;
45+
private Task worker;
9946

10047
public WorkPool(IModel model)
10148
{
10249
actions = new ConcurrentQueue<Action>();
103-
messageArrived = new AutoResetEvent(false);
104-
waitTime = TimeSpan.FromMilliseconds(100);
10550
tokenSource = new CancellationTokenSource();
10651
}
10752

@@ -113,14 +58,16 @@ public void Start()
11358
public void Enqueue(Action action)
11459
{
11560
actions.Enqueue(action);
116-
messageArrived.Set();
61+
semaphore.Release();
11762
}
11863

119-
void Loop()
64+
async Task Loop()
12065
{
12166
while (tokenSource.IsCancellationRequested == false)
12267
{
123-
while (actions.TryDequeue(out Action action))
68+
await semaphore.WaitAsync().ConfigureAwait(false);
69+
70+
if (actions.TryDequeue(out Action action))
12471
{
12572
try
12673
{
@@ -130,16 +77,14 @@ void Loop()
13077
{
13178
}
13279
}
133-
134-
messageArrived.WaitOne(waitTime);
13580
}
13681
}
13782

13883
public void Stop()
13984
{
14085
tokenSource.Cancel();
86+
worker.Wait();
14187
}
14288
}
14389
}
14490
}
145-
#endif

0 commit comments

Comments
 (0)