Skip to content

remove the need for a double copy to minio #395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Api/Storage/FileStorageMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,7 @@ public virtual void SetFailed()
{
File.SetFailed();
}

public string? PayloadId { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/Configuration/ConfigurationValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private bool IsValidDirectory(string source, string directory)
private bool IsValidBucketName(string source, string bucketName)
{
var valid = IsNotNullOrWhiteSpace(source, bucketName);
var regex = new Regex("(?=^.{3,63}$)(^[a-z0-9]+[a-z0-9\\-]+[a-z0-9]+$)");
var regex = new Regex("(?=^.{3,63}$)(^[a-z0-9]+[a-z0-9\\-]+[a-z0-9]+$)", new RegexOptions(), TimeSpan.FromSeconds(5));
if (!regex.IsMatch(bucketName))
{
valid = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public async Task GivenAAETitleName_WhenFindByAETAsyncIsCalled_ExpectItToReturnM
Assert.Equal("AET1", actual.FirstOrDefault()!.Name);

actual = await store.FindByAETAsync("AET6").ConfigureAwait(false);
Assert.NotNull(actual);
Assert.Empty(actual);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public async Task GivenAETitle_WhenFindByAETitleAsyncIsCalled_ExpectItToReturnMa
Assert.Equal("AET1", actual.FirstOrDefault()!.Name);

actual = await store.FindByAETAsync("AET6").ConfigureAwait(false);
Assert.NotNull(actual);
Assert.Empty(actual);
}

Expand Down
10 changes: 5 additions & 5 deletions src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ public static partial class Log
[LoggerMessage(EventId = 4001, Level = LogLevel.Debug, Message = "Upload statistics: {threads} threads, {seconds} seconds.")]
public static partial void UploadStats(this ILogger logger, int threads, double seconds);

[LoggerMessage(EventId = 4002, Level = LogLevel.Debug, Message = "Uploading file to temporary store at {filePath}.")]
public static partial void UploadingFileToTemporaryStore(this ILogger logger, string filePath);
[LoggerMessage(EventId = 4002, Level = LogLevel.Debug, Message = "Uploading file to storeage at {filePath}.")]
public static partial void UploadingFileToStoreage(this ILogger logger, string filePath);

[LoggerMessage(EventId = 4003, Level = LogLevel.Information, Message = "Instance queued for upload {identifier}. Items in queue {count} using memory {memoryUsageKb}KB.")]
public static partial void InstanceAddedToUploadQueue(this ILogger logger, string identifier, int count, double memoryUsageKb);

[LoggerMessage(EventId = 4004, Level = LogLevel.Debug, Message = "Error removing objects that are pending upload during startup.")]
public static partial void ErrorRemovingPendingUploadObjects(this ILogger logger, Exception ex);

[LoggerMessage(EventId = 4005, Level = LogLevel.Error, Message = "Error uploading temporary store. Waiting {timeSpan} before next retry. Retry attempt {retryCount}.")]
[LoggerMessage(EventId = 4005, Level = LogLevel.Error, Message = "Error uploading storeage. Waiting {timeSpan} before next retry. Retry attempt {retryCount}.")]
public static partial void ErrorUploadingFileToTemporaryStore(this ILogger logger, TimeSpan timespan, int retryCount, Exception ex);

[LoggerMessage(EventId = 4006, Level = LogLevel.Information, Message = "File uploaded to temporary store at {filePath}.")]
public static partial void UploadedFileToTemporaryStore(this ILogger logger, string filePath);
[LoggerMessage(EventId = 4006, Level = LogLevel.Information, Message = "File uploaded to storeage at {filePath}.")]
public static partial void UploadedFileToStoreage(this ILogger logger, string filePath);

[LoggerMessage(EventId = 4007, Level = LogLevel.Debug, Message = "Items in queue {count}.")]
public static partial void InstanceInUploadQueue(this ILogger logger, int count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,11 @@ private async Task NotifyNewInstance(InferenceRequest inferenceRequest, Dictiona
{
retrievedFiles[key].SetWorkflows(inferenceRequest.Application.Id);
}
var FileMeta = retrievedFiles[key];

var payloadId = await _payloadAssembler.Queue(inferenceRequest.TransactionId, retrievedFiles[key]).ConfigureAwait(false);
retrievedFiles[key].PayloadId = payloadId.ToString();
_uploadQueue.Queue(retrievedFiles[key]);
await _payloadAssembler.Queue(inferenceRequest.TransactionId, retrievedFiles[key]).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using Monai.Deploy.InformaticsGateway.Api.Storage;
Expand All @@ -30,15 +31,15 @@ internal interface IPayloadAssembler
/// </summary>
/// <param name="bucket">The bucket group the file belongs to.</param>
/// <param name="file">Path to the file to be added to the payload bucket.</param>
Task Queue(string bucket, FileStorageMetadata file);
Task<Guid> Queue(string bucket, FileStorageMetadata file);

/// <summary>
/// Queue a new file for the spcified payload bucket.
/// </summary>
/// <param name="bucket">The bucket group the file belongs to.</param>
/// <param name="file">Path to the file to be added to the payload bucket.</param>
/// <param name="timeout">Number of seconds to wait for additional files.</param>
Task Queue(string bucket, FileStorageMetadata file, uint timeout);
Task<Guid> Queue(string bucket, FileStorageMetadata file, uint timeout);

/// <summary>
/// Dequeue a payload from the queue for the message broker to notify subscribers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ private async Task RemovePendingPayloads()
/// </summary>
/// <param name="bucket">Name of the bucket where the file would be added to</param>
/// <param name="file">Instance to be queued</param>
public async Task Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false);
public async Task<Guid> Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false);

/// <summary>
/// Queues a new instance of <see cref="FileStorageMetadata"/>.
/// </summary>
/// <param name="bucket">Name of the bucket where the file would be added to</param>
/// <param name="file">Instance to be queued</param>
/// <param name="timeout">Number of seconds the bucket shall wait before sending the payload to be processed. Note: timeout cannot be modified once the bucket is created.</param>
public async Task Queue(string bucket, FileStorageMetadata file, uint timeout)
public async Task<Guid> Queue(string bucket, FileStorageMetadata file, uint timeout)
{
Guard.Against.Null(file);

Expand All @@ -106,6 +106,7 @@ public async Task Queue(string bucket, FileStorageMetadata file, uint timeout)
var payload = await CreateOrGetPayload(bucket, file.CorrelationId, timeout).ConfigureAwait(false);
payload.Add(file);
_logger.FileAddedToBucket(payload.Key, payload.Count);
return payload.PayloadId;
}

/// <summary>
Expand Down
144 changes: 0 additions & 144 deletions src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock<Payload> moveQueue
var stopwatch = Stopwatch.StartNew();
try
{
await Move(payload, cancellationToken).ConfigureAwait(false);
await NotifyIfCompleted(payload, notificationQueue, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down Expand Up @@ -127,149 +126,6 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock<Payload> notif
}
}

private async Task Move(Payload payload, CancellationToken cancellationToken)
{
Guard.Against.Null(payload);

_logger.MovingFIlesInPayload(payload.PayloadId, _options.Value.Storage.StorageServiceBucketName);

var options = new ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = _options.Value.Storage.ConcurrentUploads
};

var exceptions = new List<Exception>();
await Parallel.ForEachAsync(payload.Files, options, async (file, cancellationToke) =>
{
try
{
switch (file)
{
case DicomFileStorageMetadata dicom:
if (!string.IsNullOrWhiteSpace(dicom.JsonFile.TemporaryPath))
{
await MoveFile(payload.PayloadId, dicom.Id, dicom.JsonFile, cancellationToken).ConfigureAwait(false);
}
break;
}

await MoveFile(payload.PayloadId, file.Id, file.File, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}).ConfigureAwait(false);

if (exceptions.Any())
{
throw new AggregateException(exceptions);
}
}

private async Task MoveFile(Guid payloadId, string identity, StorageObjectMetadata file, CancellationToken cancellationToken)
{
Guard.Against.NullOrWhiteSpace(identity);
Guard.Against.Null(file);

if (file.IsMoveCompleted)
{
_logger.AlreadyMoved(payloadId, file.UploadPath);
return;
}

_logger.MovingFileToPayloadDirectory(payloadId, file.UploadPath);

try
{
await _storageService.CopyObjectAsync(
file.TemporaryBucketName,
file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath),
_options.Value.Storage.StorageServiceBucketName,
file.GetPayloadPath(payloadId),
cancellationToken).ConfigureAwait(false);

await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false);
}
catch (StorageObjectNotFoundException ex) when (ex.Message.Contains("Not found", StringComparison.OrdinalIgnoreCase)) // TODO: StorageLib shall not throw any errors from MINIO
{
// when file cannot be found on the Storage Service, we assume file has been moved previously by verifying the file exists on destination.
_logger.FileMissingInPayload(payloadId, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), ex);
await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false);
}
catch (StorageConnectionException ex)
{
_logger.StorageServiceConnectionError(ex);
throw new PayloadNotifyException(PayloadNotifyException.FailureReason.ServiceUnavailable);
}
catch (Exception ex)
{
_logger.PayloadMoveException(ex);
await LogFilesInMinIo(file.TemporaryBucketName, cancellationToken).ConfigureAwait(false);
throw new FileMoveException(file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), file.UploadPath, ex);
}

try
{
_logger.DeletingFileFromTemporaryBbucket(file.TemporaryBucketName, identity, file.TemporaryPath);
await _storageService.RemoveObjectAsync(file.TemporaryBucketName, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
_logger.ErrorDeletingFileAfterMoveComplete(file.TemporaryBucketName, identity, file.TemporaryPath);
}
finally
{
file.SetMoved(_options.Value.Storage.StorageServiceBucketName);
}
}

private async Task VerifyFileExists(Guid payloadId, StorageObjectMetadata file, CancellationToken cancellationToken)
{
await Policy
.Handle<VerifyObjectsException>()
.WaitAndRetryAsync(
_options.Value.Storage.Retries.RetryDelays,
(exception, timeSpan, retryCount, context) =>
{
_logger.ErrorUploadingFileToTemporaryStore(timeSpan, retryCount, exception);
})
.ExecuteAsync(async () =>
{
var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
internalCancellationTokenSource.CancelAfter(_options.Value.Storage.StorageServiceListTimeout);
var exists = await _storageService.VerifyObjectExistsAsync(_options.Value.Storage.StorageServiceBucketName, file.GetPayloadPath(payloadId), cancellationToken).ConfigureAwait(false);
if (!exists)
{
_logger.FileMovedVerificationFailure(payloadId, file.UploadPath);
throw new PayloadNotifyException(PayloadNotifyException.FailureReason.MoveFailure, false);
}
})
.ConfigureAwait(false);
}

private async Task LogFilesInMinIo(string bucketName, CancellationToken cancellationToken)
{
try
{
var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
internalCancellationTokenSource.CancelAfter(_options.Value.Storage.StorageServiceListTimeout);
var listingResults = await _storageService.ListObjectsAsync(bucketName, recursive: true, cancellationToken: internalCancellationTokenSource.Token).ConfigureAwait(false);
_logger.FilesFounddOnStorageService(bucketName, listingResults.Count);
var files = new List<string>();
foreach (var item in listingResults)
{
files.Add(item.FilePath);
}
_logger.FileFounddOnStorageService(bucketName, string.Join(Environment.NewLine, files));
}
catch (Exception ex)
{
_logger.ErrorListingFilesOnStorageService(ex);
}
}

private async Task<PayloadAction> UpdatePayloadState(Payload payload, Exception ex, CancellationToken cancellationToken = default)
{
Guard.Against.Null(payload);
Expand Down
6 changes: 4 additions & 2 deletions src/InformaticsGateway/Services/DicomWeb/IStreamsWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,14 @@ private async Task SaveInstance(Stream stream, string studyInstanceUid, string w
{
dicomInfo.SetWorkflows(workflowName);
}
// for DICOMweb, use correlation ID as the grouping key
var payloadId = await _payloadAssembler.Queue(correlationId, dicomInfo, _configuration.Value.DicomWeb.Timeout).ConfigureAwait(false);
dicomInfo.PayloadId = payloadId.ToString();

await dicomInfo.SetDataStreams(dicomFile, dicomFile.ToJson(_configuration.Value.Dicom.WriteDicomJson, _configuration.Value.Dicom.ValidateDicomOnSerialization), _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false);
_uploadQueue.Queue(dicomInfo);

// for DICOMweb, use correlation ID as the grouping key
await _payloadAssembler.Queue(correlationId, dicomInfo, _configuration.Value.DicomWeb.Timeout).ConfigureAwait(false);

_logger.QueuedStowInstance();

AddSuccess(null, uids);
Expand Down
3 changes: 2 additions & 1 deletion src/InformaticsGateway/Services/Fhir/FhirService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ public async Task<FhirStoreResult> StoreAsync(HttpRequest request, string correl
throw new FhirStoreException(correlationId, $"Provided resource is of type '{content.InternalResourceType}' but request targeted type '{resourceType}'.", IssueType.Invalid);
}

var payloadId = await _payloadAssembler.Queue(correlationId, content.Metadata, Resources.PayloadAssemblerTimeout).ConfigureAwait(false);
content.Metadata.PayloadId = payloadId.ToString();
_uploadQueue.Queue(content.Metadata);
await _payloadAssembler.Queue(correlationId, content.Metadata, Resources.PayloadAssemblerTimeout).ConfigureAwait(false);
_logger.QueuedStowInstance();

content.StatusCode = StatusCodes.Status201Created;
Expand Down
3 changes: 2 additions & 1 deletion src/InformaticsGateway/Services/HealthLevel7/MllpService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ private async Task OnDisconnect(IMllpClient client, MllpClientResult result)
{
var hl7Fileetadata = new Hl7FileStorageMetadata(client.ClientId.ToString());
await hl7Fileetadata.SetDataStream(message.HL7Message, _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false);
var payloadId = await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Fileetadata).ConfigureAwait(false);
hl7Fileetadata.PayloadId = payloadId.ToString();
_uploadQueue.Queue(hl7Fileetadata);
await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Fileetadata).ConfigureAwait(false);
}
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ public async Task HandleInstanceAsync(DicomCStoreRequest request, string calledA
}

await dicomInfo.SetDataStreams(request.File, request.File.ToJson(_dicomJsonOptions, _validateDicomValueOnJsonSerialization), _options.Value.Storage.TemporaryDataStorage, _fileSystem, _options.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false);
_uploadQueue.Queue(dicomInfo);

var dicomTag = FellowOakDicom.DicomTag.Parse(_configuration.Grouping);
_logger.QueueInstanceUsingDicomTag(dicomTag);
var key = request.Dataset.GetSingleValue<string>(dicomTag);
await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false);

var payloadid = await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false);
dicomInfo.PayloadId = payloadid.ToString();
_uploadQueue.Queue(dicomInfo);
}

private bool AcceptsSopClass(string sopClassUid)
Expand Down
Loading