Skip to content

Commit 7c08986

Browse files
committed
gh-102 Queue to upload received HL7 messages & notify payload assembler
Signed-off-by: Victor Chang <[email protected]>
1 parent daffc1c commit 7c08986

17 files changed

+308
-184
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2021-2022 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Text.Json.Serialization;
19+
using Ardalis.GuardClauses;
20+
21+
namespace Monai.Deploy.InformaticsGateway.Api.Storage
22+
{
23+
/// <summary>
24+
/// Provides basic information for a FHIR resource and storage hierarchy/path.
25+
/// </summary>
26+
public sealed record Hl7FileStorageMetadata : FileStorageMetadata
27+
{
28+
public const string Hl7SubDirectoryName = "ehr";
29+
public const string FileExtension = ".txt";
30+
31+
/// <inheritdoc/>
32+
[JsonIgnore]
33+
public override string DataTypeDirectoryName => Hl7SubDirectoryName;
34+
35+
/// <inheritdoc/>
36+
[JsonPropertyName("file")]
37+
public override StorageObjectMetadata File { get; set; }
38+
39+
/// <summary>
40+
/// DO NOT USE
41+
/// This constructor is intended for JSON serializer.
42+
/// Due to limitation in current version of .NET, the constructor must be public.
43+
/// https://github.com/dotnet/runtime/issues/31511
44+
/// </summary>
45+
[JsonConstructor]
46+
public Hl7FileStorageMetadata() { }
47+
48+
public Hl7FileStorageMetadata(string connectionId)
49+
: base(connectionId, Guid.NewGuid().ToString())
50+
{
51+
Guard.Against.NullOrWhiteSpace(connectionId, nameof(connectionId));
52+
53+
Source = connectionId;
54+
55+
File = new StorageObjectMetadata(FileExtension)
56+
{
57+
TemporaryPath = string.Join(PathSeparator, connectionId, DataTypeDirectoryName, $"{base.Id}{FileExtension}"),
58+
UploadPath = string.Join(PathSeparator, DataTypeDirectoryName, $"{base.Id}{FileExtension}"),
59+
ContentType = System.Net.Mime.MediaTypeNames.Text.Plain,
60+
};
61+
}
62+
}
63+
}

src/Api/Test/Storage/StorageObjectMetadataTest.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ public void GivenAStorageObjectMetadata_InitializeWithFileExtensionMissingDot_Ex
3434
[Fact]
3535
public void GivenAStorageObjectMetadata_WhenSetUploadIsCalled_ExpectUplaodValuesToBeSetAndDataStreamDisposed()
3636
{
37-
var metadata = new StorageObjectMetadata(".txt");
38-
metadata.Data = new MemoryStream();
37+
var metadata = new StorageObjectMetadata(".txt")
38+
{
39+
Data = new MemoryStream()
40+
};
3941
metadata.SetUploaded("MYBUCKET");
4042

4143
Assert.Equal("MYBUCKET", metadata.TemporaryBucketName);

src/Database/StorageMetadataWrapperEntityConfiguration.cs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,15 @@
1515
* limitations under the License.
1616
*/
1717

18-
using System;
19-
using System.Collections.Generic;
20-
using System.Linq;
21-
using System.Text.Json;
22-
using System.Text.Json.Serialization;
2318
using Microsoft.EntityFrameworkCore;
24-
using Microsoft.EntityFrameworkCore.ChangeTracking;
2519
using Microsoft.EntityFrameworkCore.Metadata.Builders;
26-
using Monai.Deploy.InformaticsGateway.Api.Storage;
2720

2821
namespace Monai.Deploy.InformaticsGateway.Database
2922
{
3023
internal class StorageMetadataWrapperEntityConfiguration : IEntityTypeConfiguration<StorageMetadataWrapper>
3124
{
3225
public void Configure(EntityTypeBuilder<StorageMetadataWrapper> builder)
3326
{
34-
var filesComparer = new ValueComparer<IList<FileStorageMetadata>>(
35-
(c1, c2) => c1.SequenceEqual(c2),
36-
c => c.Aggregate(0, (a, v) => HashCode.Combine(a, v.GetHashCode())),
37-
c => c.ToList());
38-
39-
var jsonSerializerSettings = new JsonSerializerOptions
40-
{
41-
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
42-
};
43-
4427
builder.HasKey(j => new
4528
{
4629
j.CorrelationId,

src/InformaticsGateway/Common/FileStorageMetadataExtensions.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,21 @@ public static async Task SetDataStreams(this DicomFileStorageMetadata dicomFileS
3434
await dicomFile.SaveAsync(dicomFileStorageMetadata.File.Data).ConfigureAwait(false);
3535
dicomFileStorageMetadata.File.Data.Seek(0, SeekOrigin.Begin);
3636

37-
dicomFileStorageMetadata.JsonFile.Data = new MemoryStream(Encoding.UTF8.GetBytes(dicomJson));
38-
dicomFileStorageMetadata.JsonFile.Data.Seek(0, SeekOrigin.Begin);
37+
SetTextStream(dicomFileStorageMetadata.JsonFile, dicomJson);
3938
}
4039

41-
public static async Task SetDataStream(this FhirFileStorageMetadata fhirFileStorageMetadata, string json)
40+
public static void SetDataStream(this FhirFileStorageMetadata fhirFileStorageMetadata, string json)
41+
=> SetTextStream(fhirFileStorageMetadata.File, json);
42+
43+
public static void SetDataStream(this Hl7FileStorageMetadata hl7FileStorageMetadata, string message)
44+
=> SetTextStream(hl7FileStorageMetadata.File, message);
45+
46+
private static void SetTextStream(StorageObjectMetadata storageObjectMetadata, string message)
4247
{
43-
Guard.Against.Null(json, nameof(json)); // allow empty here
48+
Guard.Against.Null(message, nameof(message)); // allow empty here
4449

45-
fhirFileStorageMetadata.File.Data = new MemoryStream();
46-
var sw = new StreamWriter(fhirFileStorageMetadata.File.Data, Encoding.UTF8);
47-
await sw.WriteAsync(json).ConfigureAwait(false);
48-
await sw.FlushAsync().ConfigureAwait(false);
49-
fhirFileStorageMetadata.File.Data.Seek(0, SeekOrigin.Begin);
50+
storageObjectMetadata.Data = new MemoryStream(Encoding.UTF8.GetBytes(message));
51+
storageObjectMetadata.Data.Seek(0, SeekOrigin.Begin);
5052
}
5153
}
5254
}

src/InformaticsGateway/Logging/Log.800.Hl7Service.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,8 @@ public static partial class Log
3131

3232
[LoggerMessage(EventId = 807, Level = LogLevel.Critical, Message = "Socket error: {error}")]
3333
public static partial void Hl7SocketException(this ILogger logger, string error);
34+
35+
[LoggerMessage(EventId = 807, Level = LogLevel.Critical, Message = "Error handling HL7 results.")]
36+
public static partial void ErrorHandlingHl7Results(this ILogger logger, Exception ex);
3437
}
3538
}

src/InformaticsGateway/Services/Connectors/DataRetrievalService.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public DataRetrievalService(
6969
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
7070
_options = options ?? throw new ArgumentNullException(nameof(options));
7171
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
72-
_options = options ?? throw new ArgumentNullException(nameof(options));
7372

7473
_rootScope = _serviceScopeFactory.CreateScope();
7574

@@ -205,6 +204,10 @@ private void RestoreExistingInstances(InferenceRequest inferenceRequest, Diction
205204

206205
foreach (var file in files)
207206
{
207+
if (cancellationToken.IsCancellationRequested)
208+
{
209+
break;
210+
}
208211
if (file is DicomFileStorageMetadata dicomFileInfo)
209212
{
210213
retrievedInstances.Add(dicomFileInfo.Id, dicomFileInfo);
@@ -327,7 +330,7 @@ private async Task<bool> RetrieveFhirResource(string transactionId, HttpClient h
327330
}
328331

329332
var fhirFile = new FhirFileStorageMetadata(transactionId, resource.Type, resource.Id, fhirFormat);
330-
await fhirFile.SetDataStream(json).ConfigureAwait(false);
333+
fhirFile.SetDataStream(json);
331334
retrievedResources.Add(fhirFile.Id, fhirFile);
332335
return true;
333336
}

src/InformaticsGateway/Services/Connectors/PayloadMoveException.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,10 @@ public PayloadNotifyException(FailureReason reason)
3434
{
3535
Reason = reason;
3636
}
37+
38+
protected PayloadNotifyException(System.Runtime.Serialization.SerializationInfo serializationInfo, System.Runtime.Serialization.StreamingContext streamingContext)
39+
{
40+
throw new NotImplementedException();
41+
}
3742
}
3843
}

src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,15 @@ internal interface IPayloadNotificationActionHandler
3939
Task NotifyAsync(Payload payload, ActionBlock<Payload> notificationQueue, CancellationToken cancellationToken = default);
4040
}
4141

42-
internal class PayloadNotificationActionHandler : IPayloadNotificationActionHandler
42+
internal class PayloadNotificationActionHandler : IPayloadNotificationActionHandler, IDisposable
4343
{
4444
private readonly IServiceScopeFactory _serviceScopeFactory;
4545
private readonly ILogger<PayloadNotificationActionHandler> _logger;
4646
private readonly IOptions<InformaticsGatewayConfiguration> _options;
4747

4848
private readonly IServiceScope _scope;
4949
private readonly IMessageBrokerPublisherService _messageBrokerPublisherService;
50+
private bool _disposedValue;
5051

5152
public PayloadNotificationActionHandler(IServiceScopeFactory serviceScopeFactory,
5253
ILogger<PayloadNotificationActionHandler> logger,
@@ -168,5 +169,25 @@ private async Task<PayloadAction> UpdatePayloadState(Payload payload)
168169
return PayloadAction.Updated;
169170
}
170171
}
172+
173+
protected virtual void Dispose(bool disposing)
174+
{
175+
if (!_disposedValue)
176+
{
177+
if (disposing)
178+
{
179+
_scope.Dispose();
180+
}
181+
182+
_disposedValue = true;
183+
}
184+
}
185+
186+
public void Dispose()
187+
{
188+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
189+
Dispose(disposing: true);
190+
GC.SuppressFinalize(this);
191+
}
171192
}
172193
}

src/InformaticsGateway/Services/HealthLevel7/IMllpClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ internal interface IMllpClient
1313

1414
void Dispose();
1515

16-
Task Start(Action<IMllpClient, MllpClientResult> onDisconnect, CancellationToken cancellationToken);
16+
Task Start(Func<IMllpClient, MllpClientResult, Task> onDisconnect, CancellationToken cancellationToken);
1717
}
1818
}

src/InformaticsGateway/Services/HealthLevel7/MllpClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public MllpClient(ITcpClientAdapter client, Hl7Configuration configurations, ILo
4141
_logger.BeginScope(new LoggingDataDictionary<string, object> { { "End point", _client.RemoteEndPoint }, { "CorrelationId", ClientId } });
4242
}
4343

44-
public async Task Start(Action<IMllpClient, MllpClientResult> onDisconnect, CancellationToken cancellationToken)
44+
public async Task Start(Func<IMllpClient, MllpClientResult, Task> onDisconnect, CancellationToken cancellationToken)
4545
{
4646
using var clientStream = _client.GetStream();
4747
clientStream.ReadTimeout = _configurations.ClientTimeoutMilliseconds;
@@ -52,7 +52,7 @@ public async Task Start(Action<IMllpClient, MllpClientResult> onDisconnect, Canc
5252

5353
if (onDisconnect is not null)
5454
{
55-
onDisconnect(this, new MllpClientResult(_messages, _exceptions.Count > 0 ? new AggregateException(_exceptions) : null));
55+
await onDisconnect(this, new MllpClientResult(_messages, _exceptions.Count > 0 ? new AggregateException(_exceptions) : null));
5656
}
5757
}
5858

src/InformaticsGateway/Services/HealthLevel7/MllpClientResult.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ internal class MllpClientResult
1414

1515
public MllpClientResult(IList<Message> messages, AggregateException aggregateException)
1616
{
17-
Messages = messages;
17+
Messages = messages ?? new List<Message>();
1818
AggregateException = aggregateException;
1919
}
2020
}

src/InformaticsGateway/Services/HealthLevel7/MllpService.cs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@
66
using System.Collections.Generic;
77
using System.Threading;
88
using System.Threading.Tasks;
9+
using Ardalis.GuardClauses;
910
using Microsoft.Extensions.DependencyInjection;
1011
using Microsoft.Extensions.Hosting;
1112
using Microsoft.Extensions.Logging;
1213
using Microsoft.Extensions.Options;
1314
using Monai.Deploy.InformaticsGateway.Api.Rest;
15+
using Monai.Deploy.InformaticsGateway.Api.Storage;
1416
using Monai.Deploy.InformaticsGateway.Common;
1517
using Monai.Deploy.InformaticsGateway.Configuration;
1618
using Monai.Deploy.InformaticsGateway.Logging;
1719
using Monai.Deploy.InformaticsGateway.Services.Common;
20+
using Monai.Deploy.InformaticsGateway.Services.Connectors;
21+
using Monai.Deploy.InformaticsGateway.Services.Storage;
1822

1923
namespace Monai.Deploy.InformaticsGateway.Services.HealthLevel7
2024
{
@@ -24,6 +28,8 @@ internal sealed class MllpService : IHostedService, IDisposable, IMonaiService
2428
private bool _disposedValue;
2529
private readonly ITcpListener _tcpListener;
2630
private readonly IMllpClientFactory _mllpClientFactory;
31+
private readonly IObjectUploadQueue _uploadQueue;
32+
private readonly IPayloadAssembler _payloadAssembler;
2733
private readonly IServiceScope _serviceScope;
2834
private readonly ILoggerFactory _logginFactory;
2935
private readonly ILogger<MllpService> _logger;
@@ -58,6 +64,8 @@ public MllpService(IServiceScopeFactory serviceScopeFactory,
5864
var tcpListenerFactory = _serviceScope.ServiceProvider.GetService<ITcpListenerFactory>() ?? throw new ServiceNotFoundException(nameof(ITcpListenerFactory));
5965
_tcpListener = tcpListenerFactory.CreateTcpListener(System.Net.IPAddress.Any, _configuration.Value.Hl7.Port);
6066
_mllpClientFactory = _serviceScope.ServiceProvider.GetService<IMllpClientFactory>() ?? throw new ServiceNotFoundException(nameof(IMllpClientFactory));
67+
_uploadQueue = _serviceScope.ServiceProvider.GetService<IObjectUploadQueue>() ?? throw new ServiceNotFoundException(nameof(IObjectUploadQueue));
68+
_payloadAssembler = _serviceScope.ServiceProvider.GetService<IPayloadAssembler>() ?? throw new ServiceNotFoundException(nameof(IPayloadAssembler));
6169
_activeTasks = new ConcurrentDictionary<Guid, IMllpClient>();
6270
}
6371

@@ -90,19 +98,27 @@ private async Task BackgroundProcessing(CancellationToken cancellationToken)
9098
{
9199
while (!cancellationToken.IsCancellationRequested)
92100
{
101+
IMllpClient mllpClient = null;
93102
try
94103
{
95104
WaitUntilAvailable(_configuration.Value.Hl7.MaximumNumberOfConnections);
96105
var client = await _tcpListener.AcceptTcpClientAsync(cancellationToken).ConfigureAwait(false);
97106
_logger.ClientConnected();
98107

99-
var mllpClient = _mllpClientFactory.CreateClient(client, _configuration.Value.Hl7, _logginFactory.CreateLogger<MllpClient>());
108+
mllpClient = _mllpClientFactory.CreateClient(client, _configuration.Value.Hl7, _logginFactory.CreateLogger<MllpClient>());
100109
_ = mllpClient.Start(OnDisconnect, cancellationToken);
101110
_activeTasks.TryAdd(mllpClient.ClientId, mllpClient);
102111
}
103112
catch (System.Net.Sockets.SocketException ex)
104113
{
105114
_logger.Hl7SocketException(ex.Message);
115+
116+
if (mllpClient is not null)
117+
{
118+
mllpClient.Dispose();
119+
_activeTasks.Remove(mllpClient.ClientId, out _);
120+
}
121+
106122
if (ex.ErrorCode == SOCKET_OPERATION_CANCELLED)
107123
{
108124
break;
@@ -117,9 +133,27 @@ private async Task BackgroundProcessing(CancellationToken cancellationToken)
117133
_logger.ServiceCancelled(ServiceName);
118134
}
119135

120-
private void OnDisconnect(IMllpClient client, MllpClientResult result)
136+
private async Task OnDisconnect(IMllpClient client, MllpClientResult result)
121137
{
138+
Guard.Against.Null(client, nameof(client));
139+
Guard.Against.Null(result, nameof(result));
140+
122141
_activeTasks.Remove(client.ClientId, out _);
142+
143+
try
144+
{
145+
foreach (var message in result.Messages)
146+
{
147+
var hl7Fileetadata = new Hl7FileStorageMetadata(client.ClientId.ToString());
148+
hl7Fileetadata.SetDataStream(message.HL7Message);
149+
_uploadQueue.Queue(hl7Fileetadata);
150+
await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Fileetadata).ConfigureAwait(false);
151+
}
152+
}
153+
catch (Exception ex)
154+
{
155+
_logger.ErrorHandlingHl7Results(ex);
156+
}
123157
}
124158

125159
private void WaitUntilAvailable(int maximumNumberOfConnections)

src/InformaticsGateway/Services/Http/InferenceController.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@
2222
using Microsoft.AspNetCore.Http;
2323
using Microsoft.AspNetCore.Mvc;
2424
using Microsoft.Extensions.Logging;
25-
using Microsoft.Extensions.Options;
2625
using Monai.Deploy.InformaticsGateway.Api;
2726
using Monai.Deploy.InformaticsGateway.Api.Rest;
28-
using Monai.Deploy.InformaticsGateway.Configuration;
2927
using Monai.Deploy.InformaticsGateway.Logging;
3028
using Monai.Deploy.InformaticsGateway.Repositories;
3129

@@ -36,16 +34,13 @@ namespace Monai.Deploy.InformaticsGateway.Services.Http
3634
public class InferenceController : ControllerBase
3735
{
3836
private readonly IInferenceRequestRepository _inferenceRequestRepository;
39-
private readonly IOptions<InformaticsGatewayConfiguration> _configuration;
4037
private readonly ILogger<InferenceController> _logger;
4138

4239
public InferenceController(
4340
IInferenceRequestRepository inferenceRequestRepository,
44-
IOptions<InformaticsGatewayConfiguration> configuration,
4541
ILogger<InferenceController> logger)
4642
{
4743
_inferenceRequestRepository = inferenceRequestRepository ?? throw new ArgumentNullException(nameof(inferenceRequestRepository));
48-
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
4944
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
5045
}
5146

0 commit comments

Comments
 (0)