Skip to content

Using the TPL instead of dedicated threads. Results in a much more st… #699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 102 additions & 20 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
@@ -1,77 +1,156 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26114.2
VisualStudioVersion = 15.0.28307.1000
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "projects", "projects", "{068D7DC3-8E6E-4951-B9E3-272C641BF0DE}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "client", "client", "{ECCBAEE2-24C9-4C95-A88C-03B68E866F0F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client", "projects\client\RabbitMQ.Client\RabbitMQ.Client.csproj", "{8C554257-5ECC-45DB-873D-560BFBB74EC8}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client", "projects\client\RabbitMQ.Client\RabbitMQ.Client.csproj", "{8C554257-5ECC-45DB-873D-560BFBB74EC8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Unit", "projects\client\Unit\Unit.csproj", "{B8FAC024-CC03-4067-9FFC-02846FB8AE48}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Unit", "projects\client\Unit\Unit.csproj", "{B8FAC024-CC03-4067-9FFC-02846FB8AE48}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ApigenBootstrap", "projects\client\ApigenBootstrap\ApigenBootstrap.csproj", "{9534956B-60D5-49BA-8179-1382D4E64349}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ApigenBootstrap", "projects\client\ApigenBootstrap\ApigenBootstrap.csproj", "{9534956B-60D5-49BA-8179-1382D4E64349}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apigen", "projects\client\Apigen\Apigen.csproj", "{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apigen", "projects\client\Apigen\Apigen.csproj", "{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Unit.WinRT", "projects\client\Unit.WinRT\Unit.WinRT.csproj", "{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Unit.WinRT", "projects\client\Unit.WinRT\Unit.WinRT.csproj", "{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
DebugNoTest|Any CPU = DebugNoTest|Any CPU
Release|Any CPU = Release|Any CPU
SignedRelease|Any CPU = SignedRelease|Any CPU
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
DebugNoTest|Any CPU = DebugNoTest|Any CPU
DebugNoTest|x64 = DebugNoTest|x64
DebugNoTest|x86 = DebugNoTest|x86
Release|Any CPU = Release|Any CPU
Release|x64 = Release|x64
Release|x86 = Release|x86
SignedRelease|Any CPU = SignedRelease|Any CPU
SignedRelease|x64 = SignedRelease|x64
SignedRelease|x86 = SignedRelease|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Debug|x64.ActiveCfg = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Debug|x64.Build.0 = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Debug|x86.ActiveCfg = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Debug|x86.Build.0 = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.DebugNoTest|x64.ActiveCfg = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.DebugNoTest|x64.Build.0 = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.DebugNoTest|x86.ActiveCfg = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.DebugNoTest|x86.Build.0 = Debug|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Release|Any CPU.Build.0 = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|Any CPU.ActiveCfg = SignedRelease|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|Any CPU.Build.0 = SignedRelease|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Release|x64.ActiveCfg = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Release|x64.Build.0 = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Release|x86.ActiveCfg = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.Release|x86.Build.0 = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|Any CPU.ActiveCfg = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|Any CPU.Build.0 = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|x64.ActiveCfg = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|x64.Build.0 = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{8C554257-5ECC-45DB-873D-560BFBB74EC8}.SignedRelease|x86.Build.0 = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Debug|x64.ActiveCfg = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Debug|x64.Build.0 = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Debug|x86.ActiveCfg = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Debug|x86.Build.0 = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.DebugNoTest|x64.ActiveCfg = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.DebugNoTest|x64.Build.0 = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.DebugNoTest|x86.ActiveCfg = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.DebugNoTest|x86.Build.0 = Debug|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Release|Any CPU.Build.0 = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Release|x64.ActiveCfg = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Release|x64.Build.0 = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Release|x86.ActiveCfg = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.Release|x86.Build.0 = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.SignedRelease|Any CPU.ActiveCfg = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.SignedRelease|Any CPU.Build.0 = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.SignedRelease|x64.ActiveCfg = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.SignedRelease|x64.Build.0 = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{B8FAC024-CC03-4067-9FFC-02846FB8AE48}.SignedRelease|x86.Build.0 = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Debug|x64.ActiveCfg = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Debug|x64.Build.0 = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Debug|x86.ActiveCfg = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Debug|x86.Build.0 = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.DebugNoTest|x64.ActiveCfg = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.DebugNoTest|x64.Build.0 = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.DebugNoTest|x86.ActiveCfg = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.DebugNoTest|x86.Build.0 = Debug|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Release|Any CPU.Build.0 = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Release|x64.ActiveCfg = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Release|x64.Build.0 = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Release|x86.ActiveCfg = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.Release|x86.Build.0 = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.SignedRelease|Any CPU.ActiveCfg = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.SignedRelease|Any CPU.Build.0 = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.SignedRelease|x64.ActiveCfg = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.SignedRelease|x64.Build.0 = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{9534956B-60D5-49BA-8179-1382D4E64349}.SignedRelease|x86.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Debug|x64.ActiveCfg = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Debug|x64.Build.0 = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Debug|x86.ActiveCfg = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Debug|x86.Build.0 = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.DebugNoTest|x64.ActiveCfg = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.DebugNoTest|x64.Build.0 = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.DebugNoTest|x86.ActiveCfg = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.DebugNoTest|x86.Build.0 = Debug|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Release|Any CPU.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Release|x64.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Release|x64.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Release|x86.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.Release|x86.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|Any CPU.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|Any CPU.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x64.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x64.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x86.Build.0 = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x64.ActiveCfg = Debug|x64
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x64.Build.0 = Debug|x64
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x86.ActiveCfg = Debug|x86
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x86.Build.0 = Debug|x86
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x64.ActiveCfg = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x64.Build.0 = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x86.ActiveCfg = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Debug|x86.Build.0 = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.DebugNoTest|x64.ActiveCfg = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.DebugNoTest|x64.Build.0 = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.DebugNoTest|x86.ActiveCfg = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.DebugNoTest|x86.Build.0 = Debug|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|Any CPU.Build.0 = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x64.ActiveCfg = Release|x64
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x64.Build.0 = Release|x64
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x86.ActiveCfg = Release|x86
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x86.Build.0 = Release|x86
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x64.ActiveCfg = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x64.Build.0 = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x86.ActiveCfg = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.Release|x86.Build.0 = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.SignedRelease|Any CPU.ActiveCfg = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.SignedRelease|Any CPU.Build.0 = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.SignedRelease|x64.ActiveCfg = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.SignedRelease|x64.Build.0 = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0}.SignedRelease|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -84,4 +163,7 @@ Global
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2} = {ECCBAEE2-24C9-4C95-A88C-03B68E866F0F}
{D59E9DCD-EED6-4DFC-9FC7-11D56FF021F0} = {ECCBAEE2-24C9-4C95-A88C-03B68E866F0F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {1B4F036E-FDD6-410D-9E33-DFE38BC990E5}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ public void Quiesce()

public void Shutdown()
{
// necessary evil
this.workService.Stop().GetAwaiter().GetResult();
workService.Stop();
}

public void Shutdown(IModel model)
{
// necessary evil
this.workService.Stop(model).GetAwaiter().GetResult();
workService.Stop(model);
}

public bool IsShutdown
Expand Down Expand Up @@ -66,10 +64,10 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
{
// the only case where we ignore the shutdown flag.
new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult();
new ModelShutdown(consumer, reason).Execute(model).GetAwaiter().GetResult();
}

private void ScheduleUnlessShuttingDown<TWork>(TWork work)
private void ScheduleUnlessShuttingDown<TWork>(TWork work)
where TWork : Work
{
if (!this.IsShutdown)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,69 +1,55 @@
using System;
using RabbitMQ.Client.Impl;

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
internal class AsyncConsumerWorkService : ConsumerWorkService
{
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();

public void Schedule<TWork>(ModelBase model, TWork work)
where TWork : Work
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
{
// two step approach is taken, as TryGetValue does not aquire locks
// if this fails, GetOrAdd is called, which takes a lock

WorkPool workPool;
if (workPools.TryGetValue(model, out workPool) == false)
{
var newWorkPool = new WorkPool(model);
workPool = workPools.GetOrAdd(model, newWorkPool);

// start if it's only the workpool that has been just created
if (newWorkPool == workPool)
{
newWorkPool.Start();
}
}
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
}

workPool.Enqueue(work);
private WorkPool StartNewWorkPool(IModel model)
{
var newWorkPool = new WorkPool(model as ModelBase);
newWorkPool.Start();
return newWorkPool;
}

public async Task Stop(IModel model)
public void Stop(IModel model)
{
WorkPool workPool;
if (workPools.TryRemove(model, out workPool))
if (workPools.TryRemove(model, out WorkPool workPool))
{
await workPool.Stop().ConfigureAwait(false);
workPool.Stop();
}
}

public async Task Stop()
public void Stop()
{
foreach (var model in workPools.Keys)
foreach (IModel model in workPools.Keys)
{
await Stop(model).ConfigureAwait(false);
Stop(model);
}
}

class WorkPool
{
readonly ConcurrentQueue<Work> workQueue;
readonly TimeSpan waitTime;
readonly CancellationTokenSource tokenSource;
readonly ModelBase model;
TaskCompletionSource<bool> messageArrived;
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
private Task task;

public WorkPool(ModelBase model)
{
this.model = model;
workQueue = new ConcurrentQueue<Work>();
messageArrived = new TaskCompletionSource<bool>();
waitTime = TimeSpan.FromMilliseconds(100);
tokenSource = new CancellationTokenSource();
}

Expand All @@ -75,30 +61,33 @@ public void Start()
public void Enqueue(Work work)
{
workQueue.Enqueue(work);
messageArrived.TrySetResult(true);
semaphore.Release();
}

async Task Loop()
{
while (tokenSource.IsCancellationRequested == false)
{
Work work;
while (workQueue.TryDequeue(out work))
try
{
await work.Execute(model).ConfigureAwait(false);
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Swallowing the task cancellation in case we are stopping work.
}

await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
messageArrived.TrySetResult(true);
messageArrived = new TaskCompletionSource<bool>();
if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work))
{
await work.Execute(model).ConfigureAwait(false);
}
}
}

public Task Stop()
public void Stop()
{
tokenSource.Cancel();
return task;
}
}
}
}
}
Loading