Skip to content

Commit 1a1e21f

Browse files
committed
gh-8 initial implementation of HL7 MLLP Frame listener
Signed-off-by: Victor Chang <[email protected]>
1 parent 6058453 commit 1a1e21f

File tree

13 files changed

+458
-9
lines changed

13 files changed

+458
-9
lines changed

src/Configuration/ConfigurationValidator.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public ValidateOptionsResult Validate(string name, InformaticsGatewayConfigurati
4141
valid &= IsDicomWebValid(options.DicomWeb);
4242
valid &= IsFhirValid(options.Fhir);
4343
valid &= IsStorageValid(options.Storage);
44+
valid &= IsHl7Valid(options.Hl7);
4445

4546
#pragma warning disable CA2254 // Template should be a static expression
4647
_validationErrors.ForEach(p => _logger.Log(LogLevel.Error, p));
@@ -49,6 +50,14 @@ public ValidateOptionsResult Validate(string name, InformaticsGatewayConfigurati
4950
return valid ? ValidateOptionsResult.Success : ValidateOptionsResult.Fail(string.Join(Environment.NewLine, _validationErrors));
5051
}
5152

53+
private bool IsHl7Valid(Hl7Configuration hl7)
54+
{
55+
var valid = true;
56+
57+
valid &= ValidationExtensions.IsPortValid("InformaticsGateway>hl7>port", hl7.Port, _validationErrors);
58+
return valid;
59+
}
60+
5261
private bool IsStorageValid(StorageConfiguration storage)
5362
{
5463
var valid = true;

src/Configuration/Hl7Configuration.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// SPDX-FileCopyrightText: © 2022 MONAI Consortium
2+
// SPDX-License-Identifier: Apache License 2.0
3+
4+
using Microsoft.Extensions.Configuration;
5+
6+
namespace Monai.Deploy.InformaticsGateway.Configuration
7+
{
8+
public class Hl7Configuration
9+
{
10+
public static readonly int DefaultClientTimeout = 300000;
11+
public const int DefaultMaximumNumberOfConnections = 10;
12+
13+
/// <summary>
14+
/// Gets or sets the client connection timeout in milliseconds.
15+
/// </summary>
16+
[ConfigurationKeyName("clientTimeout")]
17+
public int ClientTimeoutMilliseconds { get; set; } = DefaultClientTimeout;
18+
19+
/// <summary>
20+
/// Gets or sets maximum number of concurrent connections for the HL7 service.
21+
/// Defaults to 10.
22+
/// </summary>
23+
[ConfigurationKeyName("maximumNumberOfConnections")]
24+
public int MaximumNumberOfConnections { get; set; } = DefaultMaximumNumberOfConnections;
25+
26+
/// <summary>
27+
/// Gets or sets the MLLP listening port.
28+
/// Defaults to 2575.
29+
/// </summary>
30+
[ConfigurationKeyName("clientTimeout")]
31+
public int Port { get; set; } = 2575;
32+
33+
/// <summary>
34+
/// Gets or sets wether to respond with an ack/nack message.
35+
/// Defaults to true.
36+
/// </summary>
37+
[ConfigurationKeyName("sendAck")]
38+
public bool SendAcknowledgment { get; set; } = true;
39+
40+
public uint BufferSize { get; set; } = 10240;
41+
42+
public Hl7Configuration()
43+
{
44+
}
45+
}
46+
}

src/Configuration/InformaticsGatewayConfiguration.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ public class InformaticsGatewayConfiguration
4343
[ConfigurationKeyName("fhir")]
4444
public FhirConfiguration Fhir { get; set; }
4545

46+
/// <summary>
47+
/// Represents the <c>hl7</c> section of the configuration file.
48+
/// </summary>
49+
/// <value></value>
50+
[ConfigurationKeyName("hl7")]
51+
public Hl7Configuration Hl7 { get; set; }
52+
4653
/// <summary>
4754
/// Represents the <c>export</c> section of the configuration file.
4855
/// </summary>
@@ -63,6 +70,7 @@ public InformaticsGatewayConfiguration()
6370
Fhir = new FhirConfiguration();
6471
Export = new DataExportConfiguration();
6572
Messaging = new MessageBrokerConfiguration();
73+
Hl7 = new Hl7Configuration();
6674
}
6775
}
6876
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// SPDX-FileCopyrightText: © 2022 MONAI Consortium
2+
// SPDX-License-Identifier: Apache License 2.0
3+
4+
using System;
5+
using Microsoft.Extensions.Logging;
6+
7+
namespace Monai.Deploy.InformaticsGateway.Logging
8+
{
9+
public static partial class Log
10+
{
11+
[LoggerMessage(EventId = 800, Level = LogLevel.Information, Message = "New HL7 client connected.")]
12+
public static partial void ClientConnected(this ILogger logger);
13+
14+
[LoggerMessage(EventId = 801, Level = LogLevel.Error, Message = "Error reading data, connection may be dropped.")]
15+
public static partial void ExceptionReadingClientStream(this ILogger logger, Exception ex);
16+
17+
[LoggerMessage(EventId = 802, Level = LogLevel.Error, Message = "Error parsing HL7 message.")]
18+
public static partial void ErrorParsingHl7Message(this ILogger logger, Exception ex);
19+
20+
[LoggerMessage(EventId = 803, Level = LogLevel.Warning, Message = "Unable to locate {segment} field {field} in the HL7 message.")]
21+
public static partial void MissingFieldInHL7Message(this ILogger logger, string segment, int field, Exception ex);
22+
23+
[LoggerMessage(EventId = 804, Level = LogLevel.Error, Message = "Error sending HL7 acknowledgment.")]
24+
public static partial void ErrorSendingHl7Acknowledgment(this ILogger logger, Exception ex);
25+
}
26+
}

src/InformaticsGateway/Monai.Deploy.InformaticsGateway.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<PackageReference Include="Ardalis.GuardClauses" Version="4.0.1" />
2424
<PackageReference Include="DotNext.Threading" Version="4.6.0" />
2525
<PackageReference Include="fo-dicom" Version="5.0.2" />
26+
<PackageReference Include="HL7-dotnetcore" Version="2.29.0" />
2627
<PackageReference Include="Karambolo.Extensions.Logging.File" Version="3.3.0" />
2728
<PackageReference Include="GitVersion.MsBuild" Version="5.10.3">
2829
<PrivateAssets>All</PrivateAssets>

src/InformaticsGateway/Program.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using Monai.Deploy.InformaticsGateway.Services.Connectors;
2121
using Monai.Deploy.InformaticsGateway.Services.DicomWeb;
2222
using Monai.Deploy.InformaticsGateway.Services.Export;
23+
using Monai.Deploy.InformaticsGateway.Services.HealthLevel7;
2324
using Monai.Deploy.InformaticsGateway.Services.Http;
2425
using Monai.Deploy.InformaticsGateway.Services.Scp;
2526
using Monai.Deploy.InformaticsGateway.Services.Storage;
@@ -110,10 +111,11 @@ internal static IHostBuilder CreateHostBuilder(string[] args) =>
110111
services.AddSingleton<IApplicationEntityManager, ApplicationEntityManager>();
111112
services.AddSingleton<SpaceReclaimerService>();
112113
services.AddSingleton<ScpService>();
113-
services.AddSingleton<ScuExportService>();
114-
services.AddSingleton<DicomWebExportService>();
114+
//services.AddSingleton<ScuExportService>();
115+
//services.AddSingleton<DicomWebExportService>();
115116
services.AddSingleton<DataRetrievalService>();
116117
services.AddSingleton<PayloadNotificationService>();
118+
services.AddSingleton<MllpService>();
117119

118120
var timeout = TimeSpan.FromSeconds(hostContext.Configuration.GetValue("InformaticsGateway:dicomWeb:clientTimeout", DicomWebConfiguration.DefaultClientTimeout));
119121
services
@@ -133,9 +135,10 @@ internal static IHostBuilder CreateHostBuilder(string[] args) =>
133135
services.AddHostedService<SpaceReclaimerService>(p => p.GetService<SpaceReclaimerService>());
134136
services.AddHostedService<DataRetrievalService>(p => p.GetService<DataRetrievalService>());
135137
services.AddHostedService<ScpService>(p => p.GetService<ScpService>());
136-
services.AddHostedService<ScuExportService>(p => p.GetService<ScuExportService>());
137-
services.AddHostedService<DicomWebExportService>(p => p.GetService<DicomWebExportService>());
138+
//services.AddHostedService<ScuExportService>(p => p.GetService<ScuExportService>());
139+
//services.AddHostedService<DicomWebExportService>(p => p.GetService<DicomWebExportService>());
138140
services.AddHostedService<PayloadNotificationService>(p => p.GetService<PayloadNotificationService>());
141+
services.AddHostedService<MllpService>(p => p.GetService<MllpService>());
139142
})
140143
.ConfigureWebHostDefaults(webBuilder =>
141144
{
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// SPDX-FileCopyrightText: © 2022 MONAI Consortium
2+
// SPDX-License-Identifier: Apache License 2.0
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Net.Sockets;
8+
using System.Text;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Ardalis.GuardClauses;
12+
using HL7.Dotnetcore;
13+
using Microsoft.Extensions.Logging;
14+
using Monai.Deploy.InformaticsGateway.Api;
15+
using Monai.Deploy.InformaticsGateway.Configuration;
16+
using Monai.Deploy.InformaticsGateway.Logging;
17+
18+
namespace Monai.Deploy.InformaticsGateway.Services.HealthLevel7
19+
{
20+
internal sealed class MllpClient
21+
{
22+
private readonly TcpClient _client;
23+
private readonly Hl7Configuration _configurations;
24+
private readonly ILogger<MllpClient> _logger;
25+
private readonly Guid _connectionId;
26+
private readonly List<Exception> _exceptions;
27+
private readonly List<Message> _messages;
28+
29+
public MllpClient(TcpClient client, Hl7Configuration configurations, ILogger<MllpClient> logger)
30+
{
31+
_client = client ?? throw new ArgumentNullException(nameof(client));
32+
_configurations = configurations ?? throw new ArgumentNullException(nameof(configurations));
33+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
34+
35+
_connectionId = Guid.NewGuid();
36+
_exceptions = new List<Exception>();
37+
_messages = new List<Message>();
38+
39+
_logger.BeginScope(new LoggingDataDictionary<string, object> { { "End point", _client.Client.RemoteEndPoint }, { "CorrelationId", _connectionId } });
40+
}
41+
42+
internal async Task Start(Action<TcpClient, MllpClientResult> onDisconnect, CancellationToken cancellationToken)
43+
{
44+
using var clientStream = _client.GetStream();
45+
clientStream.ReadTimeout = _configurations.ClientTimeoutMilliseconds;
46+
clientStream.WriteTimeout = _configurations.ClientTimeoutMilliseconds;
47+
48+
var messages = await ReceiveData(clientStream, cancellationToken).ConfigureAwait(false);
49+
_messages.AddRange(messages);
50+
51+
if (onDisconnect is not null)
52+
{
53+
onDisconnect(_client, new MllpClientResult(_messages, _exceptions.Count > 0 ? new AggregateException(_exceptions) : null));
54+
}
55+
}
56+
57+
private async Task SendAcknowledgment(NetworkStream clientStream, Message message, CancellationToken cancellationToken)
58+
{
59+
Guard.Against.Null(clientStream, nameof(clientStream));
60+
Guard.Against.Null(message, nameof(message));
61+
62+
if (!_configurations.SendAcknowledgment)
63+
{
64+
return;
65+
}
66+
67+
if (ShouldSendAcknowledgment(message))
68+
{
69+
var ackMessage = message.GetACK();
70+
var ackData = new ReadOnlyMemory<byte>(ackMessage.GetMLLP());
71+
try
72+
{
73+
await clientStream.WriteAsync(ackData, cancellationToken).ConfigureAwait(false);
74+
await clientStream.FlushAsync(cancellationToken);
75+
}
76+
catch (Exception ex)
77+
{
78+
_logger.ErrorSendingHl7Acknowledgment(ex);
79+
_exceptions.Add(ex);
80+
}
81+
}
82+
}
83+
84+
private bool ShouldSendAcknowledgment(Message message)
85+
{
86+
Guard.Against.Null(message, nameof(message));
87+
try
88+
{
89+
var value = message.DefaultSegment(Resources.MessageHeaderSegment).Fields(Resources.AcceptAcknowledgementType);
90+
if (value is null)
91+
{
92+
return true;
93+
}
94+
return value.Value switch
95+
{
96+
Resources.AcknowledgmentTypeNever => false,
97+
Resources.AcknowledgmentTypeError => _exceptions.Any(),
98+
Resources.AcknowledgmentTypeSuccessful => !_exceptions.Any(),
99+
_ => true,
100+
};
101+
}
102+
catch (Exception ex)
103+
{
104+
_logger.MissingFieldInHL7Message(Resources.MessageHeaderSegment, Resources.AcceptAcknowledgementType, ex);
105+
_exceptions.Add(ex);
106+
return true;
107+
}
108+
}
109+
110+
private async Task<IList<Message>> ReceiveData(NetworkStream clientStream, CancellationToken cancellationToken)
111+
{
112+
Guard.Against.Null(clientStream, nameof(clientStream));
113+
114+
var messageBuffer = new Memory<byte>(new byte[_configurations.BufferSize]);
115+
int bytesRead;
116+
var data = string.Empty;
117+
var messages = new List<Message>();
118+
119+
while (true)
120+
{
121+
try
122+
{
123+
bytesRead = await clientStream.ReadAsync(messageBuffer, cancellationToken).ConfigureAwait(false);
124+
}
125+
catch (Exception ex)
126+
{
127+
_logger.ExceptionReadingClientStream(ex);
128+
_exceptions.Add(ex);
129+
break;
130+
}
131+
132+
if (bytesRead == 0)
133+
{
134+
break;
135+
}
136+
137+
data += Encoding.UTF8.GetString(messageBuffer.ToArray());
138+
139+
var startIndex = data.IndexOf(Resources.AsciiVT);
140+
if (startIndex >= 0)
141+
{
142+
var endIndex = data.IndexOf(Resources.AsciiFS);
143+
144+
if (endIndex > startIndex)
145+
{
146+
if (!CreateMessage(startIndex, endIndex, ref data, out var message))
147+
{
148+
break;
149+
}
150+
else
151+
{
152+
await SendAcknowledgment(clientStream, message, cancellationToken).ConfigureAwait(false);
153+
messages.Add(message);
154+
}
155+
}
156+
}
157+
}
158+
return messages;
159+
}
160+
161+
private bool CreateMessage(int startIndex, int endIndex, ref string data, out Message message)
162+
{
163+
var messageStartIndex = startIndex + 1;
164+
var messageEndIndex = endIndex + 1;
165+
try
166+
{
167+
message = new Message(data.Substring(messageStartIndex, endIndex - messageStartIndex));
168+
message.ParseMessage();
169+
data = data.Length > endIndex ? data.Substring(messageEndIndex) : string.Empty;
170+
return true;
171+
}
172+
catch (Exception ex)
173+
{
174+
message = null;
175+
_logger.ErrorParsingHl7Message(ex);
176+
_exceptions.Add(ex);
177+
return false;
178+
}
179+
}
180+
}
181+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// SPDX-FileCopyrightText: © 2022 MONAI Consortium
2+
// SPDX-License-Identifier: Apache License 2.0
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using HL7.Dotnetcore;
7+
8+
namespace Monai.Deploy.InformaticsGateway.Services.HealthLevel7
9+
{
10+
internal class MllpClientResult
11+
{
12+
public IList<Message> Messages { get; }
13+
public AggregateException AggregateException { get; }
14+
15+
public MllpClientResult(IList<Message> messages, AggregateException aggregateException)
16+
{
17+
Messages = messages;
18+
AggregateException = aggregateException;
19+
}
20+
21+
}
22+
}

0 commit comments

Comments
 (0)