@@ -4,13 +4,12 @@ import {
4
4
TriggerTaskRequestBody ,
5
5
packetRequiresOffloading ,
6
6
} from "@trigger.dev/core/v3" ;
7
- import { nanoid } from "nanoid" ;
8
7
import { createHash } from "node:crypto" ;
9
- import { $transaction } from "~/db.server" ;
8
+ import { $transaction , prisma } from "~/db.server" ;
10
9
import { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
10
+ import { marqs , sanitizeQueueName } from "~/v3/marqs/index.server" ;
11
11
import { eventRepository } from "../eventRepository.server" ;
12
12
import { generateFriendlyId } from "../friendlyIdentifiers" ;
13
- import { marqs } from "~/v3/marqs/index.server" ;
14
13
import { uploadToObjectStore } from "../r2.server" ;
15
14
import { BaseService } from "./baseService.server" ;
16
15
@@ -112,7 +111,12 @@ export class TriggerTaskService extends BaseService {
112
111
select : { lastNumber : true } ,
113
112
} ) ;
114
113
115
- const queueName = body . options ?. queue ?. name ?? `task/${ taskId } ` ;
114
+ let queueName = sanitizeQueueName ( body . options ?. queue ?. name ?? `task/${ taskId } ` ) ;
115
+
116
+ // Check that the queuename is not an empty string
117
+ if ( ! queueName ) {
118
+ queueName = sanitizeQueueName ( `task/${ taskId } ` ) ;
119
+ }
116
120
117
121
event . setAttribute ( "queueName" , queueName ) ;
118
122
span . setAttribute ( "queueName" , queueName ) ;
@@ -183,6 +187,41 @@ export class TriggerTaskService extends BaseService {
183
187
}
184
188
}
185
189
190
+ if ( body . options ?. queue ) {
191
+ const concurrencyLimit = body . options . queue . concurrencyLimit
192
+ ? Math . max ( 0 , body . options . queue . concurrencyLimit )
193
+ : null ;
194
+ const taskQueue = await prisma . taskQueue . upsert ( {
195
+ where : {
196
+ runtimeEnvironmentId_name : {
197
+ runtimeEnvironmentId : environment . id ,
198
+ name : queueName ,
199
+ } ,
200
+ } ,
201
+ update : {
202
+ concurrencyLimit,
203
+ rateLimit : body . options . queue . rateLimit ,
204
+ } ,
205
+ create : {
206
+ friendlyId : generateFriendlyId ( "queue" ) ,
207
+ name : queueName ,
208
+ concurrencyLimit,
209
+ runtimeEnvironmentId : environment . id ,
210
+ projectId : environment . projectId ,
211
+ rateLimit : body . options . queue . rateLimit ,
212
+ type : "NAMED" ,
213
+ } ,
214
+ } ) ;
215
+
216
+ if ( typeof taskQueue . concurrencyLimit === "number" ) {
217
+ await marqs ?. updateQueueConcurrencyLimits (
218
+ environment ,
219
+ taskQueue . name ,
220
+ taskQueue . concurrencyLimit
221
+ ) ;
222
+ }
223
+ }
224
+
186
225
return taskRun ;
187
226
} ) ;
188
227
0 commit comments