Skip to content

Commit 8058751

Browse files
committed
small improvments
Signed-off-by: Neil South <[email protected]>
1 parent 9a0a705 commit 8058751

File tree

4 files changed

+26
-19
lines changed

4 files changed

+26
-19
lines changed

src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -129,30 +129,34 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
129129
try
130130
{
131131
_timer.Enabled = false;
132-
_logger.BucketActive(_payloads.Count);
132+
if (_payloads.Any())
133+
{
134+
_logger.BucketActive(_payloads.Count);
135+
}
133136
foreach (var key in _payloads.Keys)
134137
{
135138
_logger.BucketElapsedTime(key);
136139
var payload = await _payloads[key].Task.ConfigureAwait(false);
137140
using var loggerScope = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "CorrelationId", payload.CorrelationId } });
138-
if (payload.HasTimedOut)
141+
142+
if (payload.IsUploadCompleted())
143+
{
144+
if (_payloads.TryRemove(key, out _))
145+
{
146+
await QueueBucketForNotification(key, payload).ConfigureAwait(false);
147+
}
148+
else
149+
{
150+
_logger.BucketRemoveError(key);
151+
}
152+
}
153+
else if (payload.HasTimedOut)
139154
{
140155
if (payload.ContainerUploadFailures())
141156
{
142157
_payloads.TryRemove(key, out _);
143158
_logger.PayloadRemovedWithFailureUploads(key);
144159
}
145-
else if (payload.IsUploadCompleted())
146-
{
147-
if (_payloads.TryRemove(key, out _))
148-
{
149-
await QueueBucketForNotification(key, payload).ConfigureAwait(false);
150-
}
151-
else
152-
{
153-
_logger.BucketRemoveError(key);
154-
}
155-
}
156160
}
157161
}
158162
}

src/InformaticsGateway/Services/Storage/IObjectUploadQueue.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
using System.Threading;
19+
using System.Threading.Tasks;
1920
using Monai.Deploy.InformaticsGateway.Api.Storage;
2021

2122
namespace Monai.Deploy.InformaticsGateway.Services.Storage
@@ -36,6 +37,6 @@ internal interface IObjectUploadQueue
3637
/// The default implementation blocks the call until a file is available from the queue.
3738
/// </summary>
3839
/// <param name="cancellationToken">Propagates notification that operations should be canceled.</param>
39-
FileStorageMetadata Dequeue(CancellationToken cancellationToken);
40+
Task<FileStorageMetadata> Dequeue(CancellationToken cancellationToken);
4041
}
4142
}

src/InformaticsGateway/Services/Storage/ObjectUploadQueue.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
using System;
1818
using System.Collections.Concurrent;
19-
using System.Collections.Generic;
2019
using System.Diagnostics;
2120
using System.Threading;
21+
using System.Threading.Tasks;
2222
using Ardalis.GuardClauses;
2323
using Microsoft.Extensions.Logging;
2424
using Monai.Deploy.InformaticsGateway.Api.Storage;
@@ -47,7 +47,7 @@ public void Queue(FileStorageMetadata file)
4747
_logger.InstanceAddedToUploadQueue(file.Id, _workItems.Count, process.WorkingSet64 / 1024.0);
4848
}
4949

50-
public FileStorageMetadata Dequeue(CancellationToken cancellationToken)
50+
public async Task<FileStorageMetadata> Dequeue(CancellationToken cancellationToken)
5151
{
5252
while (!cancellationToken.IsCancellationRequested)
5353
{
@@ -56,6 +56,10 @@ public FileStorageMetadata Dequeue(CancellationToken cancellationToken)
5656
_logger.InstanceInUploadQueue(_workItems.Count);
5757
return reuslt;
5858
}
59+
if (_workItems.IsEmpty)
60+
{
61+
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
62+
}
5963
}
6064
throw new OperationCanceledException("Cancellation requested.");
6165
}

src/InformaticsGateway/Services/Storage/ObjectUploadService.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
using System.Diagnostics;
2121
using System.Threading;
2222
using System.Threading.Tasks;
23-
using System.Threading.Tasks.Dataflow;
2423
using Ardalis.GuardClauses;
2524
using Microsoft.Extensions.DependencyInjection;
2625
using Microsoft.Extensions.Hosting;
@@ -32,7 +31,6 @@
3231
using Monai.Deploy.InformaticsGateway.Common;
3332
using Monai.Deploy.InformaticsGateway.Configuration;
3433
using Monai.Deploy.InformaticsGateway.Logging;
35-
using Monai.Deploy.InformaticsGateway.Repositories;
3634
using Monai.Deploy.InformaticsGateway.Services.Common;
3735
using Monai.Deploy.Storage.API;
3836
using Polly;
@@ -106,7 +104,7 @@ private async Task StartWorker(int thread, CancellationToken cancellationToken)
106104
{
107105
try
108106
{
109-
var item = _uplaodQueue.Dequeue(cancellationToken);
107+
var item = await _uplaodQueue.Dequeue(cancellationToken);
110108
await ProcessObject(item);
111109
}
112110
catch (OperationCanceledException ex)

0 commit comments

Comments
 (0)