@@ -20,6 +20,10 @@ use common::{
20
20
knobs:: {
21
21
SCHEDULED_JOB_EXECUTION_PARALLELISM ,
22
22
SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE ,
23
+ SCHEDULED_JOB_GARBAGE_COLLECTION_INITIAL_BACKOFF ,
24
+ SCHEDULED_JOB_GARBAGE_COLLECTION_MAX_BACKOFF ,
25
+ SCHEDULED_JOB_INITIAL_BACKOFF ,
26
+ SCHEDULED_JOB_MAX_BACKOFF ,
23
27
SCHEDULED_JOB_RETENTION ,
24
28
UDF_EXECUTOR_OCC_MAX_RETRIES ,
25
29
} ,
@@ -147,9 +151,6 @@ impl<RT: Runtime> ScheduledJobRunner<RT> {
147
151
}
148
152
}
149
153
150
- const INITIAL_BACKOFF : Duration = Duration :: from_millis ( 10 ) ;
151
- const MAX_BACKOFF : Duration = Duration :: from_secs ( 5 ) ;
152
-
153
154
pub struct ScheduledJobExecutor < RT : Runtime > {
154
155
context : ScheduledJobContext < RT > ,
155
156
pause_client : PauseClient ,
@@ -195,7 +196,8 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
195
196
pause_client,
196
197
} ;
197
198
async move {
198
- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
199
+ let mut backoff =
200
+ Backoff :: new ( * SCHEDULED_JOB_INITIAL_BACKOFF , * SCHEDULED_JOB_MAX_BACKOFF ) ;
199
201
while let Err ( mut e) = executor. run ( & mut backoff) . await {
200
202
let delay = executor. rt . with_rng ( |rng| backoff. fail ( rng) ) ;
201
203
tracing:: error!( "Scheduled job executor failed, sleeping {delay:?}" ) ;
@@ -417,7 +419,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
417
419
job : ScheduledJob ,
418
420
job_id : ResolvedDocumentId ,
419
421
) -> ResolvedDocumentId {
420
- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
422
+ let mut backoff = Backoff :: new ( * SCHEDULED_JOB_INITIAL_BACKOFF , * SCHEDULED_JOB_MAX_BACKOFF ) ;
421
423
loop {
422
424
// Generate a new request_id for every schedule job execution attempt.
423
425
let request_id = RequestId :: new ( ) ;
@@ -731,7 +733,8 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
731
733
// Mark the job as completed. Keep trying until we succeed (or
732
734
// detect the job state has changed). Don't bubble up the error
733
735
// since otherwise we will lose the original execution logs.
734
- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
736
+ let mut backoff =
737
+ Backoff :: new ( * SCHEDULED_JOB_INITIAL_BACKOFF , * SCHEDULED_JOB_MAX_BACKOFF ) ;
735
738
while let Err ( mut err) = self
736
739
. complete_action ( job_id, & updated_job, usage_tracker. clone ( ) , state. clone ( ) )
737
740
. await
@@ -844,7 +847,10 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
844
847
pub fn start ( rt : RT , database : Database < RT > ) -> impl Future < Output = ( ) > + Send {
845
848
let garbage_collector = Self { rt, database } ;
846
849
async move {
847
- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
850
+ let mut backoff = Backoff :: new (
851
+ * SCHEDULED_JOB_GARBAGE_COLLECTION_INITIAL_BACKOFF ,
852
+ * SCHEDULED_JOB_GARBAGE_COLLECTION_MAX_BACKOFF ,
853
+ ) ;
848
854
while let Err ( mut e) = garbage_collector. run ( & mut backoff) . await {
849
855
let delay = garbage_collector. rt . with_rng ( |rng| backoff. fail ( rng) ) ;
850
856
tracing:: error!( "Scheduled job garbage collector failed, sleeping {delay:?}" ) ;
0 commit comments