Skip to content

An attempt to optimize AsyncConsumerWorkService.WorkPool dispatch loop #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RabbitMQDotNetClient.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,6 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsParsFormattingSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsWrapperSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -51,19 +50,15 @@ public async Task Stop()

class WorkPool
{
readonly ConcurrentQueue<Work> workQueue;
readonly TimeSpan waitTime;
readonly BlockingCollection<Work> workQueue;
readonly CancellationTokenSource tokenSource;
readonly ModelBase model;
TaskCompletionSource<bool> messageArrived;
private Task task;

public WorkPool(ModelBase model)
{
this.model = model;
workQueue = new ConcurrentQueue<Work>();
messageArrived = new TaskCompletionSource<bool>();
waitTime = TimeSpan.FromMilliseconds(100);
workQueue = new BlockingCollection<Work>();
tokenSource = new CancellationTokenSource();
}

Expand All @@ -74,23 +69,14 @@ public void Start()

public void Enqueue(Work work)
{
workQueue.Enqueue(work);
messageArrived.TrySetResult(true);
workQueue.Add(work);
}

async Task Loop()
{
while (tokenSource.IsCancellationRequested == false)
foreach (var work in workQueue.GetConsumingEnumerable(tokenSource.Token))
{
Work work;
while (workQueue.TryDequeue(out work))
{
await work.Execute(model).ConfigureAwait(false);
}

await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
messageArrived.TrySetResult(true);
messageArrived = new TaskCompletionSource<bool>();
await work.Execute(model).ConfigureAwait(false);
}
}

Expand Down