@@ -63,12 +63,12 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService
63
63
private readonly IMessageBrokerSubscriberService _messageSubscriber ;
64
64
private readonly IMessageBrokerPublisherService _messagePublisher ;
65
65
private readonly IServiceScope _scope ;
66
- private readonly Dictionary < string , ExportRequestMessage > _exportRequets ;
66
+ private readonly Dictionary < string , ExportRequestMessage > _exportRequests ;
67
67
private readonly string _exportQueueName ;
68
68
private TransformManyBlock < ExportRequestMessage , ExportRequestDataMessage > _exportFlow ;
69
69
70
70
public abstract string RoutingKey { get ; }
71
- protected abstract int Concurrentcy { get ; }
71
+ protected abstract int Concurrency { get ; }
72
72
public ServiceStatus Status { get ; set ; } = ServiceStatus . Unknown ;
73
73
public abstract string ServiceName { get ; }
74
74
@@ -102,7 +102,7 @@ public ExportServiceBase(
102
102
_messageSubscriber = _scope . ServiceProvider . GetRequiredService < IMessageBrokerSubscriberService > ( ) ;
103
103
_messagePublisher = _scope . ServiceProvider . GetRequiredService < IMessageBrokerPublisherService > ( ) ;
104
104
105
- _exportRequets = new Dictionary < string , ExportRequestMessage > ( ) ;
105
+ _exportRequests = new Dictionary < string , ExportRequestMessage > ( ) ;
106
106
_exportQueueName = _configuration . Messaging . Topics . ExportRequestQueue ;
107
107
}
108
108
@@ -131,7 +131,7 @@ private void SetupExportFlow(CancellationToken cancellationToken)
131
131
{
132
132
var executionOptions = new ExecutionDataflowBlockOptions
133
133
{
134
- MaxDegreeOfParallelism = Concurrentcy ,
134
+ MaxDegreeOfParallelism = Concurrency ,
135
135
MaxMessagesPerTask = 1 ,
136
136
CancellationToken = cancellationToken
137
137
} ;
@@ -184,11 +184,22 @@ private void OnMessageReceivedCallback(MessageReceivedEventArgs eventArgs)
184
184
_messageSubscriber . Reject ( eventArgs . Message ) ;
185
185
return ;
186
186
}
187
- var exportRequest = eventArgs . Message . ConvertTo < ExportRequestMessage > ( ) ;
188
- exportRequest . MessageId = eventArgs . Message . MessageId ;
189
187
190
- _exportRequets . Add ( exportRequest . ExportTaskId , exportRequest ) ;
191
- _exportFlow . Post ( exportRequest ) ;
188
+ lock ( SyncRoot )
189
+ {
190
+ var exportRequest = eventArgs . Message . ConvertTo < ExportRequestMessage > ( ) ;
191
+ if ( _exportRequests . ContainsKey ( exportRequest . ExportTaskId ) )
192
+ {
193
+ _logger . Log ( LogLevel . Warning , $ "The export request { exportRequest . ExportTaskId } is already queued for export.") ;
194
+ return ;
195
+ }
196
+
197
+ exportRequest . MessageId = eventArgs . Message . MessageId ;
198
+ exportRequest . DeliveryTag = eventArgs . Message . DeliveryTag ;
199
+
200
+ _exportRequests . Add ( exportRequest . ExportTaskId , exportRequest ) ;
201
+ _exportFlow . Post ( exportRequest ) ;
202
+ }
192
203
}
193
204
194
205
private IEnumerable < ExportRequestDataMessage > DownloadPayloadActionCallback ( ExportRequestMessage exportRequest , CancellationToken cancellationToken )
@@ -198,31 +209,33 @@ private IEnumerable<ExportRequestDataMessage> DownloadPayloadActionCallback(Expo
198
209
var scope = _serviceScopeFactory . CreateScope ( ) ;
199
210
var storageService = scope . ServiceProvider . GetRequiredService < IStorageService > ( ) ;
200
211
201
- var exportRequestData = new ExportRequestDataMessage ( exportRequest ) ;
202
212
foreach ( var file in exportRequest . Files )
203
213
{
214
+ var exportRequestData = new ExportRequestDataMessage ( exportRequest , file ) ;
204
215
try
205
216
{
206
217
_logger . Log ( LogLevel . Debug , $ "Downloading file { file } ...") ;
207
218
Policy
208
- . Handle < Exception > ( )
209
- . WaitAndRetry (
210
- _configuration . Export . Retries . RetryDelays ,
211
- ( exception , timeSpan , retryCount , context ) =>
212
- {
213
- _logger . Log ( LogLevel . Error , exception , $ "Error downloading payload. Waiting { timeSpan } before next retry. Retry attempt { retryCount } .") ;
214
- } )
215
- . Execute ( ( ) =>
216
- {
217
- storageService . GetObject ( _configuration . Storage . StorageServiceBucketName , file , ( stream ) =>
218
- {
219
- _logger . Log ( LogLevel . Debug , $ "Copying file { file } ...") ;
220
- using var memoryStream = new MemoryStream ( ) ;
221
- stream . CopyTo ( memoryStream ) ;
222
- exportRequestData . SetData ( memoryStream . ToArray ( ) ) ;
223
- _logger . Log ( LogLevel . Debug , $ "File { file } ready for export...") ;
224
- } , cancellationToken ) . Wait ( ) ;
225
- } ) ;
219
+ . Handle < Exception > ( )
220
+ . WaitAndRetry (
221
+ _configuration . Export . Retries . RetryDelays ,
222
+ ( exception , timeSpan , retryCount , context ) =>
223
+ {
224
+ _logger . Log ( LogLevel . Error , exception , $ "Error downloading payload. Waiting { timeSpan } before next retry. Retry attempt { retryCount } .") ;
225
+ } )
226
+ . Execute ( ( ) =>
227
+ {
228
+ _logger . Log ( LogLevel . Debug , $ "Downloading { file } ...") ;
229
+ var task = storageService . GetObject ( _configuration . Storage . StorageServiceBucketName , file , ( stream ) =>
230
+ {
231
+ using var memoryStream = new MemoryStream ( ) ;
232
+ stream . CopyTo ( memoryStream ) ;
233
+ exportRequestData . SetData ( memoryStream . ToArray ( ) ) ;
234
+ } , cancellationToken ) ;
235
+
236
+ task . Wait ( ) ;
237
+ _logger . Log ( LogLevel . Debug , $ "File { file } ready for export.") ;
238
+ } ) ;
226
239
}
227
240
catch ( Exception ex )
228
241
{
@@ -239,7 +252,7 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
239
252
{
240
253
using var loggerScope = _logger . BeginScope ( new LoggingDataDictionary < string , object > { { "ExportTaskId" , exportRequestData . ExportTaskId } , { "CorrelationId" , exportRequestData . CorrelationId } } ) ;
241
254
242
- var exportRequest = _exportRequets [ exportRequestData . ExportTaskId ] ;
255
+ var exportRequest = _exportRequests [ exportRequestData . ExportTaskId ] ;
243
256
lock ( SyncRoot )
244
257
{
245
258
if ( exportRequestData . IsFailed )
@@ -300,6 +313,11 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
300
313
_logger . Log ( LogLevel . Information , $ "Publishing export complete message.") ;
301
314
_messagePublisher . Publish ( _configuration . Messaging . Topics . ExportComplete , jsonMessage . ToMessage ( ) ) ;
302
315
} ) ;
316
+
317
+ lock ( SyncRoot )
318
+ {
319
+ _exportRequests . Remove ( exportRequestData . ExportTaskId ) ;
320
+ }
303
321
}
304
322
}
305
323
}
0 commit comments