@@ -4,12 +4,13 @@ import {
4
4
TaskRunExecution ,
5
5
TaskRunExecutionResult ,
6
6
} from "@trigger.dev/core/v3" ;
7
- import { $transaction } from "~/db.server" ;
7
+ import { $transaction , PrismaClientOrTransaction } from "~/db.server" ;
8
8
import { logger } from "~/services/logger.server" ;
9
9
import { marqs } from "~/v3/marqs/index.server" ;
10
10
import { socketIo } from "../handleSocketIo.server" ;
11
11
import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server" ;
12
12
import { BaseService } from "./baseService.server" ;
13
+ import { TaskRunAttempt } from "@trigger.dev/database" ;
13
14
14
15
export class ResumeAttemptService extends BaseService {
15
16
public async call (
@@ -24,7 +25,7 @@ export class ResumeAttemptService extends BaseService {
24
25
} ,
25
26
include : {
26
27
taskRun : true ,
27
- taskRunDependency : {
28
+ dependencies : {
28
29
include : {
29
30
taskRun : {
30
31
include : {
@@ -40,8 +41,12 @@ export class ResumeAttemptService extends BaseService {
40
41
} ,
41
42
} ,
42
43
} ,
44
+ orderBy : {
45
+ createdAt : "desc" ,
46
+ } ,
47
+ take : 1 ,
43
48
} ,
44
- batchTaskRunDependency : {
49
+ batchDependencies : {
45
50
include : {
46
51
items : {
47
52
include : {
@@ -61,6 +66,10 @@ export class ResumeAttemptService extends BaseService {
61
66
} ,
62
67
} ,
63
68
} ,
69
+ orderBy : {
70
+ createdAt : "desc" ,
71
+ } ,
72
+ take : 1 ,
64
73
} ,
65
74
} ,
66
75
} ) ;
@@ -78,6 +87,8 @@ export class ResumeAttemptService extends BaseService {
78
87
return ;
79
88
}
80
89
90
+ let completedAttemptIds : string [ ] = [ ] ;
91
+
81
92
switch ( params . type ) {
82
93
case "WAIT_FOR_DURATION" : {
83
94
logger . error (
@@ -93,148 +104,140 @@ export class ResumeAttemptService extends BaseService {
93
104
} ) ;
94
105
break ;
95
106
}
96
- case "WAIT_FOR_TASK" :
97
- case "WAIT_FOR_BATCH" : {
98
- let completedAttemptIds : string [ ] = [ ] ;
99
-
100
- if ( attempt . taskRunDependency ) {
101
- const dependentAttempt = attempt . taskRunDependency . taskRun . attempts [ 0 ] ;
107
+ case "WAIT_FOR_TASK" : {
108
+ if ( attempt . dependencies . length ) {
109
+ // We only care about the latest dependency
110
+ const dependentAttempt = attempt . dependencies [ 0 ] . taskRun . attempts [ 0 ] ;
102
111
103
112
if ( ! dependentAttempt ) {
104
113
logger . error ( "No dependent attempt" , { attemptId : attempt . id } ) ;
105
114
return ;
106
115
}
107
116
108
117
completedAttemptIds = [ dependentAttempt . id ] ;
109
-
110
- await tx . taskRunAttempt . update ( {
111
- where : {
112
- id : attempt . id ,
113
- } ,
114
- data : {
115
- taskRunDependency : {
116
- disconnect : true ,
117
- } ,
118
- } ,
119
- } ) ;
120
- } else if ( attempt . batchTaskRunDependency ) {
121
- const dependentBatchItems = attempt . batchTaskRunDependency . items ;
118
+ } else {
119
+ logger . error ( "No task dependency" , { attemptId : attempt . id } ) ;
120
+ return ;
121
+ }
122
+ break ;
123
+ }
124
+ case "WAIT_FOR_BATCH" : {
125
+ if ( attempt . batchDependencies ) {
126
+ // We only care about the latest batch dependency
127
+ const dependentBatchItems = attempt . batchDependencies [ 0 ] . items ;
122
128
123
129
if ( ! dependentBatchItems ) {
124
130
logger . error ( "No dependent batch items" , { attemptId : attempt . id } ) ;
125
131
return ;
126
132
}
127
133
128
134
completedAttemptIds = dependentBatchItems . map ( ( item ) => item . taskRun . attempts [ 0 ] ?. id ) ;
129
-
130
- await tx . taskRunAttempt . update ( {
131
- where : {
132
- id : attempt . id ,
133
- } ,
134
- data : {
135
- batchTaskRunDependency : {
136
- disconnect : true ,
137
- } ,
138
- } ,
139
- } ) ;
140
135
} else {
141
- logger . error ( "No dependencies" , { attemptId : attempt . id } ) ;
142
- return ;
143
- }
144
-
145
- if ( completedAttemptIds . length === 0 ) {
146
- logger . error ( "No completed attempt IDs" , { attemptId : attempt . id } ) ;
136
+ logger . error ( "No batch dependency" , { attemptId : attempt . id } ) ;
147
137
return ;
148
138
}
139
+ break ;
140
+ }
141
+ default : {
142
+ break ;
143
+ }
144
+ }
149
145
150
- const completions : TaskRunExecutionResult [ ] = [ ] ;
151
- const executions : TaskRunExecution [ ] = [ ] ;
146
+ await this . #handleDependencyResume( attempt , completedAttemptIds , tx ) ;
147
+ } ) ;
148
+ }
152
149
153
- for ( const completedAttemptId of completedAttemptIds ) {
154
- const completedAttempt = await tx . taskRunAttempt . findUnique ( {
155
- where : {
156
- id : completedAttemptId ,
157
- taskRun : {
158
- lockedAt : {
159
- not : null ,
160
- } ,
161
- lockedById : {
162
- not : null ,
163
- } ,
164
- } ,
165
- } ,
166
- } ) ;
167
-
168
- if ( ! completedAttempt ) {
169
- logger . error ( "Completed attempt not found" , {
170
- attemptId : attempt . id ,
171
- completedAttemptId,
172
- } ) ;
173
- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
174
- return ;
175
- }
150
+ async #handleDependencyResume(
151
+ attempt : TaskRunAttempt ,
152
+ completedAttemptIds : string [ ] ,
153
+ tx : PrismaClientOrTransaction
154
+ ) {
155
+ if ( completedAttemptIds . length === 0 ) {
156
+ logger . error ( "No completed attempt IDs" , { attemptId : attempt . id } ) ;
157
+ return ;
158
+ }
159
+
160
+ const completions : TaskRunExecutionResult [ ] = [ ] ;
161
+ const executions : TaskRunExecution [ ] = [ ] ;
162
+
163
+ for ( const completedAttemptId of completedAttemptIds ) {
164
+ const completedAttempt = await tx . taskRunAttempt . findUnique ( {
165
+ where : {
166
+ id : completedAttemptId ,
167
+ taskRun : {
168
+ lockedAt : {
169
+ not : null ,
170
+ } ,
171
+ lockedById : {
172
+ not : null ,
173
+ } ,
174
+ } ,
175
+ } ,
176
+ } ) ;
176
177
177
- const completion = await sharedQueueTasks . getCompletionPayloadFromAttempt (
178
- completedAttempt . id
179
- ) ;
178
+ if ( ! completedAttempt ) {
179
+ logger . error ( "Completed attempt not found" , {
180
+ attemptId : attempt . id ,
181
+ completedAttemptId,
182
+ } ) ;
183
+ await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
184
+ return ;
185
+ }
180
186
181
- if ( ! completion ) {
182
- logger . error ( "Failed to get completion payload" , {
183
- attemptId : attempt . id ,
184
- completedAttemptId,
185
- } ) ;
186
- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
187
- return ;
188
- }
187
+ const completion = await sharedQueueTasks . getCompletionPayloadFromAttempt (
188
+ completedAttempt . id
189
+ ) ;
189
190
190
- completions . push ( completion ) ;
191
+ if ( ! completion ) {
192
+ logger . error ( "Failed to get completion payload" , {
193
+ attemptId : attempt . id ,
194
+ completedAttemptId,
195
+ } ) ;
196
+ await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
197
+ return ;
198
+ }
191
199
192
- const executionPayload = await sharedQueueTasks . getExecutionPayloadFromAttempt (
193
- completedAttempt . id
194
- ) ;
200
+ completions . push ( completion ) ;
195
201
196
- if ( ! executionPayload ) {
197
- logger . error ( "Failed to get execution payload" , {
198
- attemptId : attempt . id ,
199
- completedAttemptId,
200
- } ) ;
201
- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
202
- return ;
203
- }
202
+ const executionPayload = await sharedQueueTasks . getExecutionPayloadFromAttempt (
203
+ completedAttempt . id
204
+ ) ;
204
205
205
- executions . push ( executionPayload . execution ) ;
206
- }
206
+ if ( ! executionPayload ) {
207
+ logger . error ( "Failed to get execution payload" , {
208
+ attemptId : attempt . id ,
209
+ completedAttemptId,
210
+ } ) ;
211
+ await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
212
+ return ;
213
+ }
207
214
208
- const updated = await tx . taskRunAttempt . update ( {
209
- where : {
210
- id : attempt . id ,
211
- } ,
215
+ executions . push ( executionPayload . execution ) ;
216
+ }
217
+
218
+ const updated = await tx . taskRunAttempt . update ( {
219
+ where : {
220
+ id : attempt . id ,
221
+ } ,
222
+ data : {
223
+ status : "EXECUTING" ,
224
+ taskRun : {
225
+ update : {
212
226
data : {
213
- status : "EXECUTING" ,
214
- taskRun : {
215
- update : {
216
- data : {
217
- status : attempt . number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING" ,
218
- } ,
219
- } ,
220
- } ,
227
+ status : attempt . number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING" ,
221
228
} ,
222
- } ) ;
229
+ } ,
230
+ } ,
231
+ } ,
232
+ } ) ;
223
233
224
- socketIo . coordinatorNamespace . emit ( "RESUME_AFTER_DEPENDENCY" , {
225
- version : "v1" ,
226
- runId : attempt . taskRunId ,
227
- attemptId : attempt . id ,
228
- attemptFriendlyId : attempt . friendlyId ,
229
- completions,
230
- executions,
231
- } ) ;
232
- break ;
233
- }
234
- default : {
235
- break ;
236
- }
237
- }
234
+ socketIo . coordinatorNamespace . emit ( "RESUME_AFTER_DEPENDENCY" , {
235
+ version : "v1" ,
236
+ runId : attempt . taskRunId ,
237
+ attemptId : attempt . id ,
238
+ attemptFriendlyId : attempt . friendlyId ,
239
+ completions,
240
+ executions,
238
241
} ) ;
239
242
}
240
243
}
0 commit comments