Skip to content

Commit ab5ff67

Browse files
committed
gh-102 Queue to upload received HL7 messages & notify payload assembler
Signed-off-by: Victor Chang <[email protected]>
1 parent 1ef6964 commit ab5ff67

17 files changed

+414
-211
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2021-2022 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Text.Json.Serialization;
19+
using Ardalis.GuardClauses;
20+
21+
namespace Monai.Deploy.InformaticsGateway.Api.Storage
22+
{
23+
/// <summary>
24+
/// Provides basic information for a FHIR resource and storage hierarchy/path.
25+
/// </summary>
26+
public sealed record Hl7FileStorageMetadata : FileStorageMetadata
27+
{
28+
public const string Hl7SubDirectoryName = "ehr";
29+
public const string FileExtension = ".txt";
30+
31+
/// <inheritdoc/>
32+
[JsonIgnore]
33+
public override string DataTypeDirectoryName => Hl7SubDirectoryName;
34+
35+
/// <inheritdoc/>
36+
[JsonPropertyName("file")]
37+
public override StorageObjectMetadata File { get; set; }
38+
39+
/// <summary>
40+
/// DO NOT USE
41+
/// This constructor is intended for JSON serializer.
42+
/// Due to limitation in current version of .NET, the constructor must be public.
43+
/// https://github.com/dotnet/runtime/issues/31511
44+
/// </summary>
45+
[JsonConstructor]
46+
public Hl7FileStorageMetadata() { }
47+
48+
public Hl7FileStorageMetadata(string connectionId)
49+
: base(connectionId, Guid.NewGuid().ToString())
50+
{
51+
Guard.Against.NullOrWhiteSpace(connectionId, nameof(connectionId));
52+
53+
Source = connectionId;
54+
55+
File = new StorageObjectMetadata(FileExtension)
56+
{
57+
TemporaryPath = string.Join(PathSeparator, connectionId, DataTypeDirectoryName, $"{base.Id}{FileExtension}"),
58+
UploadPath = string.Join(PathSeparator, DataTypeDirectoryName, $"{base.Id}{FileExtension}"),
59+
ContentType = System.Net.Mime.MediaTypeNames.Text.Plain,
60+
};
61+
}
62+
}
63+
}

src/Api/Test/Storage/StorageObjectMetadataTest.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ public void GivenAStorageObjectMetadata_InitializeWithFileExtensionMissingDot_Ex
3434
[Fact]
3535
public void GivenAStorageObjectMetadata_WhenSetUploadIsCalled_ExpectUplaodValuesToBeSetAndDataStreamDisposed()
3636
{
37-
var metadata = new StorageObjectMetadata(".txt");
38-
metadata.Data = new MemoryStream();
37+
var metadata = new StorageObjectMetadata(".txt")
38+
{
39+
Data = new MemoryStream()
40+
};
3941
metadata.SetUploaded("MYBUCKET");
4042

4143
Assert.Equal("MYBUCKET", metadata.TemporaryBucketName);

src/Database/StorageMetadataWrapperEntityConfiguration.cs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,15 @@
1515
* limitations under the License.
1616
*/
1717

18-
using System;
19-
using System.Collections.Generic;
20-
using System.Linq;
21-
using System.Text.Json;
22-
using System.Text.Json.Serialization;
2318
using Microsoft.EntityFrameworkCore;
24-
using Microsoft.EntityFrameworkCore.ChangeTracking;
2519
using Microsoft.EntityFrameworkCore.Metadata.Builders;
26-
using Monai.Deploy.InformaticsGateway.Api.Storage;
2720

2821
namespace Monai.Deploy.InformaticsGateway.Database
2922
{
3023
internal class StorageMetadataWrapperEntityConfiguration : IEntityTypeConfiguration<StorageMetadataWrapper>
3124
{
3225
public void Configure(EntityTypeBuilder<StorageMetadataWrapper> builder)
3326
{
34-
var filesComparer = new ValueComparer<IList<FileStorageMetadata>>(
35-
(c1, c2) => c1.SequenceEqual(c2),
36-
c => c.Aggregate(0, (a, v) => HashCode.Combine(a, v.GetHashCode())),
37-
c => c.ToList());
38-
39-
var jsonSerializerSettings = new JsonSerializerOptions
40-
{
41-
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
42-
};
43-
4427
builder.HasKey(j => new
4528
{
4629
j.CorrelationId,

src/InformaticsGateway/Common/FileStorageMetadataExtensions.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,21 @@ public static async Task SetDataStreams(this DicomFileStorageMetadata dicomFileS
3434
await dicomFile.SaveAsync(dicomFileStorageMetadata.File.Data).ConfigureAwait(false);
3535
dicomFileStorageMetadata.File.Data.Seek(0, SeekOrigin.Begin);
3636

37-
dicomFileStorageMetadata.JsonFile.Data = new MemoryStream(Encoding.UTF8.GetBytes(dicomJson));
38-
dicomFileStorageMetadata.JsonFile.Data.Seek(0, SeekOrigin.Begin);
37+
SetTextStream(dicomFileStorageMetadata.JsonFile, dicomJson);
3938
}
4039

41-
public static async Task SetDataStream(this FhirFileStorageMetadata fhirFileStorageMetadata, string json)
40+
public static void SetDataStream(this FhirFileStorageMetadata fhirFileStorageMetadata, string json)
41+
=> SetTextStream(fhirFileStorageMetadata.File, json);
42+
43+
public static void SetDataStream(this Hl7FileStorageMetadata hl7FileStorageMetadata, string message)
44+
=> SetTextStream(hl7FileStorageMetadata.File, message);
45+
46+
private static void SetTextStream(StorageObjectMetadata storageObjectMetadata, string message)
4247
{
43-
Guard.Against.Null(json, nameof(json)); // allow empty here
48+
Guard.Against.Null(message, nameof(message)); // allow empty here
4449

45-
fhirFileStorageMetadata.File.Data = new MemoryStream();
46-
var sw = new StreamWriter(fhirFileStorageMetadata.File.Data, Encoding.UTF8);
47-
await sw.WriteAsync(json).ConfigureAwait(false);
48-
await sw.FlushAsync().ConfigureAwait(false);
49-
fhirFileStorageMetadata.File.Data.Seek(0, SeekOrigin.Begin);
50+
storageObjectMetadata.Data = new MemoryStream(Encoding.UTF8.GetBytes(message));
51+
storageObjectMetadata.Data.Seek(0, SeekOrigin.Begin);
5052
}
5153
}
5254
}

src/InformaticsGateway/Logging/Log.800.Hl7Service.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,23 @@ public static partial class Log
3131

3232
[LoggerMessage(EventId = 807, Level = LogLevel.Critical, Message = "Socket error: {error}")]
3333
public static partial void Hl7SocketException(this ILogger logger, string error);
34+
35+
[LoggerMessage(EventId = 808, Level = LogLevel.Critical, Message = "Error handling HL7 results.")]
36+
public static partial void ErrorHandlingHl7Results(this ILogger logger, Exception ex);
37+
38+
[LoggerMessage(EventId = 809, Level = LogLevel.Debug, Message = "Acknowledgment type={value}.")]
39+
public static partial void AcknowledgmentType(this ILogger logger, string value);
40+
41+
[LoggerMessage(EventId = 810, Level = LogLevel.Information, Message = "Acknowledgment sent: length={length}.")]
42+
public static partial void AcknowledgmentSent(this ILogger logger, int length);
43+
44+
[LoggerMessage(EventId = 811, Level = LogLevel.Debug, Message = "HL7 bytes received: {length}.")]
45+
public static partial void Hl7MessageBytesRead(this ILogger logger, int length);
46+
47+
[LoggerMessage(EventId = 812, Level = LogLevel.Debug, Message = "Parsing message with {length} bytes.")]
48+
public static partial void Hl7GenerateMessage(this ILogger logger, int length);
49+
50+
[LoggerMessage(EventId = 813, Level = LogLevel.Debug, Message = "Waiting for HL7 message.")]
51+
public static partial void HL7ReadingMessage(this ILogger logger);
3452
}
3553
}

src/InformaticsGateway/Services/Connectors/DataRetrievalService.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public DataRetrievalService(
6969
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
7070
_options = options ?? throw new ArgumentNullException(nameof(options));
7171
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
72-
_options = options ?? throw new ArgumentNullException(nameof(options));
7372

7473
_rootScope = _serviceScopeFactory.CreateScope();
7574

@@ -205,6 +204,10 @@ private void RestoreExistingInstances(InferenceRequest inferenceRequest, Diction
205204

206205
foreach (var file in files)
207206
{
207+
if (cancellationToken.IsCancellationRequested)
208+
{
209+
break;
210+
}
208211
if (file is DicomFileStorageMetadata dicomFileInfo)
209212
{
210213
retrievedInstances.Add(dicomFileInfo.Id, dicomFileInfo);
@@ -327,7 +330,7 @@ private async Task<bool> RetrieveFhirResource(string transactionId, HttpClient h
327330
}
328331

329332
var fhirFile = new FhirFileStorageMetadata(transactionId, resource.Type, resource.Id, fhirFormat);
330-
await fhirFile.SetDataStream(json).ConfigureAwait(false);
333+
fhirFile.SetDataStream(json);
331334
retrievedResources.Add(fhirFile.Id, fhirFile);
332335
return true;
333336
}

src/InformaticsGateway/Services/Connectors/PayloadMoveException.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,10 @@ public PayloadNotifyException(FailureReason reason)
3434
{
3535
Reason = reason;
3636
}
37+
38+
protected PayloadNotifyException(System.Runtime.Serialization.SerializationInfo serializationInfo, System.Runtime.Serialization.StreamingContext streamingContext)
39+
{
40+
throw new NotImplementedException();
41+
}
3742
}
3843
}

src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,15 @@ internal interface IPayloadNotificationActionHandler
3939
Task NotifyAsync(Payload payload, ActionBlock<Payload> notificationQueue, CancellationToken cancellationToken = default);
4040
}
4141

42-
internal class PayloadNotificationActionHandler : IPayloadNotificationActionHandler
42+
internal class PayloadNotificationActionHandler : IPayloadNotificationActionHandler, IDisposable
4343
{
4444
private readonly IServiceScopeFactory _serviceScopeFactory;
4545
private readonly ILogger<PayloadNotificationActionHandler> _logger;
4646
private readonly IOptions<InformaticsGatewayConfiguration> _options;
4747

4848
private readonly IServiceScope _scope;
4949
private readonly IMessageBrokerPublisherService _messageBrokerPublisherService;
50+
private bool _disposedValue;
5051

5152
public PayloadNotificationActionHandler(IServiceScopeFactory serviceScopeFactory,
5253
ILogger<PayloadNotificationActionHandler> logger,
@@ -168,5 +169,25 @@ private async Task<PayloadAction> UpdatePayloadState(Payload payload)
168169
return PayloadAction.Updated;
169170
}
170171
}
172+
173+
protected virtual void Dispose(bool disposing)
174+
{
175+
if (!_disposedValue)
176+
{
177+
if (disposing)
178+
{
179+
_scope.Dispose();
180+
}
181+
182+
_disposedValue = true;
183+
}
184+
}
185+
186+
public void Dispose()
187+
{
188+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
189+
Dispose(disposing: true);
190+
GC.SuppressFinalize(this);
191+
}
171192
}
172193
}

src/InformaticsGateway/Services/HealthLevel7/IMllpClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ internal interface IMllpClient
1313

1414
void Dispose();
1515

16-
Task Start(Action<IMllpClient, MllpClientResult> onDisconnect, CancellationToken cancellationToken);
16+
Task Start(Func<IMllpClient, MllpClientResult, Task> onDisconnect, CancellationToken cancellationToken);
1717
}
1818
}

src/InformaticsGateway/Services/HealthLevel7/MllpClient.cs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ internal sealed class MllpClient : IDisposable, IMllpClient
2424
private readonly ILogger<MllpClient> _logger;
2525
private readonly List<Exception> _exceptions;
2626
private readonly List<Message> _messages;
27+
private readonly IDisposable _loggerScope;
2728
private bool _disposedValue;
2829

2930
public Guid ClientId { get; }
@@ -38,10 +39,10 @@ public MllpClient(ITcpClientAdapter client, Hl7Configuration configurations, ILo
3839
_exceptions = new List<Exception>();
3940
_messages = new List<Message>();
4041

41-
_logger.BeginScope(new LoggingDataDictionary<string, object> { { "End point", _client.RemoteEndPoint }, { "CorrelationId", ClientId } });
42+
_loggerScope = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "End point", _client.RemoteEndPoint }, { "CorrelationId", ClientId } });
4243
}
4344

44-
public async Task Start(Action<IMllpClient, MllpClientResult> onDisconnect, CancellationToken cancellationToken)
45+
public async Task Start(Func<IMllpClient, MllpClientResult, Task> onDisconnect, CancellationToken cancellationToken)
4546
{
4647
using var clientStream = _client.GetStream();
4748
clientStream.ReadTimeout = _configurations.ClientTimeoutMilliseconds;
@@ -52,7 +53,7 @@ public async Task Start(Action<IMllpClient, MllpClientResult> onDisconnect, Canc
5253

5354
if (onDisconnect is not null)
5455
{
55-
onDisconnect(this, new MllpClientResult(_messages, _exceptions.Count > 0 ? new AggregateException(_exceptions) : null));
56+
await onDisconnect(this, new MllpClientResult(_messages, _exceptions.Count > 0 ? new AggregateException(_exceptions) : null));
5657
}
5758
}
5859

@@ -64,12 +65,20 @@ private async Task<IList<Message>> ReceiveData(INetworkStream clientStream, Canc
6465
int bytesRead;
6566
var data = string.Empty;
6667
var messages = new List<Message>();
68+
var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
6769

6870
while (true)
6971
{
7072
try
7173
{
72-
bytesRead = await clientStream.ReadAsync(messageBuffer, cancellationToken).ConfigureAwait(false);
74+
_logger.HL7ReadingMessage();
75+
linkedCancellationTokenSource.CancelAfter(_configurations.ClientTimeoutMilliseconds);
76+
bytesRead = await clientStream.ReadAsync(messageBuffer, linkedCancellationTokenSource.Token).ConfigureAwait(false);
77+
_logger.Hl7MessageBytesRead(bytesRead);
78+
if (!linkedCancellationTokenSource.TryReset())
79+
{
80+
linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
81+
}
7382
}
7483
catch (Exception ex)
7584
{
@@ -85,24 +94,32 @@ private async Task<IList<Message>> ReceiveData(INetworkStream clientStream, Canc
8594

8695
data += Encoding.UTF8.GetString(messageBuffer.ToArray());
8796

88-
var startIndex = data.IndexOf(Resources.AsciiVT);
89-
if (startIndex >= 0)
97+
do
9098
{
91-
var endIndex = data.IndexOf(Resources.AsciiFS);
92-
93-
if (endIndex > startIndex)
99+
var startIndex = data.IndexOf(Resources.AsciiVT);
100+
if (startIndex >= 0)
94101
{
95-
if (!CreateMessage(startIndex, endIndex, ref data, out var message))
96-
{
97-
break;
98-
}
99-
else
102+
var endIndex = data.IndexOf(Resources.AsciiFS);
103+
104+
if (endIndex > startIndex)
100105
{
101-
await SendAcknowledgment(clientStream, message, cancellationToken).ConfigureAwait(false);
102-
messages.Add(message);
106+
if (!CreateMessage(startIndex, endIndex, ref data, out var message))
107+
{
108+
break;
109+
}
110+
else
111+
{
112+
await SendAcknowledgment(clientStream, message, cancellationToken).ConfigureAwait(false);
113+
messages.Add(message);
114+
}
115+
103116
}
104117
}
105-
}
118+
else
119+
{
120+
break;
121+
}
122+
} while (true);
106123
}
107124
return messages;
108125
}
@@ -124,7 +141,8 @@ private async Task SendAcknowledgment(INetworkStream clientStream, Message messa
124141
try
125142
{
126143
await clientStream.WriteAsync(ackData, cancellationToken).ConfigureAwait(false);
127-
await clientStream.FlushAsync(cancellationToken);
144+
await clientStream.FlushAsync(cancellationToken).ConfigureAwait(false);
145+
_logger.AcknowledgmentSent(ackData.Length);
128146
}
129147
catch (Exception ex)
130148
{
@@ -144,6 +162,9 @@ private bool ShouldSendAcknowledgment(Message message)
144162
{
145163
return true;
146164
}
165+
166+
_logger.AcknowledgmentType(value.Value);
167+
147168
return value.Value switch
148169
{
149170
Resources.AcknowledgmentTypeNever => false,
@@ -166,7 +187,9 @@ private bool CreateMessage(int startIndex, int endIndex, ref string data, out Me
166187
var messageEndIndex = endIndex + 1;
167188
try
168189
{
169-
message = new Message(data.Substring(messageStartIndex, endIndex - messageStartIndex));
190+
var text = data.Substring(messageStartIndex, endIndex - messageStartIndex);
191+
_logger.Hl7GenerateMessage(text.Length);
192+
message = new Message(text);
170193
message.ParseMessage();
171194
data = data.Length > endIndex ? data.Substring(messageEndIndex) : string.Empty;
172195
return true;
@@ -187,6 +210,7 @@ private void Dispose(bool disposing)
187210
if (disposing)
188211
{
189212
_client.Dispose();
213+
_loggerScope.Dispose();
190214
}
191215

192216
_disposedValue = true;

0 commit comments

Comments
 (0)