@@ -10,10 +10,7 @@ use std::{
10
10
11
11
use common:: {
12
12
backoff:: Backoff ,
13
- components:: {
14
- CanonicalizedComponentFunctionPath ,
15
- ComponentPath ,
16
- } ,
13
+ components:: CanonicalizedComponentFunctionPath ,
17
14
document:: ParsedDocument ,
18
15
errors:: {
19
16
report_error,
@@ -61,14 +58,17 @@ use futures::{
61
58
select_biased,
62
59
Future ,
63
60
FutureExt ,
61
+ TryStreamExt ,
64
62
} ;
63
+ use futures_async_stream:: try_stream;
65
64
use keybroker:: Identity ;
66
65
use minitrace:: future:: FutureExt as _;
67
66
use model:: {
68
67
backend_state:: {
69
68
types:: BackendState ,
70
69
BackendStateModel ,
71
70
} ,
71
+ components:: ComponentsModel ,
72
72
modules:: ModuleModel ,
73
73
scheduled_jobs:: {
74
74
types:: {
@@ -80,6 +80,7 @@ use model::{
80
80
NEXT_TS_FIELD ,
81
81
SCHEDULED_JOBS_INDEX ,
82
82
SCHEDULED_JOBS_INDEX_BY_COMPLETED_TS ,
83
+ SCHEDULED_JOBS_TABLE ,
83
84
} ,
84
85
} ;
85
86
use parking_lot:: Mutex ;
@@ -321,18 +322,8 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
321
322
job_finished_tx : & mpsc:: Sender < ResolvedDocumentId > ,
322
323
) -> anyhow:: Result < Option < Timestamp > > {
323
324
let now = self . rt . generate_timestamp ( ) ?;
324
- let index_query = Query :: index_range ( IndexRange {
325
- index_name : SCHEDULED_JOBS_INDEX . clone ( ) ,
326
- range : vec ! [ IndexRangeExpression :: Gt (
327
- NEXT_TS_FIELD . clone( ) ,
328
- value:: ConvexValue :: Null ,
329
- ) ] ,
330
- order : Order :: Asc ,
331
- } ) ;
332
- let mut query_stream =
333
- ResolvedQuery :: new ( tx, TableNamespace :: by_component_TODO ( ) , index_query) ?;
334
- while let Some ( doc) = query_stream. next ( tx, None ) . await ? {
335
- let job: ParsedDocument < ScheduledJob > = doc. try_into ( ) ?;
325
+ let mut job_stream = self . stream_jobs_to_run ( tx) ;
326
+ while let Some ( job) = job_stream. try_next ( ) . await ? {
336
327
let ( job_id, job) = job. clone ( ) . into_id_and_value ( ) ;
337
328
if running_job_ids. contains ( & job_id) {
338
329
continue ;
@@ -373,6 +364,49 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
373
364
}
374
365
Ok ( None )
375
366
}
367
+
368
+ #[ try_stream( boxed, ok = ParsedDocument <ScheduledJob >, error = anyhow:: Error ) ]
369
+ async fn stream_jobs_to_run < ' a > ( & ' a self , tx : & ' a mut Transaction < RT > ) {
370
+ let namespaces: Vec < _ > = tx
371
+ . table_mapping ( )
372
+ . iter ( )
373
+ . filter ( |( _, _, _, name) | * * name == * SCHEDULED_JOBS_TABLE )
374
+ . map ( |( _, namespace, ..) | namespace)
375
+ . collect ( ) ;
376
+ let index_query = Query :: index_range ( IndexRange {
377
+ index_name : SCHEDULED_JOBS_INDEX . clone ( ) ,
378
+ range : vec ! [ IndexRangeExpression :: Gt (
379
+ NEXT_TS_FIELD . clone( ) ,
380
+ value:: ConvexValue :: Null ,
381
+ ) ] ,
382
+ order : Order :: Asc ,
383
+ } ) ;
384
+ // Key is (next_ts, namespace), where next_ts is for sorting and namespace
385
+ // is for deduping.
386
+ // Value is (job, query) where job is the job to run and query will get
387
+ // the next job to run in that namespace.
388
+ let mut queries = BTreeMap :: new ( ) ;
389
+ for namespace in namespaces {
390
+ let mut query = ResolvedQuery :: new ( tx, namespace, index_query. clone ( ) ) ?;
391
+ if let Some ( doc) = query. next ( tx, None ) . await ? {
392
+ let job: ParsedDocument < ScheduledJob > = doc. try_into ( ) ?;
393
+ let next_ts = job. next_ts . ok_or_else ( || {
394
+ anyhow:: anyhow!( "Could not get next_ts to run scheduled job {}" , job. id( ) )
395
+ } ) ?;
396
+ queries. insert ( ( next_ts, namespace) , ( job, query) ) ;
397
+ }
398
+ }
399
+ while let Some ( ( ( _min_next_ts, namespace) , ( min_job, mut query) ) ) = queries. pop_first ( ) {
400
+ yield min_job;
401
+ if let Some ( doc) = query. next ( tx, None ) . await ? {
402
+ let job: ParsedDocument < ScheduledJob > = doc. try_into ( ) ?;
403
+ let next_ts = job. next_ts . ok_or_else ( || {
404
+ anyhow:: anyhow!( "Could not get next_ts to run scheduled job {}" , job. id( ) )
405
+ } ) ?;
406
+ queries. insert ( ( next_ts, namespace) , ( job, query) ) ;
407
+ }
408
+ }
409
+ }
376
410
}
377
411
378
412
impl < RT : Runtime > ScheduledJobContext < RT > {
@@ -424,14 +458,18 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
424
458
425
459
tracing:: info!( "Executing {:?}!" , job. udf_path) ;
426
460
let identity = tx. inert_identity ( ) ;
461
+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
462
+ let component_path = ComponentsModel :: new ( & mut tx)
463
+ . get_component_path_for_namespace ( namespace)
464
+ . await ?;
427
465
428
466
// Since we don't specify the function type when we schedule, we have to
429
467
// use the analyzed result.
430
468
let caller = FunctionCaller :: Scheduler {
431
469
job_id : job_id. into ( ) ,
432
470
} ;
433
471
let path = CanonicalizedComponentFunctionPath {
434
- component : ComponentPath :: root ( ) ,
472
+ component : component_path ,
435
473
udf_path : job. udf_path . clone ( ) ,
436
474
} ;
437
475
let udf_type = match ModuleModel :: new ( & mut tx)
@@ -440,7 +478,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
440
478
{
441
479
Ok ( analyzed_function) => analyzed_function. udf_type ,
442
480
Err ( error) => {
443
- SchedulerModel :: new ( & mut tx)
481
+ SchedulerModel :: new ( & mut tx, namespace )
444
482
. complete (
445
483
job_id,
446
484
ScheduledJobState :: Failed ( error. user_facing_message ( ) ) ,
@@ -487,7 +525,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
487
525
UdfType :: Mutation ,
488
526
UdfType :: Action ,
489
527
) ;
490
- SchedulerModel :: new ( & mut tx)
528
+ SchedulerModel :: new ( & mut tx, namespace )
491
529
. complete ( job_id, ScheduledJobState :: Failed ( message. clone ( ) ) )
492
530
. await ?;
493
531
self . database
@@ -545,16 +583,20 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
545
583
& self ,
546
584
request_id : RequestId ,
547
585
caller : FunctionCaller ,
548
- tx : Transaction < RT > ,
586
+ mut tx : Transaction < RT > ,
549
587
job : ScheduledJob ,
550
588
job_id : ResolvedDocumentId ,
551
589
usage_tracker : FunctionUsageTracker ,
552
590
) -> anyhow:: Result < ( ) > {
553
591
let start = self . rt . monotonic_now ( ) ;
554
592
let context = ExecutionContext :: new ( request_id, & caller) ;
555
593
let identity = tx. inert_identity ( ) ;
594
+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
595
+ let component_path = ComponentsModel :: new ( & mut tx)
596
+ . get_component_path_for_namespace ( namespace)
597
+ . await ?;
556
598
let path = CanonicalizedComponentFunctionPath {
557
- component : ComponentPath :: root ( ) ,
599
+ component : component_path ,
558
600
udf_path : job. udf_path . clone ( ) ,
559
601
} ;
560
602
let result = self
@@ -587,7 +629,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
587
629
let execution_time = start. elapsed ( ) ;
588
630
589
631
if outcome. result . is_ok ( ) {
590
- SchedulerModel :: new ( & mut tx)
632
+ SchedulerModel :: new ( & mut tx, namespace )
591
633
. complete ( job_id, ScheduledJobState :: Success )
592
634
. await ?;
593
635
if let Err ( err) = self
@@ -614,7 +656,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
614
656
// Continue without updating since the job state has changed
615
657
return Ok ( ( ) ) ;
616
658
}
617
- SchedulerModel :: new ( & mut tx)
659
+ SchedulerModel :: new ( & mut tx, namespace )
618
660
. complete (
619
661
job_id,
620
662
ScheduledJobState :: Failed ( outcome. result . clone ( ) . unwrap_err ( ) . to_string ( ) ) ,
@@ -648,12 +690,16 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
648
690
) -> anyhow:: Result < ( ) > {
649
691
let identity = tx. identity ( ) . clone ( ) ;
650
692
let mut tx = self . database . begin ( identity. clone ( ) ) . await ?;
693
+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
694
+ let component_path = ComponentsModel :: new ( & mut tx)
695
+ . get_component_path_for_namespace ( namespace)
696
+ . await ?;
651
697
match job. state {
652
698
ScheduledJobState :: Pending => {
653
699
// Set state to in progress
654
700
let mut updated_job = job. clone ( ) ;
655
701
updated_job. state = ScheduledJobState :: InProgress ;
656
- SchedulerModel :: new ( & mut tx)
702
+ SchedulerModel :: new ( & mut tx, namespace )
657
703
. replace ( job_id, updated_job. clone ( ) )
658
704
. await ?;
659
705
self . database
@@ -663,7 +709,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
663
709
// Execute the action
664
710
let context = ExecutionContext :: new ( request_id, & caller) ;
665
711
let path = CanonicalizedComponentFunctionPath {
666
- component : ComponentPath :: root ( ) ,
712
+ component : component_path ,
667
713
udf_path : job. udf_path . clone ( ) ,
668
714
} ;
669
715
let completion = self
@@ -703,7 +749,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
703
749
// before updating the state. Since we execute actions at most once,
704
750
// complete this job and log the error.
705
751
let message = "Transient error while executing action" . to_string ( ) ;
706
- SchedulerModel :: new ( & mut tx)
752
+ SchedulerModel :: new ( & mut tx, namespace )
707
753
. complete ( job_id, ScheduledJobState :: Failed ( message. clone ( ) ) )
708
754
. await ?;
709
755
self . database
@@ -715,7 +761,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
715
761
// we can log correctly here.
716
762
let context = ExecutionContext :: new ( request_id, & caller) ;
717
763
let path = CanonicalizedComponentFunctionPath {
718
- component : ComponentPath :: root ( ) ,
764
+ component : component_path ,
719
765
udf_path : job. udf_path . clone ( ) ,
720
766
} ;
721
767
self . function_log . log_action_system_error (
@@ -776,9 +822,10 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
776
822
// Continue without updating since the job state has changed
777
823
return Ok ( ( ) ) ;
778
824
}
825
+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
779
826
780
827
// Remove from the scheduled jobs table
781
- SchedulerModel :: new ( & mut tx)
828
+ SchedulerModel :: new ( & mut tx, namespace )
782
829
. complete ( job_id, job_state)
783
830
. await ?;
784
831
self . database
@@ -814,6 +861,7 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
814
861
async fn run ( & self , backoff : & mut Backoff ) -> anyhow:: Result < ( ) > {
815
862
loop {
816
863
let mut tx = self . database . begin ( Identity :: system ( ) ) . await ?;
864
+ let namespace = TableNamespace :: by_component_TODO ( ) ;
817
865
let now = self . rt . generate_timestamp ( ) ?;
818
866
let index_query = Query :: index_range ( IndexRange {
819
867
index_name : SCHEDULED_JOBS_INDEX_BY_COMPLETED_TS . clone ( ) ,
@@ -824,8 +872,7 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
824
872
order : Order :: Asc ,
825
873
} )
826
874
. limit ( * SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE ) ;
827
- let mut query_stream =
828
- ResolvedQuery :: new ( & mut tx, TableNamespace :: by_component_TODO ( ) , index_query) ?;
875
+ let mut query_stream = ResolvedQuery :: new ( & mut tx, namespace, index_query) ?;
829
876
830
877
let mut next_job_wait = None ;
831
878
let mut jobs_to_delete = vec ! [ ] ;
@@ -855,7 +902,7 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
855
902
"Garbage collecting {} finished scheduled jobs" ,
856
903
jobs_to_delete. len( )
857
904
) ;
858
- let mut model = SchedulerModel :: new ( & mut tx) ;
905
+ let mut model = SchedulerModel :: new ( & mut tx, namespace ) ;
859
906
for job_id in jobs_to_delete {
860
907
model. delete ( job_id) . await ?;
861
908
}
0 commit comments