@@ -36,6 +36,7 @@ use common::{
36
36
Runtime ,
37
37
RuntimeInstant ,
38
38
} ,
39
+ sync:: mpsc,
39
40
types:: {
40
41
FunctionCaller ,
41
42
UdfType ,
@@ -51,19 +52,14 @@ use errors::ErrorMetadataAnyhowExt;
51
52
use futures:: {
52
53
future:: Either ,
53
54
select_biased,
54
- stream:: FuturesUnordered ,
55
55
Future ,
56
56
FutureExt ,
57
- StreamExt ,
58
57
} ;
59
58
use isolate:: JsonPackedValue ;
60
59
use keybroker:: Identity ;
61
60
use minitrace:: future:: FutureExt as _;
62
61
use model:: {
63
- backend_state:: {
64
- types:: BackendState ,
65
- BackendStateModel ,
66
- } ,
62
+ backend_state:: BackendStateModel ,
67
63
cron_jobs:: {
68
64
next_ts:: compute_next_ts,
69
65
types:: {
@@ -78,6 +74,7 @@ use model::{
78
74
} ,
79
75
modules:: ModuleModel ,
80
76
} ;
77
+ use sync_types:: Timestamp ;
81
78
use usage_tracking:: FunctionUsageTracker ;
82
79
use value:: {
83
80
ResolvedDocumentId ,
@@ -101,6 +98,7 @@ const CRON_LOG_MAX_LOG_LINE_LENGTH: usize = 1000;
101
98
102
99
// This code is very similar to ScheduledJobExecutor and could potentially be
103
100
// refactored later.
101
+ #[ derive( Clone ) ]
104
102
pub struct CronJobExecutor < RT : Runtime > {
105
103
rt : RT ,
106
104
database : Database < RT > ,
@@ -152,73 +150,50 @@ impl<RT: Runtime> CronJobExecutor<RT> {
152
150
153
151
async fn run ( & self , backoff : & mut Backoff ) -> anyhow:: Result < ( ) > {
154
152
tracing:: info!( "Starting cron job executor" ) ;
155
- let mut futures = FuturesUnordered :: new ( ) ;
153
+ let ( job_finished_tx, mut job_finished_rx) =
154
+ mpsc:: channel ( * SCHEDULED_JOB_EXECUTION_PARALLELISM ) ;
156
155
let mut running_job_ids = HashSet :: new ( ) ;
156
+ let mut next_job_ready_time = None ;
157
157
loop {
158
158
let mut tx = self . database . begin ( Identity :: Unknown ) . await ?;
159
- // _backend_state appears unused but is needed to make sure the backend_state
160
- // is part of the readset for the query we subscribe to.
161
- let _backend_state = BackendStateModel :: new ( & mut tx) . get_backend_state ( ) . await ?;
162
-
163
- let now = self . rt . generate_timestamp ( ) ?;
164
- let index_query = Query :: index_range ( IndexRange {
165
- index_name : CRON_JOBS_INDEX_BY_NEXT_TS . clone ( ) ,
166
- range : vec ! [ ] ,
167
- order : Order :: Asc ,
168
- } ) ;
169
- let mut query_stream =
170
- ResolvedQuery :: new ( & mut tx, TableNamespace :: by_component_TODO ( ) , index_query) ?;
171
-
172
- let mut next_job_wait = None ;
173
- while let Some ( doc) = query_stream. next ( & mut tx, None ) . await ? {
174
- // Get the backend state again in case of a race where jobs are scheduled and
175
- // after the first tx begins the backend is paused.
176
- let mut new_tx = self . database . begin ( Identity :: Unknown ) . await ?;
177
- let backend_state = BackendStateModel :: new ( & mut new_tx)
178
- . get_backend_state ( )
179
- . await ?;
180
- drop ( new_tx) ;
181
- match backend_state {
182
- BackendState :: Running => { } ,
183
- BackendState :: Paused | BackendState :: Disabled => break ,
184
- }
185
- let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
186
- let ( job_id, job) = job. clone ( ) . into_id_and_value ( ) ;
187
- if running_job_ids. contains ( & job_id) {
188
- continue ;
189
- }
190
- if job. next_ts > now {
191
- next_job_wait = Some ( job. next_ts - now) ;
192
- break ;
193
- }
194
- metrics:: log_cron_job_execution_lag ( now - job. next_ts ) ;
195
- if running_job_ids. len ( ) == * SCHEDULED_JOB_EXECUTION_PARALLELISM {
196
- // We are due to execute the next job, but we can't because of
197
- // parallelism limits. We should break after logging the lag
198
- // here, and then wake up in few seconds to log the lag again
199
- // unless something else changes in between.
200
- next_job_wait = Some ( Duration :: from_secs ( 5 ) ) ;
201
- break ;
202
- }
203
- let root = self
204
- . rt
205
- . with_rng ( |rng| get_sampled_span ( "crons/execute_job" , rng, BTreeMap :: new ( ) ) ) ;
206
- futures. push ( self . execute_job ( job, job_id) . in_span ( root) ) ;
207
- running_job_ids. insert ( job_id) ;
208
- }
159
+ let backend_state = BackendStateModel :: new ( & mut tx) . get_backend_state ( ) . await ?;
160
+ let is_backend_stopped = backend_state. is_stopped ( ) ;
161
+
162
+ next_job_ready_time = if is_backend_stopped {
163
+ None
164
+ } else if running_job_ids. len ( ) == * SCHEDULED_JOB_EXECUTION_PARALLELISM {
165
+ next_job_ready_time
166
+ } else {
167
+ self . query_and_start_jobs ( & mut tx, & mut running_job_ids, & job_finished_tx)
168
+ . await ?
169
+ } ;
209
170
210
- let next_job_future = if let Some ( next_job_wait) = next_job_wait {
211
- Either :: Left ( self . rt . wait ( next_job_wait) )
171
+ let next_job_future = if let Some ( next_job_ts) = next_job_ready_time {
172
+ let now = self . rt . generate_timestamp ( ) ?;
173
+ Either :: Left ( if next_job_ts < now {
174
+ metrics:: log_cron_job_execution_lag ( now - next_job_ts) ;
175
+ // If we're behind, re-run this loop every 5 seconds to log the gauge above and
176
+ // track how far we're behind in our metrics.
177
+ self . rt . wait ( Duration :: from_secs ( 5 ) )
178
+ } else {
179
+ metrics:: log_cron_job_execution_lag ( Duration :: from_secs ( 0 ) ) ;
180
+ self . rt . wait ( next_job_ts - now)
181
+ } )
212
182
} else {
183
+ metrics:: log_cron_job_execution_lag ( Duration :: from_secs ( 0 ) ) ;
213
184
Either :: Right ( std:: future:: pending ( ) )
214
185
} ;
215
186
216
187
let token = tx. into_token ( ) ?;
217
188
let subscription = self . database . subscribe ( token) . await ?;
218
189
select_biased ! {
219
- job_id = futures. select_next_some( ) => {
220
- running_job_ids. remove( & job_id) ;
221
- }
190
+ job_id = job_finished_rx. recv( ) . fuse( ) => {
191
+ if let Some ( job_id) = job_id {
192
+ running_job_ids. remove( & job_id) ;
193
+ } else {
194
+ anyhow:: bail!( "Job results channel closed, this is unexpected!" ) ;
195
+ }
196
+ } ,
222
197
_ = next_job_future. fuse( ) => {
223
198
}
224
199
_ = subscription. wait_for_invalidation( ) . fuse( ) => {
@@ -228,6 +203,53 @@ impl<RT: Runtime> CronJobExecutor<RT> {
228
203
}
229
204
}
230
205
206
+ async fn query_and_start_jobs (
207
+ & self ,
208
+ tx : & mut Transaction < RT > ,
209
+ running_job_ids : & mut HashSet < ResolvedDocumentId > ,
210
+ job_finished_tx : & mpsc:: Sender < ResolvedDocumentId > ,
211
+ ) -> anyhow:: Result < Option < Timestamp > > {
212
+ let now = self . rt . generate_timestamp ( ) ?;
213
+ let index_query = Query :: index_range ( IndexRange {
214
+ index_name : CRON_JOBS_INDEX_BY_NEXT_TS . clone ( ) ,
215
+ range : vec ! [ ] ,
216
+ order : Order :: Asc ,
217
+ } ) ;
218
+ let mut query_stream =
219
+ ResolvedQuery :: new ( tx, TableNamespace :: by_component_TODO ( ) , index_query) ?;
220
+
221
+ while let Some ( doc) = query_stream. next ( tx, None ) . await ? {
222
+ let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
223
+ let ( job_id, job) = job. clone ( ) . into_id_and_value ( ) ;
224
+ if running_job_ids. contains ( & job_id) {
225
+ continue ;
226
+ }
227
+ let next_ts = job. next_ts ;
228
+ // If we can't execute the job return the job's target timestamp. If we're
229
+ // caught up, we can sleep until the timestamp. If we're behind and
230
+ // at our concurrency limit, we can use the timestamp to log how far
231
+ // behind we get.
232
+ if next_ts > now || running_job_ids. len ( ) == * SCHEDULED_JOB_EXECUTION_PARALLELISM {
233
+ return Ok ( Some ( next_ts) ) ;
234
+ }
235
+ let root = self
236
+ . rt
237
+ . with_rng ( |rng| get_sampled_span ( "crons/execute_job" , rng, BTreeMap :: new ( ) ) ) ;
238
+ let context = self . clone ( ) ;
239
+ let tx = job_finished_tx. clone ( ) ;
240
+ self . rt . spawn (
241
+ "spawn_cron_job" ,
242
+ async move {
243
+ let result = context. execute_job ( job, job_id) . await ;
244
+ let _ = tx. send ( result) . await ;
245
+ }
246
+ . in_span ( root) ,
247
+ ) ;
248
+ running_job_ids. insert ( job_id) ;
249
+ }
250
+ Ok ( None )
251
+ }
252
+
231
253
// This handles re-running the cron job on transient errors. It
232
254
// guarantees that the job was successfully run or the job state changed.
233
255
pub async fn execute_job (
0 commit comments