25
25
using Microsoft . Extensions . DependencyInjection ;
26
26
using Microsoft . Extensions . Logging ;
27
27
using Monai . Deploy . InformaticsGateway . Api ;
28
+ using Monai . Deploy . InformaticsGateway . Api . Rest ;
28
29
using Monai . Deploy . InformaticsGateway . Api . Storage ;
29
30
using Monai . Deploy . InformaticsGateway . Database . Api . Repositories ;
30
31
using Monai . Deploy . InformaticsGateway . Logging ;
32
+ using Monai . Deploy . InformaticsGateway . Services . Common ;
31
33
using Monai . Deploy . Messaging . Events ;
32
34
33
35
#nullable enable
@@ -38,7 +40,7 @@ namespace Monai.Deploy.InformaticsGateway.Services.Connectors
38
40
/// An in-memory queue for providing any files/DICOM instances received by the Informatics Gateway to
39
41
/// other internal services.
40
42
/// </summary>
41
- internal sealed partial class PayloadAssembler : IPayloadAssembler , IDisposable
43
+ internal sealed partial class PayloadAssembler : IPayloadAssembler , IDisposable , IMonaiService
42
44
{
43
45
internal const int DEFAULT_TIMEOUT = 5 ;
44
46
private readonly ILogger < PayloadAssembler > _logger ;
@@ -49,13 +51,16 @@ internal sealed partial class PayloadAssembler : IPayloadAssembler, IDisposable
49
51
private readonly Task _intializedTask ;
50
52
private readonly BlockingCollection < Payload > _workItems ;
51
53
private readonly System . Timers . Timer _timer ;
54
+ private readonly IPayloadRepository _repository ;
52
55
53
56
public PayloadAssembler (
54
57
ILogger < PayloadAssembler > logger ,
55
58
IServiceScopeFactory serviceScopeFactory )
56
59
{
57
60
_logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
58
61
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException ( nameof ( serviceScopeFactory ) ) ;
62
+ var scope = _serviceScopeFactory . CreateScope ( ) ;
63
+ _repository = scope . ServiceProvider . GetRequiredService < IPayloadRepository > ( ) ;
59
64
60
65
_workItems = [ ] ;
61
66
_tokenSource = new CancellationTokenSource ( ) ;
@@ -69,15 +74,18 @@ public PayloadAssembler(
69
74
} ;
70
75
_timer . Elapsed += OnTimedEvent ;
71
76
_timer . Enabled = true ;
77
+
78
+ Status = ServiceStatus . Running ;
72
79
}
73
80
81
+ public string ServiceName { get => nameof ( PayloadAssembler ) ; }
82
+
83
+ public ServiceStatus Status { get ; set ; } = ServiceStatus . Unknown ;
84
+
74
85
private async Task RemovePendingPayloads ( )
75
86
{
76
87
_logger . RemovingPendingPayloads ( ) ;
77
- var scope = _serviceScopeFactory . CreateScope ( ) ;
78
- var repository = scope . ServiceProvider . GetRequiredService < IPayloadRepository > ( ) ;
79
-
80
- var removed = await repository . RemovePendingPayloadsAsync ( ) . ConfigureAwait ( false ) ;
88
+ var removed = await _repository . RemovePendingPayloadsAsync ( ) . ConfigureAwait ( false ) ;
81
89
82
90
_logger . TotalNumberOfPayloadsRemoved ( removed ) ;
83
91
}
@@ -192,8 +200,7 @@ private async Task QueueBucketForNotification(string key, Payload payload)
192
200
{
193
201
payload . State = Payload . PayloadState . Move ;
194
202
var scope = _serviceScopeFactory . CreateScope ( ) ;
195
- var repository = scope . ServiceProvider . GetRequiredService < IPayloadRepository > ( ) ;
196
- await repository . UpdateAsync ( payload ) . ConfigureAwait ( false ) ;
203
+ await _repository . UpdateAsync ( payload ) . ConfigureAwait ( false ) ;
197
204
_logger . PayloadSaved ( payload . PayloadId ) ;
198
205
_workItems . Add ( payload ) ;
199
206
_logger . BucketReady ( key , payload . Count ) ;
@@ -219,10 +226,8 @@ private async Task<Payload> CreateOrGetPayload(string key, string correlationId,
219
226
220
227
private async Task < Payload > PayloadFactory ( string key , string correlationId , string ? workflowInstanceId , string ? taskId , Messaging . Events . DataOrigin dataOrigin , uint timeout , CancellationToken cancellationToken )
221
228
{
222
- var scope = _serviceScopeFactory . CreateScope ( ) ;
223
- var repository = scope . ServiceProvider . GetRequiredService < IPayloadRepository > ( ) ;
224
229
var newPayload = new Payload ( key , correlationId , workflowInstanceId , taskId , dataOrigin , timeout , null ) ;
225
- await repository . AddAsync ( newPayload , cancellationToken ) . ConfigureAwait ( false ) ;
230
+ await _repository . AddAsync ( newPayload , cancellationToken ) . ConfigureAwait ( false ) ;
226
231
_logger . BucketCreated ( key , timeout ) ;
227
232
return newPayload ;
228
233
}
@@ -232,6 +237,7 @@ public void Dispose()
232
237
_tokenSource . Cancel ( ) ;
233
238
_payloads . Clear ( ) ;
234
239
_timer . Stop ( ) ;
240
+ Status = ServiceStatus . Stopped ;
235
241
}
236
242
}
237
243
}
0 commit comments