Skip to content

Commit b10600e

Browse files
committed
gh-56 fix export pipeline stuck after Transform block
1 parent 50245ab commit b10600e

File tree

7 files changed

+77
-86
lines changed

7 files changed

+77
-86
lines changed

src/Configuration/MessageBrokerConfigurationKeys.cs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,5 @@ public class MessageBrokerConfigurationKeys
3535
/// </summary>
3636
[JsonProperty(PropertyName = "exportRequestPrefix")]
3737
public string ExportRequestPrefix { get; set; } = "md.export.request";
38-
39-
/// <summary>
40-
/// Gets or sets the name of the request queue.
41-
/// Defaults to `export_tasks`
42-
/// </summary>
43-
[JsonProperty(PropertyName = "exportRequestQueue")]
44-
public string ExportRequestQueue { get; set; } = "export_tasks";
4538
}
4639
}

src/InformaticsGateway/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) =>
9595
services.AddTransient<IDicomToolkit, DicomToolkit>();
9696

9797
services.AddScoped(typeof(IInformaticsGatewayRepository<>), typeof(InformaticsGatewayRepository<>));
98+
services.AddScoped<IInferenceRequestRepository, InferenceRequestRepository>();
9899

99100
services.AddSingleton<MinIoStorageService>();
100101
services.AddSingleton<IStorageService>(implementationFactory =>

src/InformaticsGateway/Services/Export/ExportServiceBase.cs

Lines changed: 47 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService
5454
{
5555
private static readonly object SyncRoot = new();
5656

57-
internal event EventHandler ReportActionStarted;
57+
internal event EventHandler ReportActionCompleted;
5858

59+
private readonly CancellationTokenSource _cancellationTokenSource;
5960
private readonly ILogger _logger;
6061
private readonly IServiceScopeFactory _serviceScopeFactory;
6162
private readonly IStorageInfoProvider _storageInfoProvider;
@@ -64,8 +65,6 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService
6465
private readonly IMessageBrokerPublisherService _messagePublisher;
6566
private readonly IServiceScope _scope;
6667
private readonly Dictionary<string, ExportRequestMessage> _exportRequests;
67-
private readonly string _exportQueueName;
68-
private TransformManyBlock<ExportRequestMessage, ExportRequestDataMessage> _exportFlow;
6968

7069
public abstract string RoutingKey { get; }
7170
protected abstract int Concurrency { get; }
@@ -87,6 +86,7 @@ public ExportServiceBase(
8786
IServiceScopeFactory serviceScopeFactory,
8887
IStorageInfoProvider storageInfoProvider)
8988
{
89+
_cancellationTokenSource = new CancellationTokenSource();
9090
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
9191
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
9292
_scope = _serviceScopeFactory.CreateScope();
@@ -103,12 +103,10 @@ public ExportServiceBase(
103103
_messagePublisher = _scope.ServiceProvider.GetRequiredService<IMessageBrokerPublisherService>();
104104

105105
_exportRequests = new Dictionary<string, ExportRequestMessage>();
106-
_exportQueueName = _configuration.Messaging.Topics.ExportRequestQueue;
107106
}
108107

109108
public Task StartAsync(CancellationToken cancellationToken)
110109
{
111-
SetupExportFlow(cancellationToken);
112110
SetupPolling();
113111

114112
Status = ServiceStatus.Running;
@@ -118,44 +116,73 @@ public Task StartAsync(CancellationToken cancellationToken)
118116

119117
public Task StopAsync(CancellationToken cancellationToken)
120118
{
119+
_cancellationTokenSource.Cancel();
121120
_logger.LogInformation($"{ServiceName} is stopping.");
122-
_exportFlow.Complete();
123-
_exportFlow.Completion.Wait(cancellationToken);
124121
Status = ServiceStatus.Stopped;
125122
return Task.CompletedTask;
126123
}
127124

128-
private void SetupExportFlow(CancellationToken cancellationToken)
125+
private void SetupPolling()
129126
{
127+
_messageSubscriber.Subscribe(RoutingKey, String.Empty, OnMessageReceivedCallback);
128+
_logger.Log(LogLevel.Information, $"{ServiceName} subscribed to {RoutingKey} messages.");
129+
}
130+
131+
private void OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs)
132+
{
133+
if (!_storageInfoProvider.HasSpaceAvailableForExport)
134+
{
135+
_logger.Log(LogLevel.Warning, $"Export service paused due to insufficient storage space. Available storage space: {_storageInfoProvider.AvailableFreeSpace:D}.");
136+
_messageSubscriber.Reject(eventArgs.Message);
137+
return;
138+
}
139+
130140
try
131141
{
132142
var executionOptions = new ExecutionDataflowBlockOptions
133143
{
134144
MaxDegreeOfParallelism = Concurrency,
135145
MaxMessagesPerTask = 1,
136-
CancellationToken = cancellationToken
146+
CancellationToken = _cancellationTokenSource.Token
137147
};
138148

139-
_exportFlow = new TransformManyBlock<ExportRequestMessage, ExportRequestDataMessage>(
140-
(exportRequest) => DownloadPayloadActionCallback(exportRequest, cancellationToken),
149+
var exportFlow = new TransformManyBlock<ExportRequestMessage, ExportRequestDataMessage>(
150+
(exportRequest) => DownloadPayloadActionCallback(exportRequest, _cancellationTokenSource.Token),
141151
executionOptions);
142152

143153
var exportActionBlock = new TransformBlock<ExportRequestDataMessage, ExportRequestDataMessage>(
144154
async (exportDataRequest) =>
145155
{
146156
if (exportDataRequest.IsFailed) return exportDataRequest;
147-
return await ExportDataBlockCallback(exportDataRequest, cancellationToken);
157+
return await ExportDataBlockCallback(exportDataRequest, _cancellationTokenSource.Token);
148158
},
149159
executionOptions);
150160

151161
var reportingActionBlock = new ActionBlock<ExportRequestDataMessage>(ReportingActionBlock, executionOptions);
152162

153163
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
154164

155-
_exportFlow.LinkTo(exportActionBlock, linkOptions);
165+
exportFlow.LinkTo(exportActionBlock, linkOptions);
156166
exportActionBlock.LinkTo(reportingActionBlock, linkOptions);
157167

158-
_logger.Log(LogLevel.Information, $"{ServiceName} completed workflow setup.");
168+
lock (SyncRoot)
169+
{
170+
var exportRequest = eventArgs.Message.ConvertTo<ExportRequestMessage>();
171+
if (_exportRequests.ContainsKey(exportRequest.ExportTaskId))
172+
{
173+
_logger.Log(LogLevel.Warning, $"The export request {exportRequest.ExportTaskId} is already queued for export.");
174+
return;
175+
}
176+
177+
exportRequest.MessageId = eventArgs.Message.MessageId;
178+
exportRequest.DeliveryTag = eventArgs.Message.DeliveryTag;
179+
180+
_exportRequests.Add(exportRequest.ExportTaskId, exportRequest);
181+
exportFlow.Post(exportRequest);
182+
}
183+
184+
exportFlow.Complete();
185+
reportingActionBlock.Completion.Wait(_cancellationTokenSource.Token);
159186
}
160187
catch (AggregateException ex)
161188
{
@@ -170,38 +197,6 @@ private void SetupExportFlow(CancellationToken cancellationToken)
170197
}
171198
}
172199

173-
private void SetupPolling()
174-
{
175-
_messageSubscriber.Subscribe(RoutingKey, _exportQueueName, OnMessageReceivedCallback);
176-
_logger.Log(LogLevel.Information, $"{ServiceName} subscribed to {RoutingKey} messages.");
177-
}
178-
179-
private void OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs)
180-
{
181-
if (!_storageInfoProvider.HasSpaceAvailableForExport)
182-
{
183-
_logger.Log(LogLevel.Warning, $"Export service paused due to insufficient storage space. Available storage space: {_storageInfoProvider.AvailableFreeSpace:D}.");
184-
_messageSubscriber.Reject(eventArgs.Message);
185-
return;
186-
}
187-
188-
lock (SyncRoot)
189-
{
190-
var exportRequest = eventArgs.Message.ConvertTo<ExportRequestMessage>();
191-
if (_exportRequests.ContainsKey(exportRequest.ExportTaskId))
192-
{
193-
_logger.Log(LogLevel.Warning, $"The export request {exportRequest.ExportTaskId} is already queued for export.");
194-
return;
195-
}
196-
197-
exportRequest.MessageId = eventArgs.Message.MessageId;
198-
exportRequest.DeliveryTag = eventArgs.Message.DeliveryTag;
199-
200-
_exportRequests.Add(exportRequest.ExportTaskId, exportRequest);
201-
_exportFlow.Post(exportRequest);
202-
}
203-
}
204-
205200
private IEnumerable<ExportRequestDataMessage> DownloadPayloadActionCallback(ExportRequestMessage exportRequest, CancellationToken cancellationToken)
206201
{
207202
Guard.Against.Null(exportRequest, nameof(exportRequest));
@@ -277,12 +272,6 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
277272

278273
_logger.Log(LogLevel.Information, $"Export task completed with {exportRequest.FailedFiles} failures out of {exportRequest.Files.Count()}");
279274

280-
if (ReportActionStarted != null)
281-
{
282-
_logger.Log(LogLevel.Debug, $"Calling ReportActionStarted callback.");
283-
ReportActionStarted(this, null);
284-
}
285-
286275
var exportCompleteMessage = new ExportCompleteMessage(exportRequest);
287276
var jsonMessage = new JsonMessage<ExportCompleteMessage>(exportCompleteMessage, exportRequest.CorrelationId, exportRequest.DeliveryTag);
288277

@@ -318,6 +307,12 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
318307
{
319308
_exportRequests.Remove(exportRequestData.ExportTaskId);
320309
}
310+
311+
if (ReportActionCompleted != null)
312+
{
313+
_logger.Log(LogLevel.Debug, $"Calling ReportActionCompleted callback.");
314+
ReportActionCompleted(this, null);
315+
}
321316
}
322317
}
323318
}

src/InformaticsGateway/Test/Services/Export/DicomWebExportServiceTest.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public async Task ExportDataBlockCallback_ReturnsNullIfInferenceRequestCannotBeF
143143
_dicomToolkit.Object);
144144

145145
var dataflowCompleted = new ManualResetEvent(false);
146-
service.ReportActionStarted += (sender, args) =>
146+
service.ReportActionCompleted += (sender, args) =>
147147
{
148148
dataflowCompleted.Set();
149149
};
@@ -203,7 +203,7 @@ public async Task ExportDataBlockCallback_ReturnsNullIfInferenceRequestContainsN
203203
_dicomToolkit.Object);
204204

205205
var dataflowCompleted = new ManualResetEvent(false);
206-
service.ReportActionStarted += (sender, args) =>
206+
service.ReportActionCompleted += (sender, args) =>
207207
{
208208
dataflowCompleted.Set();
209209
};
@@ -288,7 +288,7 @@ public async Task ExportDataBlockCallback_RecordsStowFailuresAndReportFailure()
288288
_dicomToolkit.Object);
289289

290290
var dataflowCompleted = new ManualResetEvent(false);
291-
service.ReportActionStarted += (sender, args) =>
291+
service.ReportActionCompleted += (sender, args) =>
292292
{
293293
dataflowCompleted.Set();
294294
};
@@ -382,7 +382,7 @@ public async Task CompletesDataflow(HttpStatusCode httpStatusCode)
382382
_dicomToolkit.Object);
383383

384384
var dataflowCompleted = new ManualResetEvent(false);
385-
service.ReportActionStarted += (sender, args) =>
385+
service.ReportActionCompleted += (sender, args) =>
386386
{
387387
dataflowCompleted.Set();
388388
};

src/InformaticsGateway/Test/Services/Export/ExportServiceBaseTest.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ public async Task DataflowTest_StartStop()
109109
await StopAndVerify(service);
110110

111111
_logger.VerifyLogging($"{service.ServiceName} subscribed to {service.RoutingKey} messages.", LogLevel.Information, Times.Once());
112-
_logger.VerifyLogging($"{service.ServiceName} completed workflow setup.", LogLevel.Information, Times.Once());
113112
_logger.VerifyLogging($"{service.ServiceName} is stopping.", LogLevel.Information, Times.Once());
114113
}
115114

@@ -167,7 +166,7 @@ public async Task DataflowTest_PayloadDownlaodFailure()
167166
});
168167
var countdownEvent = new CountdownEvent(1);
169168
var service = new TestExportService(_logger.Object, _configuration, _serviceScopeFactory.Object, _storageInfoProvider.Object);
170-
service.ReportActionStarted += (sender, e) =>
169+
service.ReportActionCompleted += (sender, e) =>
171170
{
172171
countdownEvent.Signal();
173172
};
@@ -186,9 +185,10 @@ public async Task DataflowTest_PayloadDownlaodFailure()
186185
It.IsAny<ushort>()), Times.Once());
187186
}
188187

189-
[RetryFact(10, 10, DisplayName = "Data flow test - end to end workflow")]
188+
[RetryFact(1, 10, DisplayName = "Data flow test - end to end workflow")]
190189
public async Task DataflowTest_EndToEnd()
191190
{
191+
var messageCount = 5;
192192
var testData = "this is a test";
193193
_storageInfoProvider.Setup(p => p.HasSpaceAvailableForExport).Returns(true);
194194

@@ -202,17 +202,20 @@ public async Task DataflowTest_EndToEnd()
202202
It.IsAny<ushort>()))
203203
.Callback<string, string, Action<MessageReceivedEventArgs>, ushort>((topic, queue, messageReceivedCallback, prefetchCount) =>
204204
{
205-
messageReceivedCallback(CreateMessageReceivedEventArgs());
205+
while (messageCount-- > 0)
206+
{
207+
messageReceivedCallback(CreateMessageReceivedEventArgs());
208+
}
206209
});
207210

208211
_storageService.Setup(p => p.GetObject(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Action<Stream>>(), It.IsAny<CancellationToken>()))
209212
.Callback<string, string, Action<Stream>, CancellationToken>((bucketName, objectName, callback, cancellationToken) =>
210213
{
211214
callback(new MemoryStream(Encoding.UTF8.GetBytes(testData)));
212215
});
213-
var countdownEvent = new CountdownEvent(3);
216+
var countdownEvent = new CountdownEvent(5 * 3);
214217
var service = new TestExportService(_logger.Object, _configuration, _serviceScopeFactory.Object, _storageInfoProvider.Object);
215-
service.ReportActionStarted += (sender, e) =>
218+
service.ReportActionCompleted += (sender, e) =>
216219
{
217220
countdownEvent.Signal();
218221
};
@@ -223,13 +226,13 @@ public async Task DataflowTest_EndToEnd()
223226
countdownEvent.Signal();
224227
};
225228
await service.StartAsync(_cancellationTokenSource.Token);
226-
Assert.True(countdownEvent.Wait(3000));
229+
Assert.True(countdownEvent.Wait(1000000));
227230
await StopAndVerify(service);
228231

229232
_messagePublisherService.Verify(
230233
p => p.Publish(It.IsAny<string>(),
231-
It.Is<Message>(match => (match.ConvertTo<ExportCompleteMessage>()).Status == ExportStatus.Success)), Times.Once());
232-
_messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny<MessageBase>()), Times.Once());
234+
It.Is<Message>(match => (match.ConvertTo<ExportCompleteMessage>()).Status == ExportStatus.Success)), Times.Exactly(5));
235+
_messageSubscriberService.Verify(p => p.Acknowledge(It.IsAny<MessageBase>()), Times.Exactly(5));
233236
_messageSubscriberService.Verify(p => p.Reject(It.IsAny<MessageBase>()), Times.Never());
234237
_messageSubscriberService.Verify(p => p.Subscribe(It.IsAny<string>(),
235238
It.IsAny<string>(),

src/InformaticsGateway/Test/Services/Export/ScuExportServiceTest.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public async Task ShallFailWhenNoDestinationIsDefined()
129129
var service = new ScuExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _storageInfoProvider.Object, _dicomToolkit.Object);
130130

131131
var dataflowCompleted = new ManualResetEvent(false);
132-
service.ReportActionStarted += (sender, args) =>
132+
service.ReportActionCompleted += (sender, args) =>
133133
{
134134
dataflowCompleted.Set();
135135
};
@@ -179,7 +179,7 @@ public async Task ShallFailWhenDestinationIsNotConfigured()
179179
var service = new ScuExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _storageInfoProvider.Object, _dicomToolkit.Object);
180180

181181
var dataflowCompleted = new ManualResetEvent(false);
182-
service.ReportActionStarted += (sender, args) =>
182+
service.ReportActionCompleted += (sender, args) =>
183183
{
184184
dataflowCompleted.Set();
185185
};
@@ -233,7 +233,7 @@ public async Task AssociationRejected()
233233
var service = new ScuExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _storageInfoProvider.Object, _dicomToolkit.Object);
234234

235235
var dataflowCompleted = new ManualResetEvent(false);
236-
service.ReportActionStarted += (sender, args) =>
236+
service.ReportActionCompleted += (sender, args) =>
237237
{
238238
dataflowCompleted.Set();
239239
};
@@ -289,7 +289,7 @@ public async Task SimulateAbort()
289289
_dicomToolkit.Setup(p => p.Load(It.IsAny<byte[]>())).Returns(InstanceGenerator.GenerateDicomFile(sopInstanceUid: sopInstanceUid));
290290

291291
var dataflowCompleted = new ManualResetEvent(false);
292-
service.ReportActionStarted += (sender, args) =>
292+
service.ReportActionCompleted += (sender, args) =>
293293
{
294294
dataflowCompleted.Set();
295295
};
@@ -345,7 +345,7 @@ public async Task CStoreFailure()
345345
_dicomToolkit.Setup(p => p.Load(It.IsAny<byte[]>())).Returns(InstanceGenerator.GenerateDicomFile(sopInstanceUid: sopInstanceUid));
346346

347347
var dataflowCompleted = new ManualResetEvent(false);
348-
service.ReportActionStarted += (sender, args) =>
348+
service.ReportActionCompleted += (sender, args) =>
349349
{
350350
dataflowCompleted.Set();
351351
};
@@ -401,7 +401,7 @@ public async Task ErrorLoadingDicomContent()
401401
_dicomToolkit.Setup(p => p.Load(It.IsAny<byte[]>())).Throws(new Exception("error"));
402402

403403
var dataflowCompleted = new ManualResetEvent(false);
404-
service.ReportActionStarted += (sender, args) =>
404+
service.ReportActionCompleted += (sender, args) =>
405405
{
406406
dataflowCompleted.Set();
407407
};
@@ -457,7 +457,7 @@ public async Task UnreachableServer()
457457
_dicomToolkit.Setup(p => p.Load(It.IsAny<byte[]>())).Returns(InstanceGenerator.GenerateDicomFile(sopInstanceUid: sopInstanceUid));
458458

459459
var dataflowCompleted = new ManualResetEvent(false);
460-
service.ReportActionStarted += (sender, args) =>
460+
service.ReportActionCompleted += (sender, args) =>
461461
{
462462
dataflowCompleted.Set();
463463
};
@@ -512,7 +512,7 @@ public async Task ExportCompletes()
512512
_dicomToolkit.Setup(p => p.Load(It.IsAny<byte[]>())).Returns(InstanceGenerator.GenerateDicomFile(sopInstanceUid: sopInstanceUid));
513513

514514
var dataflowCompleted = new ManualResetEvent(false);
515-
service.ReportActionStarted += (sender, args) =>
515+
service.ReportActionCompleted += (sender, args) =>
516516
{
517517
dataflowCompleted.Set();
518518
};

0 commit comments

Comments
 (0)