Skip to content

gh-347 Remove incomplete payloads on timeout #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/Api/MonaiApplicationEntity.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
* Copyright 2019-2021 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -15,7 +15,6 @@
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
Expand Down
2 changes: 1 addition & 1 deletion src/Api/Storage/Payload.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
5 changes: 3 additions & 2 deletions src/Configuration/DicomWebConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,7 +55,8 @@ public class DicomWebConfiguration
/// single POST request, therefore, the timeout value may be insignificant unless the load of the
/// network affects the upload speed.
/// </summary>
public uint Timeout { get; set; } = 2;
[ConfigurationKeyName("timeout")]
public uint Timeout { get; set; } = 10;

public DicomWebConfiguration()
{
Expand Down
4 changes: 2 additions & 2 deletions src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static partial class Log
[LoggerMessage(EventId = 3012, Level = LogLevel.Information, Message = "Bucket {key} created with timeout {timeout}s.")]
public static partial void BucketCreated(this ILogger logger, string key, uint timeout);

[LoggerMessage(EventId = 3014, Level = LogLevel.Error, Message = "Payload deleted due to upload failure(s) {key}.")]
public static partial void PayloadRemovedWithFailureUploads(this ILogger logger, string key);
[LoggerMessage(EventId = 3014, Level = LogLevel.Error, Message = "Payload ({key}) with {totalNumberOfFiles} files deleted due to {failures} upload failure(s).")]
public static partial void PayloadRemovedWithFailureUploads(this ILogger logger, string key, int totalNumberOfFiles, int failures);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Ardalis.GuardClauses;
Expand Down Expand Up @@ -149,10 +150,10 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
_logger.BucketRemoveError(key);
}
}
else if (payload.AnyUploadFailures())
else if (payload.IsUploadCompletedWithFailures())
{
_payloads.TryRemove(key, out _);
_logger.PayloadRemovedWithFailureUploads(key);
_logger.PayloadRemovedWithFailureUploads(key, payload.Count, payload.Files.Count(p => p.IsUploadFailed));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,9 +28,9 @@ public static bool IsUploadCompleted(this Payload payload)
return payload.Files.All(p => p.IsUploaded);
}

public static bool AnyUploadFailures(this Payload payload)
public static bool IsUploadCompletedWithFailures(this Payload payload)
{
return payload.Files.Any(p => p.IsUploadFailed);
return payload.Files.Count(p => p.IsUploadFailed) + payload.Files.Count(p => p.IsUploaded) == payload.Count; ;
}

public static bool IsMoveCompleted(this Payload payload)
Expand Down
3 changes: 1 addition & 2 deletions src/InformaticsGateway/Services/Http/InferenceController.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
* Copyright 2019-2021 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -90,7 +90,6 @@ public async Task<ActionResult> NewInferenceRequest([FromBody] InferenceRequest
using var _ = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "TransactionId", request.TransactionId } });
try
{

if (await _inferenceRequestRepository.ExistsAsync(request.TransactionId, HttpContext.RequestAborted).ConfigureAwait(false))
{
return Problem(title: "Conflict", statusCode: (int)HttpStatusCode.Conflict, detail: "An existing request with same transaction ID already exists.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
* Copyright 2019-2021 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -103,7 +103,7 @@ private async Task StartWorker(int thread, CancellationToken cancellationToken)
{
try
{
var item = await _uplaodQueue.Dequeue(cancellationToken);
var item = await _uplaodQueue.Dequeue(cancellationToken).ConfigureAwait(false);
await ProcessObject(item).ConfigureAwait(false);
}
catch (OperationCanceledException ex)
Expand Down Expand Up @@ -164,6 +164,7 @@ private async Task ProcessObject(FileStorageMetadata blob)
}
catch (Exception ex)
{
blob.SetFailed();
_logger.FailedToUploadFile(blob.Id, ex);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,23 @@ public async Task GivenAPayloadAssembler_WhenDisposed_ExpectResourceToBeCleanedU
}

[RetryFact(10, 200)]
public async Task GivenAPayloadThatHasNotCompleteUploads_WhenProcessedByTimedEvent_ExpectToBeAddedToQueue()
public async Task GivenAPayloadThatHasNotCompleteUploads_WhenProcessedByTimedEvent_ExpectToBeRemovedFromQueue()
{
var payloadAssembler = new PayloadAssembler(_options, _logger.Object, _serviceScopeFactory.Object);

var file = new TestStorageInfo(Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "file1", ".txt");
file.File.SetUploaded("bucket");
var file1 = new TestStorageInfo(Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "file1", ".txt");
var file2 = new TestStorageInfo(Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "file1", ".txt");

await payloadAssembler.Queue("A", file, 1);
await payloadAssembler.Queue("A", file1, 1);
await payloadAssembler.Queue("A", file2, 1);

file1.SetFailed();
file2.SetUploaded();
await Task.Delay(1001);
payloadAssembler.Dispose();

_repository.Verify(p => p.UpdateAsync(It.Is<Payload>(p => p.State == Payload.PayloadState.Move), It.IsAny<CancellationToken>()), Times.Once());
_repository.Verify(p => p.UpdateAsync(It.Is<Payload>(p => p.State == Payload.PayloadState.Move), It.IsAny<CancellationToken>()), Times.Never());
_logger.VerifyLoggingMessageBeginsWith($"Payload (A) with 2 files deleted due to 1 upload failure(s).", LogLevel.Error, Times.Once());
}

[RetryFact(10, 200)]
Expand Down
2 changes: 1 addition & 1 deletion src/InformaticsGateway/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,4 @@
"InformaticsGatewayServerEndpoint": "http://localhost:5000",
"DockerImagePrefix": "ghcr.io/project-monai/monai-deploy-informatics-gateway"
}
}
}
6 changes: 6 additions & 0 deletions src/Shared/Test/TestStorageInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.InformaticsGateway.Api.Storage;
using Monai.Deploy.Messaging;

namespace Monai.Deploy.InformaticsGateway.SharedTest;

Expand All @@ -33,4 +34,9 @@ public TestStorageInfo(string correlationsId, string identifier, string filePath
public override string DataTypeDirectoryName => "dir";

public override StorageObjectMetadata File { get; set; }

public void SetUploaded()
{
File.SetUploaded("test");
}
}
7 changes: 6 additions & 1 deletion tests/Integration.Test/Common/Hl7DataSink.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@
* limitations under the License.
*/

using System.Diagnostics;
using System.Net.Sockets;
using System.Text;
using Ardalis.GuardClauses;
Expand Down Expand Up @@ -95,6 +96,8 @@ private async Task SendOneAsync(DataProvider dataProvider, params object[] args)

private async Task SendBatchAsync(DataProvider dataProvider, params object[] args)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var messages = new List<byte>();
foreach (var file in dataProvider.HL7Specs.Files.Keys)
{
Expand Down Expand Up @@ -134,6 +137,8 @@ private async Task SendBatchAsync(DataProvider dataProvider, params object[] arg
}
} while (true);
tcpClient.Close();
stopwatch.Stop();
_outputHelper.WriteLine($"Took {stopwatch.Elapsed.TotalSeconds}s to send {messages.Count} messages.");
}
}
}
34 changes: 17 additions & 17 deletions tests/Integration.Test/Features/DicomDimseScp.feature
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 MONAI Consortium
# Copyright 2022-2023 MONAI Consortium
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,47 +37,47 @@ Feature: DICOM DIMSE SCP Services

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 3 seconds
Given a called AE Title named '<aet>' that groups by '0020,000D' for <timeout> seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <count> <modality> studies
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET '<aet>' from 'TEST-RUNNER'
Then a successful response should be received
And <count> workflow requests sent to message broker
And studies are uploaded to storage service

Examples:
| modality | count |
| MR | 1 |
| CT | 1 |
| MG | 2 |
| US | 1 |
| modality | count | aet | timeout |
| MR | 1 | C-STORE-STUDY30 | 3 |
| CT | 1 | C-STORE-STUDY30 | 3 |
| MG | 2 | C-STORE-STUDY10 | 3 |
| US | 1 | C-STORE-STUDY10 | 3 |

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Series Instance UID
Given a called AE Title named 'C-STORE-SERIES' that groups by '0020,000E' for 3 seconds
Given a called AE Title named '<aet>' that groups by '0020,000E' for <timeout> seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <study_count> <modality> studies with <series_count> series per study
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER'
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET '<aet>' from 'TEST-RUNNER'
Then a successful response should be received
And <series_count> workflow requests sent to message broker
And studies are uploaded to storage service

Examples:
| modality | study_count | series_count |
| MR | 1 | 2 |
| CT | 1 | 2 |
| MG | 1 | 3 |
| US | 1 | 2 |
| modality | study_count | series_count | aet | timeout |
| MR | 1 | 2 | C-STORE-SER30 | 3 |
| CT | 1 | 2 | C-STORE-SER30 | 3 |
| MG | 1 | 3 | C-STORE-SER10 | 3 |
| US | 1 | 2 | C-STORE-SER10 | 3 |

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID over multiple associations
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 5 seconds
Given a called AE Title named 'C-STORE-MA' that groups by '0020,000D' for 5 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over <series_count> associations and wait <seconds> between each association
And <study_count> <modality> studies with <series_count> series per study
When C-STORE-RQ are sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
When C-STORE-RQ are sent to 'Informatics Gateway' with AET 'C-STORE-MA' from 'TEST-RUNNER'
Then a successful response should be received
And <workflow_requests> workflow requests sent to message broker
And studies are uploaded to storage service
Expand Down
6 changes: 3 additions & 3 deletions tests/Integration.Test/Features/DicomWebStow.feature
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 MONAI Consortium
# Copyright 2022-2023 MONAI Consortium
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,8 +39,8 @@ Feature: DICOMweb STOW-RS Service
And studies are uploaded to storage service
Examples:
| modality | count |
| CT | 2 |
| US | 1 |
| MR | 1 |
| MG | 2 |

@messaging_workflow_request @messaging
Scenario: Triggers a new workflow via DICOMWeb STOW-RS
Expand Down
4 changes: 2 additions & 2 deletions tests/Integration.Test/StepDefinitions/FhirDefinitions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,7 @@ public class FhirDefinitions
internal enum FileFormat
{ Xml, Json };

internal static readonly TimeSpan WaitTimeSpan = TimeSpan.FromMinutes(2);
internal static readonly TimeSpan WaitTimeSpan = TimeSpan.FromMinutes(3);
private readonly InformaticsGatewayConfiguration _informaticsGatewayConfiguration;
private readonly RabbitMqConsumer _receivedMessages;
private readonly DataProvider _dataProvider;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,7 +73,7 @@ public void ThenAcknowledgementAreReceived()
[Then(@"a workflow requests sent to message broker")]
public async Task ThenAWorkflowRequestIsSentToMessageBrokerAsync()
{
(await _receivedMessages.WaitforAsync(_dataProvider.HL7Specs.Files.Count, WaitTimeSpan)).Should().BeTrue();
(await _receivedMessages.WaitforAsync(1, WaitTimeSpan)).Should().BeTrue();
}

[Then(@"messages are uploaded to storage service")]
Expand Down
5 changes: 4 additions & 1 deletion tests/Integration.Test/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
"logDataPDUs": false
}
},
"dicomWeb": {
"timeout": 10
},
"messaging": {
"publisherServiceAssemblyName": "Monai.Deploy.Messaging.RabbitMQ.RabbitMQMessagePublisherService, Monai.Deploy.Messaging.RabbitMQ",
"publisherSettings": {
Expand Down Expand Up @@ -83,4 +86,4 @@
"InformaticsGatewayServerEndpoint": "http://127.0.0.1:5000",
"DockerImagePrefix": "ghcr.io/project-monai/monai-deploy-informatics-gateway"
}
}
}
4 changes: 2 additions & 2 deletions tests/Integration.Test/study.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
"CT": {
"SeriesMin": 1,
"SeriesMax": 2,
"InstanceMin": 60,
"InstanceMax": 1000,
"InstanceMin": 50,
"InstanceMax": 300,
"SizeMin": 0.5,
"SizeMax": 1
},
Expand Down