File tree Expand file tree Collapse file tree 3 files changed +27
-0
lines changed Expand file tree Collapse file tree 3 files changed +27
-0
lines changed Original file line number Diff line number Diff line change @@ -507,6 +507,8 @@ export class MarQS {
507
507
} ) ;
508
508
509
509
await this . #callEnqueueMessage( newMessage ) ;
510
+
511
+ await this . options . subscriber ?. messageReplaced ( newMessage ) ;
510
512
} ,
511
513
{
512
514
kind : SpanKind . CONSUMER ,
Original file line number Diff line number Diff line change @@ -97,6 +97,7 @@ export interface MessageQueueSubscriber {
97
97
messageDequeued ( message : MessagePayload ) : Promise < void > ;
98
98
messageAcked ( message : MessagePayload ) : Promise < void > ;
99
99
messageNacked ( message : MessagePayload ) : Promise < void > ;
100
+ messageReplaced ( message : MessagePayload ) : Promise < void > ;
100
101
}
101
102
102
103
export interface VisibilityTimeoutStrategy {
Original file line number Diff line number Diff line change @@ -97,6 +97,30 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
97
97
} ) ;
98
98
}
99
99
100
+ async messageReplaced ( message : MessagePayload ) : Promise < void > {
101
+ logger . debug ( "TaskRunConcurrencyTracker.messageReplaced()" , {
102
+ data : message . data ,
103
+ messageId : message . messageId ,
104
+ } ) ;
105
+
106
+ const data = this . getMessageData ( message ) ;
107
+ if ( ! data ) {
108
+ logger . info (
109
+ `TaskRunConcurrencyTracker.messageReplaced(): could not parse message data` ,
110
+ message
111
+ ) ;
112
+ return ;
113
+ }
114
+
115
+ await this . executionFinished ( {
116
+ projectId : data . projectId ,
117
+ taskId : data . taskIdentifier ,
118
+ runId : message . messageId ,
119
+ environmentId : data . environmentId ,
120
+ deployed : data . environmentType !== "DEVELOPMENT" ,
121
+ } ) ;
122
+ }
123
+
100
124
private getMessageData ( message : MessagePayload ) {
101
125
const result = ConcurrentMessageData . safeParse ( message . data ) ;
102
126
if ( result . success ) {
You can’t perform that action at this time.
0 commit comments