@@ -9,174 +9,172 @@ import { expect } from "vitest";
9
9
import { RunEngine } from "../index.js" ;
10
10
import { setTimeout } from "node:timers/promises" ;
11
11
12
+ vi . setConfig ( { testTimeout : 60_000 } ) ;
13
+
12
14
describe ( "RunEngine batchTrigger" , ( ) => {
13
- containerTest (
14
- "Batch trigger shares a batch" ,
15
- { timeout : 15_000 } ,
16
- async ( { prisma, redisOptions } ) => {
17
- //create environment
18
- const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
19
-
20
- const engine = new RunEngine ( {
21
- prisma,
22
- worker : {
23
- redis : redisOptions ,
24
- workers : 1 ,
25
- tasksPerWorker : 10 ,
26
- pollIntervalMs : 100 ,
27
- } ,
28
- queue : {
29
- redis : redisOptions ,
30
- } ,
31
- runLock : {
32
- redis : redisOptions ,
33
- } ,
15
+ containerTest ( "Batch trigger shares a batch" , async ( { prisma, redisOptions } ) => {
16
+ //create environment
17
+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
18
+
19
+ const engine = new RunEngine ( {
20
+ prisma,
21
+ worker : {
22
+ redis : redisOptions ,
23
+ workers : 1 ,
24
+ tasksPerWorker : 10 ,
25
+ pollIntervalMs : 100 ,
26
+ } ,
27
+ queue : {
28
+ redis : redisOptions ,
29
+ } ,
30
+ runLock : {
31
+ redis : redisOptions ,
32
+ } ,
33
+ machines : {
34
+ defaultMachine : "small-1x" ,
34
35
machines : {
35
- defaultMachine : "small-1x" ,
36
- machines : {
37
- "small-1x" : {
38
- name : "small-1x" as const ,
39
- cpu : 0.5 ,
40
- memory : 0.5 ,
41
- centsPerMs : 0.0001 ,
42
- } ,
36
+ "small-1x" : {
37
+ name : "small-1x" as const ,
38
+ cpu : 0.5 ,
39
+ memory : 0.5 ,
40
+ centsPerMs : 0.0001 ,
43
41
} ,
44
- baseCostInCents : 0.0005 ,
45
42
} ,
46
- tracer : trace . getTracer ( "test" , "0.0.0" ) ,
43
+ baseCostInCents : 0.0005 ,
44
+ } ,
45
+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
46
+ } ) ;
47
+
48
+ try {
49
+ const taskIdentifier = "test-task" ;
50
+
51
+ //create background worker
52
+ const backgroundWorker = await setupBackgroundWorker (
53
+ prisma ,
54
+ authenticatedEnvironment ,
55
+ taskIdentifier
56
+ ) ;
57
+
58
+ const batch = await prisma . batchTaskRun . create ( {
59
+ data : {
60
+ friendlyId : generateFriendlyId ( "batch" ) ,
61
+ runtimeEnvironmentId : authenticatedEnvironment . id ,
62
+ } ,
47
63
} ) ;
48
64
49
- try {
50
- const taskIdentifier = "test-task" ;
65
+ //trigger the runs
66
+ const run1 = await engine . trigger (
67
+ {
68
+ number : 1 ,
69
+ friendlyId : "run_1234" ,
70
+ environment : authenticatedEnvironment ,
71
+ taskIdentifier,
72
+ payload : "{}" ,
73
+ payloadType : "application/json" ,
74
+ context : { } ,
75
+ traceContext : { } ,
76
+ traceId : "t12345" ,
77
+ spanId : "s12345" ,
78
+ masterQueue : "main" ,
79
+ queueName : "task/test-task" ,
80
+ isTest : false ,
81
+ tags : [ ] ,
82
+ batch : { id : batch . id , index : 0 } ,
83
+ } ,
84
+ prisma
85
+ ) ;
86
+
87
+ const run2 = await engine . trigger (
88
+ {
89
+ number : 2 ,
90
+ friendlyId : "run_1235" ,
91
+ environment : authenticatedEnvironment ,
92
+ taskIdentifier,
93
+ payload : "{}" ,
94
+ payloadType : "application/json" ,
95
+ context : { } ,
96
+ traceContext : { } ,
97
+ traceId : "t12345" ,
98
+ spanId : "s12345" ,
99
+ masterQueue : "main" ,
100
+ queueName : "task/test-task" ,
101
+ isTest : false ,
102
+ tags : [ ] ,
103
+ batch : { id : batch . id , index : 1 } ,
104
+ } ,
105
+ prisma
106
+ ) ;
107
+
108
+ expect ( run1 ) . toBeDefined ( ) ;
109
+ expect ( run1 . friendlyId ) . toBe ( "run_1234" ) ;
110
+ expect ( run1 . batchId ) . toBe ( batch . id ) ;
111
+
112
+ expect ( run2 ) . toBeDefined ( ) ;
113
+ expect ( run2 . friendlyId ) . toBe ( "run_1235" ) ;
114
+ expect ( run2 . batchId ) . toBe ( batch . id ) ;
115
+
116
+ //check the queue length
117
+ const queueLength = await engine . runQueue . lengthOfEnvQueue ( authenticatedEnvironment ) ;
118
+ expect ( queueLength ) . toBe ( 2 ) ;
119
+
120
+ //dequeue
121
+ const [ d1 , d2 ] = await engine . dequeueFromMasterQueue ( {
122
+ consumerId : "test_12345" ,
123
+ masterQueue : run1 . masterQueue ,
124
+ maxRunCount : 10 ,
125
+ } ) ;
51
126
52
- //create background worker
53
- const backgroundWorker = await setupBackgroundWorker (
54
- prisma ,
55
- authenticatedEnvironment ,
56
- taskIdentifier
57
- ) ;
127
+ //attempts
128
+ const attempt1 = await engine . startRunAttempt ( {
129
+ runId : d1 . run . id ,
130
+ snapshotId : d1 . snapshot . id ,
131
+ } ) ;
132
+ const attempt2 = await engine . startRunAttempt ( {
133
+ runId : d2 . run . id ,
134
+ snapshotId : d2 . snapshot . id ,
135
+ } ) ;
58
136
59
- const batch = await prisma . batchTaskRun . create ( {
60
- data : {
61
- friendlyId : generateFriendlyId ( "batch" ) ,
62
- runtimeEnvironmentId : authenticatedEnvironment . id ,
63
- } ,
64
- } ) ;
65
-
66
- //trigger the runs
67
- const run1 = await engine . trigger (
68
- {
69
- number : 1 ,
70
- friendlyId : "run_1234" ,
71
- environment : authenticatedEnvironment ,
72
- taskIdentifier,
73
- payload : "{}" ,
74
- payloadType : "application/json" ,
75
- context : { } ,
76
- traceContext : { } ,
77
- traceId : "t12345" ,
78
- spanId : "s12345" ,
79
- masterQueue : "main" ,
80
- queueName : "task/test-task" ,
81
- isTest : false ,
82
- tags : [ ] ,
83
- batch : { id : batch . id , index : 0 } ,
84
- } ,
85
- prisma
86
- ) ;
87
-
88
- const run2 = await engine . trigger (
89
- {
90
- number : 2 ,
91
- friendlyId : "run_1235" ,
92
- environment : authenticatedEnvironment ,
93
- taskIdentifier,
94
- payload : "{}" ,
95
- payloadType : "application/json" ,
96
- context : { } ,
97
- traceContext : { } ,
98
- traceId : "t12345" ,
99
- spanId : "s12345" ,
100
- masterQueue : "main" ,
101
- queueName : "task/test-task" ,
102
- isTest : false ,
103
- tags : [ ] ,
104
- batch : { id : batch . id , index : 1 } ,
105
- } ,
106
- prisma
107
- ) ;
108
-
109
- expect ( run1 ) . toBeDefined ( ) ;
110
- expect ( run1 . friendlyId ) . toBe ( "run_1234" ) ;
111
- expect ( run1 . batchId ) . toBe ( batch . id ) ;
112
-
113
- expect ( run2 ) . toBeDefined ( ) ;
114
- expect ( run2 . friendlyId ) . toBe ( "run_1235" ) ;
115
- expect ( run2 . batchId ) . toBe ( batch . id ) ;
116
-
117
- //check the queue length
118
- const queueLength = await engine . runQueue . lengthOfEnvQueue ( authenticatedEnvironment ) ;
119
- expect ( queueLength ) . toBe ( 2 ) ;
120
-
121
- //dequeue
122
- const [ d1 , d2 ] = await engine . dequeueFromMasterQueue ( {
123
- consumerId : "test_12345" ,
124
- masterQueue : run1 . masterQueue ,
125
- maxRunCount : 10 ,
126
- } ) ;
127
-
128
- //attempts
129
- const attempt1 = await engine . startRunAttempt ( {
130
- runId : d1 . run . id ,
131
- snapshotId : d1 . snapshot . id ,
132
- } ) ;
133
- const attempt2 = await engine . startRunAttempt ( {
134
- runId : d2 . run . id ,
135
- snapshotId : d2 . snapshot . id ,
136
- } ) ;
137
-
138
- //complete the runs
139
- const result1 = await engine . completeRunAttempt ( {
140
- runId : attempt1 . run . id ,
141
- snapshotId : attempt1 . snapshot . id ,
142
- completion : {
143
- ok : true ,
144
- id : attempt1 . run . id ,
145
- output : `{"foo":"bar"}` ,
146
- outputType : "application/json" ,
147
- } ,
148
- } ) ;
149
- const result2 = await engine . completeRunAttempt ( {
150
- runId : attempt2 . run . id ,
151
- snapshotId : attempt2 . snapshot . id ,
152
- completion : {
153
- ok : true ,
154
- id : attempt2 . run . id ,
155
- output : `{"baz":"qux"}` ,
156
- outputType : "application/json" ,
157
- } ,
158
- } ) ;
137
+ //complete the runs
138
+ const result1 = await engine . completeRunAttempt ( {
139
+ runId : attempt1 . run . id ,
140
+ snapshotId : attempt1 . snapshot . id ,
141
+ completion : {
142
+ ok : true ,
143
+ id : attempt1 . run . id ,
144
+ output : `{"foo":"bar"}` ,
145
+ outputType : "application/json" ,
146
+ } ,
147
+ } ) ;
148
+ const result2 = await engine . completeRunAttempt ( {
149
+ runId : attempt2 . run . id ,
150
+ snapshotId : attempt2 . snapshot . id ,
151
+ completion : {
152
+ ok : true ,
153
+ id : attempt2 . run . id ,
154
+ output : `{"baz":"qux"}` ,
155
+ outputType : "application/json" ,
156
+ } ,
157
+ } ) ;
159
158
160
- //the batch won't complete immediately
161
- const batchAfter1 = await prisma . batchTaskRun . findUnique ( {
162
- where : {
163
- id : batch . id ,
164
- } ,
165
- } ) ;
166
- expect ( batchAfter1 ?. status ) . toBe ( "PENDING" ) ;
159
+ //the batch won't complete immediately
160
+ const batchAfter1 = await prisma . batchTaskRun . findUnique ( {
161
+ where : {
162
+ id : batch . id ,
163
+ } ,
164
+ } ) ;
165
+ expect ( batchAfter1 ?. status ) . toBe ( "PENDING" ) ;
167
166
168
- await setTimeout ( 3_000 ) ;
167
+ await setTimeout ( 3_000 ) ;
169
168
170
- //the batch should complete
171
- const batchAfter2 = await prisma . batchTaskRun . findUnique ( {
172
- where : {
173
- id : batch . id ,
174
- } ,
175
- } ) ;
176
- expect ( batchAfter2 ?. status ) . toBe ( "COMPLETED" ) ;
177
- } finally {
178
- engine . quit ( ) ;
179
- }
169
+ //the batch should complete
170
+ const batchAfter2 = await prisma . batchTaskRun . findUnique ( {
171
+ where : {
172
+ id : batch . id ,
173
+ } ,
174
+ } ) ;
175
+ expect ( batchAfter2 ?. status ) . toBe ( "COMPLETED" ) ;
176
+ } finally {
177
+ engine . quit ( ) ;
180
178
}
181
- ) ;
179
+ } ) ;
182
180
} ) ;
0 commit comments