Skip to content

Commit f424204

Browse files
committed
Enable concurrent uploads to the storage service (#90)
* gh-89 Enable concurrent uploads to the storage service Signed-off-by: Victor Chang <[email protected]>
1 parent 6c71467 commit f424204

File tree

6 files changed

+32
-10
lines changed

6 files changed

+32
-10
lines changed

src/Configuration/StorageConfiguration.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,16 @@ public StorageConfiguration() : this(new FileSystem())
5353
public RetryConfiguration Retries { get; set; } = new RetryConfiguration();
5454

5555
/// <summary>
56-
/// Gets or set number of threads used for payload upload. Default is 1;
56+
/// Gets or set number of payloads to be processed at a given time. Default is 1;
5757
/// </summary>
58-
public int Concurrentcy { get; set; } = 1;
58+
[ConfigurationKeyName("payloadProcessThreads")]
59+
public int PayloadProcessThreads { get; set; } = 1;
60+
61+
/// <summary>
62+
/// Gets or set the maximum number of concurrent uploads. Default is 2;
63+
/// </summary>
64+
[ConfigurationKeyName("concurrentUploads")]
65+
public int ConcurrentUploads { get; set; } = 2;
5966

6067
[JsonIgnore]
6168
public string TemporaryDataDirFullPath

src/InformaticsGateway/Logging/Log.700.PayloadService.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,8 @@ public static partial class Log
7070

7171
[LoggerMessage(EventId = 720, Level = LogLevel.Debug, Message = "Payload {id} deleted.")]
7272
public static partial void PayloadDeleted(this ILogger logger, Guid id);
73+
74+
[LoggerMessage(EventId = 721, Level = LogLevel.Debug, Message = "Upload statistics: {threads} threads, {seconds} seconds.")]
75+
public static partial void UploadStats(this ILogger logger, int threads, double seconds);
7376
}
7477
}

src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.IO;
78
using System.IO.Abstractions;
89
using System.Linq;
@@ -90,7 +91,7 @@ public Task StartAsync(CancellationToken cancellationToken)
9091
async (task) => await UploadPayloadActionBlock(task, cancellationToken).ConfigureAwait(false),
9192
new ExecutionDataflowBlockOptions
9293
{
93-
MaxDegreeOfParallelism = _options.Value.Storage.Concurrentcy,
94+
MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads,
9495
MaxMessagesPerTask = 1,
9596
CancellationToken = cancellationToken
9697
});
@@ -155,8 +156,10 @@ private void BackgroundProcessing(CancellationToken cancellationToken)
155156
private async Task UploadPayloadActionBlock(Payload payload, CancellationToken cancellationToken)
156157
{
157158
Guard.Against.Null(payload, nameof(payload));
159+
var stopwatch = new Stopwatch();
158160
try
159161
{
162+
stopwatch.Start();
160163
await Upload(payload, cancellationToken).ConfigureAwait(false);
161164

162165
if (payload.IsUploadComplete())
@@ -185,6 +188,11 @@ private async Task UploadPayloadActionBlock(Payload payload, CancellationToken c
185188
}
186189
}
187190
}
191+
finally
192+
{
193+
stopwatch.Stop();
194+
_logger.UploadStats(_options.Value.Storage.ConcurrentUploads, stopwatch.Elapsed.TotalSeconds);
195+
}
188196
}
189197

190198
private async Task Upload(Payload payload, CancellationToken cancellationToken)
@@ -193,10 +201,14 @@ private async Task Upload(Payload payload, CancellationToken cancellationToken)
193201

194202
_logger.UploadingPayloadToBucket(payload.Id, _options.Value.Storage.StorageServiceBucketName);
195203

196-
for (var index = payload.Files.Count - 1; index >= 0; index--)
204+
var options = new ParallelOptions
197205
{
198-
var file = payload.Files[index];
206+
CancellationToken = cancellationToken,
207+
MaxDegreeOfParallelism = _options.Value.Storage.ConcurrentUploads
208+
};
199209

210+
await Parallel.ForEachAsync(payload.Files, options, async (file, cancellationToke) =>
211+
{
200212
switch (file)
201213
{
202214
case DicomFileStorageInfo dicom:
@@ -206,10 +218,11 @@ private async Task Upload(Payload payload, CancellationToken cancellationToken)
206218
}
207219
break;
208220
}
221+
209222
await UploadPayloadFile(payload.Id, file.UploadFilePath, file.FilePath, file.Source, file.Workflows, file.ContentType, cancellationToken).ConfigureAwait(false);
210223
file.SetUploaded();
211224
_instanceCleanupQueue.Queue(file);
212-
}
225+
}).ConfigureAwait(false);
213226
}
214227

215228
private async Task UploadPayloadFile(Guid payloadId, string destinationPath, string sourcePath, string source, List<string> workflows, string contentType, CancellationToken cancellationToken)

src/InformaticsGateway/Services/DicomWeb/IStreamsWriter.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ private async Task SaveInstance(Stream stream, string studyInstanceUid, string w
141141
if (!string.IsNullOrWhiteSpace(studyInstanceUid) && !studyInstanceUid.Equals(uids.StudyInstanceUid, StringComparison.OrdinalIgnoreCase))
142142
{
143143
AddFailure(DicomStatus.StorageDataSetDoesNotMatchSOPClassWarning, uids);
144+
return;
144145
}
145146

146147
DicomStoragePaths storagePaths;

src/InformaticsGateway/Test/Monai.Deploy.InformaticsGateway.Test.csproj

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
2424
</PackageReference>
2525
<PackageReference Include="Microsoft.AspNetCore.Mvc.WebApiCompatShim" Version="2.2.0" />
26-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
27-
<PackageReference Include="NPOI" Version="2.5.6" />
28-
<PackageReference Include="System.IO.Abstractions.TestingHelpers" Version="16.1.25" />
2926
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="6.0.6" />
3027
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />
3128
<PackageReference Include="Moq" Version="4.18.1" />
29+
<PackageReference Include="NPOI" Version="2.5.6" />
3230
<PackageReference Include="System.IO.Abstractions.TestingHelpers" Version="17.0.18" />
3331
<PackageReference Include="xRetry" Version="1.8.0" />
3432
<PackageReference Include="xunit" Version="2.4.1" />

src/InformaticsGateway/Test/Services/DicomWeb/StreamsWriterTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public async Task Save_IgnoresInstancesWithWarning()
222222
Assert.Equal(DicomStatus.StorageDataSetDoesNotMatchSOPClassWarning.Code, warningReason);
223223
}
224224

225-
_payloadAssembler.Verify(p => p.Queue(It.Is<string>(p => p == correlationId), It.IsAny<DicomFileStorageInfo>(), It.IsAny<uint>()));
225+
_payloadAssembler.Verify(p => p.Queue(It.Is<string>(p => p == correlationId), It.IsAny<DicomFileStorageInfo>(), It.IsAny<uint>()), Times.Never());
226226
}
227227

228228
[Fact(DisplayName = "Save - queues instances with Payload Assembler")]

0 commit comments

Comments
 (0)