@@ -44,6 +44,7 @@ use common::{
44
44
RequestId ,
45
45
} ;
46
46
use database:: {
47
+ BootstrapComponentsModel ,
47
48
Database ,
48
49
ResolvedQuery ,
49
50
Transaction ,
@@ -54,7 +55,9 @@ use futures::{
54
55
select_biased,
55
56
Future ,
56
57
FutureExt ,
58
+ TryStreamExt ,
57
59
} ;
60
+ use futures_async_stream:: try_stream;
58
61
use isolate:: JsonPackedValue ;
59
62
use keybroker:: Identity ;
60
63
use minitrace:: future:: FutureExt as _;
@@ -71,6 +74,7 @@ use model::{
71
74
} ,
72
75
CronModel ,
73
76
CRON_JOBS_INDEX_BY_NEXT_TS ,
77
+ CRON_JOBS_TABLE ,
74
78
} ,
75
79
modules:: ModuleModel ,
76
80
} ;
@@ -210,16 +214,8 @@ impl<RT: Runtime> CronJobExecutor<RT> {
210
214
job_finished_tx : & mpsc:: Sender < ResolvedDocumentId > ,
211
215
) -> anyhow:: Result < Option < Timestamp > > {
212
216
let now = self . rt . generate_timestamp ( ) ?;
213
- let index_query = Query :: index_range ( IndexRange {
214
- index_name : CRON_JOBS_INDEX_BY_NEXT_TS . clone ( ) ,
215
- range : vec ! [ ] ,
216
- order : Order :: Asc ,
217
- } ) ;
218
- let mut query_stream =
219
- ResolvedQuery :: new ( tx, TableNamespace :: by_component_TODO ( ) , index_query) ?;
220
-
221
- while let Some ( doc) = query_stream. next ( tx, None ) . await ? {
222
- let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
217
+ let mut job_stream = self . stream_jobs_to_run ( tx) ;
218
+ while let Some ( job) = job_stream. try_next ( ) . await ? {
223
219
let ( job_id, job) = job. clone ( ) . into_id_and_value ( ) ;
224
220
if running_job_ids. contains ( & job_id) {
225
221
continue ;
@@ -250,6 +246,42 @@ impl<RT: Runtime> CronJobExecutor<RT> {
250
246
Ok ( None )
251
247
}
252
248
249
+ #[ try_stream( boxed, ok = ParsedDocument <CronJob >, error = anyhow:: Error ) ]
250
+ async fn stream_jobs_to_run < ' a > ( & ' a self , tx : & ' a mut Transaction < RT > ) {
251
+ let namespaces: Vec < _ > = tx
252
+ . table_mapping ( )
253
+ . iter ( )
254
+ . filter ( |( _, _, _, name) | * * name == * CRON_JOBS_TABLE )
255
+ . map ( |( _, namespace, ..) | namespace)
256
+ . collect ( ) ;
257
+ let index_query = Query :: index_range ( IndexRange {
258
+ index_name : CRON_JOBS_INDEX_BY_NEXT_TS . clone ( ) ,
259
+ range : vec ! [ ] ,
260
+ order : Order :: Asc ,
261
+ } ) ;
262
+ // Key is (next_ts, namespace), where next_ts is for sorting and namespace
263
+ // is for deduping.
264
+ // Value is (job, query) where job is the job to run and query will get
265
+ // the next job to run in that namespace.
266
+ let mut queries = BTreeMap :: new ( ) ;
267
+ for namespace in namespaces {
268
+ let mut query = ResolvedQuery :: new ( tx, namespace, index_query. clone ( ) ) ?;
269
+ if let Some ( doc) = query. next ( tx, None ) . await ? {
270
+ let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
271
+ let next_ts = job. next_ts ;
272
+ queries. insert ( ( next_ts, namespace) , ( job, query) ) ;
273
+ }
274
+ }
275
+ while let Some ( ( ( _min_next_ts, namespace) , ( min_job, mut query) ) ) = queries. pop_first ( ) {
276
+ yield min_job;
277
+ if let Some ( doc) = query. next ( tx, None ) . await ? {
278
+ let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
279
+ let next_ts = job. next_ts ;
280
+ queries. insert ( ( next_ts, namespace) , ( job, query) ) ;
281
+ }
282
+ }
283
+ }
284
+
253
285
// This handles re-running the cron job on transient errors. It
254
286
// guarantees that the job was successfully run or the job state changed.
255
287
pub async fn execute_job (
@@ -292,12 +324,13 @@ impl<RT: Runtime> CronJobExecutor<RT> {
292
324
// Continue without running function since the job state has changed
293
325
return Ok ( job_id) ;
294
326
} ;
327
+ let ( _, component_path) = self . get_job_component ( & mut tx, job_id) . await ?;
295
328
tracing:: info!( "Executing {:?}!" , job. cron_spec. udf_path) ;
296
329
297
330
// Since we don't specify the function type in the cron, we have to use
298
331
// the analyzed result.
299
332
let path = CanonicalizedComponentFunctionPath {
300
- component : ComponentPath :: root ( ) ,
333
+ component : component_path ,
301
334
udf_path : job. cron_spec . udf_path . clone ( ) ,
302
335
} ;
303
336
let udf_type = ModuleModel :: new ( & mut tx)
@@ -366,20 +399,37 @@ impl<RT: Runtime> CronJobExecutor<RT> {
366
399
}
367
400
}
368
401
402
+ async fn get_job_component (
403
+ & self ,
404
+ tx : & mut Transaction < RT > ,
405
+ job_id : ResolvedDocumentId ,
406
+ ) -> anyhow:: Result < ( ComponentId , ComponentPath ) > {
407
+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
408
+ let component = match namespace {
409
+ TableNamespace :: Global => ComponentId :: Root ,
410
+ TableNamespace :: ByComponent ( id) => ComponentId :: Child ( id) ,
411
+ } ;
412
+ let component_path = BootstrapComponentsModel :: new ( tx)
413
+ . get_component_path ( component)
414
+ . await ?;
415
+ Ok ( ( component, component_path) )
416
+ }
417
+
369
418
async fn handle_mutation (
370
419
& self ,
371
420
request_id : RequestId ,
372
- tx : Transaction < RT > ,
421
+ mut tx : Transaction < RT > ,
373
422
job : CronJob ,
374
423
job_id : ResolvedDocumentId ,
375
424
usage_tracker : FunctionUsageTracker ,
376
425
) -> anyhow:: Result < ( ) > {
377
426
let start = self . rt . monotonic_now ( ) ;
378
427
let identity = tx. inert_identity ( ) ;
379
428
let caller = FunctionCaller :: Cron ;
429
+ let ( component, component_path) = self . get_job_component ( & mut tx, job_id) . await ?;
380
430
let context = ExecutionContext :: new ( request_id, & caller) ;
381
431
let path = CanonicalizedComponentFunctionPath {
382
- component : ComponentPath :: root ( ) ,
432
+ component : component_path ,
383
433
udf_path : job. cron_spec . udf_path . clone ( ) ,
384
434
} ;
385
435
let mutation_result = self
@@ -412,7 +462,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
412
462
let execution_time_f64 = execution_time. as_secs_f64 ( ) ;
413
463
let truncated_log_lines = self . truncate_log_lines ( outcome. log_lines . clone ( ) ) ;
414
464
415
- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
465
+ let mut model = CronModel :: new ( & mut tx, component ) ;
416
466
417
467
if let Ok ( ref result) = outcome. result {
418
468
let truncated_result = self . truncate_result ( result. clone ( ) ) ;
@@ -427,7 +477,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
427
477
. await ?;
428
478
self . complete_job_run (
429
479
identity. clone ( ) ,
430
- & mut model ,
480
+ & mut tx ,
431
481
job_id,
432
482
& job,
433
483
UdfType :: Mutation ,
@@ -457,14 +507,14 @@ impl<RT: Runtime> CronJobExecutor<RT> {
457
507
// Continue without updating since the job state has changed
458
508
return Ok ( ( ) ) ;
459
509
} ;
460
- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
510
+ let mut model = CronModel :: new ( & mut tx, component ) ;
461
511
let status = CronJobStatus :: Err ( e. to_string ( ) ) ;
462
512
model
463
513
. insert_cron_job_log ( & job, status, truncated_log_lines, execution_time_f64)
464
514
. await ?;
465
515
self . complete_job_run (
466
516
identity,
467
- & mut model ,
517
+ & mut tx ,
468
518
job_id,
469
519
& job,
470
520
UdfType :: Mutation ,
@@ -497,14 +547,20 @@ impl<RT: Runtime> CronJobExecutor<RT> {
497
547
job_id : ResolvedDocumentId ,
498
548
usage_tracker : FunctionUsageTracker ,
499
549
) -> anyhow:: Result < ( ) > {
550
+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
551
+ let component = match namespace {
552
+ TableNamespace :: Global => ComponentId :: Root ,
553
+ TableNamespace :: ByComponent ( id) => ComponentId :: Child ( id) ,
554
+ } ;
500
555
let identity = tx. identity ( ) . clone ( ) ;
556
+ let ( _, component_path) = self . get_job_component ( & mut tx, job_id) . await ?;
501
557
let caller = FunctionCaller :: Cron ;
502
558
match job. state {
503
559
CronJobState :: Pending => {
504
560
// Set state to in progress
505
561
let mut updated_job = job. clone ( ) ;
506
562
updated_job. state = CronJobState :: InProgress ;
507
- CronModel :: new ( & mut tx, ComponentId :: TODO ( ) )
563
+ CronModel :: new ( & mut tx, component )
508
564
. update_job_state ( job_id, updated_job. clone ( ) )
509
565
. await ?;
510
566
self . database
@@ -514,7 +570,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
514
570
// Execute the action
515
571
let context = ExecutionContext :: new ( request_id, & caller) ;
516
572
let path = CanonicalizedComponentFunctionPath {
517
- component : ComponentPath :: root ( ) ,
573
+ component : component_path ,
518
574
udf_path : job. cron_spec . udf_path . clone ( ) ,
519
575
} ;
520
576
let completion = self
@@ -582,14 +638,14 @@ impl<RT: Runtime> CronJobExecutor<RT> {
582
638
// guess the correct behavior here is to store the executionId in the state so
583
639
// we can log correctly here.
584
640
let context = ExecutionContext :: new ( request_id, & caller) ;
585
- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
641
+ let mut model = CronModel :: new ( & mut tx, component ) ;
586
642
model
587
643
. insert_cron_job_log ( & job, status, log_lines, 0.0 )
588
644
. await ?;
589
645
let identity: InertIdentity = identity. into ( ) ;
590
646
self . complete_job_run (
591
647
identity. clone ( ) ,
592
- & mut model ,
648
+ & mut tx ,
593
649
job_id,
594
650
& job,
595
651
UdfType :: Action ,
@@ -601,7 +657,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
601
657
. await ?;
602
658
603
659
let path = CanonicalizedComponentFunctionPath {
604
- component : ComponentPath :: root ( ) ,
660
+ component : component_path ,
605
661
udf_path : job. cron_spec . udf_path ,
606
662
} ;
607
663
self . function_log . log_action_system_error (
@@ -660,13 +716,18 @@ impl<RT: Runtime> CronJobExecutor<RT> {
660
716
// Continue without updating since the job state has changed
661
717
return Ok ( ( ) ) ;
662
718
} ;
663
- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
719
+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
720
+ let component = match namespace {
721
+ TableNamespace :: Global => ComponentId :: Root ,
722
+ TableNamespace :: ByComponent ( id) => ComponentId :: Child ( id) ,
723
+ } ;
724
+ let mut model = CronModel :: new ( & mut tx, component) ;
664
725
model
665
726
. insert_cron_job_log ( expected_state, status, log_lines, execution_time)
666
727
. await ?;
667
728
self . complete_job_run (
668
729
identity,
669
- & mut model ,
730
+ & mut tx ,
670
731
job_id,
671
732
expected_state,
672
733
UdfType :: Action ,
@@ -682,7 +743,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
682
743
async fn complete_job_run (
683
744
& self ,
684
745
identity : InertIdentity ,
685
- model : & mut CronModel < ' _ , RT > ,
746
+ tx : & mut Transaction < RT > ,
686
747
job_id : ResolvedDocumentId ,
687
748
job : & CronJob ,
688
749
udf_type : UdfType ,
@@ -693,6 +754,8 @@ impl<RT: Runtime> CronJobExecutor<RT> {
693
754
let mut next_ts = compute_next_ts ( & job. cron_spec , Some ( prev_ts) , now) ?;
694
755
let mut num_skipped = 0 ;
695
756
let first_skipped_ts = next_ts;
757
+ let ( component, component_path) = self . get_job_component ( tx, job_id) . await ?;
758
+ let mut model = CronModel :: new ( tx, component) ;
696
759
while next_ts < now {
697
760
num_skipped += 1 ;
698
761
next_ts = compute_next_ts ( & job. cron_spec , Some ( next_ts) , now) ?;
@@ -715,7 +778,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
715
778
runs are in the past"
716
779
) ,
717
780
CanonicalizedComponentFunctionPath {
718
- component : ComponentPath :: root ( ) ,
781
+ component : component_path ,
719
782
udf_path : job. cron_spec . udf_path . clone ( ) ,
720
783
} ,
721
784
job. cron_spec . udf_args . clone ( ) ,
@@ -732,7 +795,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
732
795
runs are in the past"
733
796
) ,
734
797
CanonicalizedComponentFunctionPath {
735
- component : ComponentPath :: root ( ) ,
798
+ component : component_path ,
736
799
udf_path : job. cron_spec . udf_path . clone ( ) ,
737
800
} ,
738
801
job. cron_spec . udf_args . clone ( ) ,
0 commit comments