Skip to content

Commit 519cae4

Browse files
nipunn1313Convex, Inc.
authored andcommitted
Remove OCCs from push conflicting with cron jobs (#36219)
I reproed the /push_config OCC with an every-second cron job. Confirmed that this fixes! This ensures that /push_config does not add the `next_run` ts into the readset in the case of a high-frequency cron job changing cronspec. It also means that if you make changes unrelated to a high-frequency cron, the `next_run` won't get added to the read set. Overall, this will reduce OCC to /push_config and improve availability on push! GitOrigin-RevId: f13d64d3d322614d6e91a54cb7b50eb714474f87
1 parent b69182e commit 519cae4

File tree

4 files changed

+78
-36
lines changed

4 files changed

+78
-36
lines changed

crates/application/src/cron_jobs/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use model::{
6161
CronJobResult,
6262
CronJobState,
6363
CronJobStatus,
64+
CronNextRun,
6465
},
6566
CronModel,
6667
},
@@ -519,7 +520,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
519520
let mut updated_job = job.clone();
520521
updated_job.state = CronJobState::InProgress;
521522
CronModel::new(&mut tx, component)
522-
.update_job_state(updated_job.clone())
523+
.update_job_state(updated_job.cron_next_run())
523524
.await?;
524525
self.database
525526
.commit_with_write_source(tx, "cron_in_progress")
@@ -771,11 +772,13 @@ impl<RT: Runtime> CronJobExecutor<RT> {
771772
.await?;
772773
}
773774

774-
let mut updated_job = job.clone();
775-
updated_job.state = CronJobState::Pending;
776-
updated_job.prev_ts = Some(prev_ts);
777-
updated_job.next_ts = next_ts;
778-
model.update_job_state(updated_job.clone()).await?;
775+
let next_run = CronNextRun {
776+
cron_job_id: job.id.developer_id,
777+
state: CronJobState::Pending,
778+
prev_ts: Some(prev_ts),
779+
next_ts,
780+
};
781+
model.update_job_state(next_run).await?;
779782
Ok(())
780783
}
781784
}

crates/application/src/tests/cron_jobs.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,12 @@ pub(crate) async fn test_cron_jobs_race_condition(rt: TestRuntime) -> anyhow::Re
157157
let job = jobs.get(&test_cron_identifier()).unwrap();
158158

159159
// Delete the cron job
160-
model.delete(job.clone()).await?;
160+
let job_metadata = model
161+
.list_metadata()
162+
.await?
163+
.remove(&test_cron_identifier())
164+
.unwrap();
165+
model.delete(job_metadata).await?;
161166
let jobs = model.list().await?;
162167
assert_eq!(jobs.len(), original_jobs.len());
163168

crates/model/src/cron_jobs/mod.rs

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ impl<'a, RT: Runtime> CronModel<'a, RT> {
207207
WithHeapSize::default()
208208
};
209209

210-
let old_crons = self.list().await?;
210+
let old_crons = self.list_metadata().await?;
211211
let mut added_crons: Vec<&CronIdentifier> = vec![];
212212
let mut updated_crons: Vec<&CronIdentifier> = vec![];
213213
let mut deleted_crons: Vec<&CronIdentifier> = vec![];
@@ -289,26 +289,50 @@ impl<'a, RT: Runtime> CronModel<'a, RT> {
289289
.transpose()
290290
}
291291

292-
pub async fn update(
292+
async fn update(
293293
&mut self,
294-
mut cron_job: CronJob,
294+
mut cron_job: ParsedDocument<CronJobMetadata>,
295295
new_cron_spec: CronSpec,
296296
) -> anyhow::Result<()> {
297297
if new_cron_spec.cron_schedule != cron_job.cron_spec.cron_schedule {
298+
// Skip updating the next run ts, if the runs are close together on the old
299+
// schedule. This is a heuristic to avoid OCC with existing cron
300+
// jobs running/changing state. True solution would be to move this
301+
// logic to the async worker, but quickfix for now is to skip the
302+
// `update_job_state`.
298303
let now = self.runtime().generate_timestamp()?;
299-
cron_job.next_ts = compute_next_ts(&new_cron_spec, cron_job.prev_ts, now)?;
304+
let next_ts = compute_next_ts(&cron_job.cron_spec, None, now)?;
305+
let next_next_run = compute_next_ts(&cron_job.cron_spec, Some(next_ts), next_ts)?;
306+
if next_next_run.secs_since_f64(now) > 30.0 {
307+
// Read in next-run to the readset and update it.
308+
let mut next_run = self
309+
.next_run(cron_job.id().developer_id)
310+
.await?
311+
.context("No next run found")?
312+
.into_value();
313+
314+
// Recalculate on the new schedule.
315+
let now = self.runtime().generate_timestamp()?;
316+
next_run.next_ts = compute_next_ts(&new_cron_spec, next_run.prev_ts, now)?;
317+
self.update_job_state(next_run).await?;
318+
}
300319
}
301320
cron_job.cron_spec = new_cron_spec;
302-
self.update_job_state(cron_job).await?;
321+
SystemMetadataModel::new(self.tx, self.component.into())
322+
.replace(cron_job.id(), cron_job.into_value().try_into()?)
323+
.await?;
303324
Ok(())
304325
}
305326

306-
pub async fn delete(&mut self, cron_job: CronJob) -> anyhow::Result<()> {
327+
pub async fn delete(
328+
&mut self,
329+
cron_job: ParsedDocument<CronJobMetadata>,
330+
) -> anyhow::Result<()> {
307331
SystemMetadataModel::new(self.tx, self.component.into())
308-
.delete(cron_job.id)
332+
.delete(cron_job.id())
309333
.await?;
310334
let next_run = self
311-
.next_run(cron_job.id.developer_id)
335+
.next_run(cron_job.id().developer_id)
312336
.await?
313337
.context("No next run found")?;
314338
SystemMetadataModel::new(self.tx, self.component.into())
@@ -319,28 +343,9 @@ impl<'a, RT: Runtime> CronModel<'a, RT> {
319343
Ok(())
320344
}
321345

322-
pub async fn update_job_state(&mut self, job: CronJob) -> anyhow::Result<()> {
323-
anyhow::ensure!(self
324-
.tx
325-
.table_mapping()
326-
.namespace(self.component.into())
327-
.tablet_matches_name(job.id.tablet_id, &CRON_JOBS_TABLE));
328-
let cron_job = CronJobMetadata {
329-
name: job.name,
330-
cron_spec: job.cron_spec,
331-
};
332-
SystemMetadataModel::new(self.tx, self.component.into())
333-
.replace(job.id, cron_job.try_into()?)
334-
.await?;
335-
336-
let next_run = CronNextRun {
337-
cron_job_id: job.id.developer_id,
338-
state: job.state,
339-
prev_ts: job.prev_ts,
340-
next_ts: job.next_ts,
341-
};
346+
pub async fn update_job_state(&mut self, next_run: CronNextRun) -> anyhow::Result<()> {
342347
let existing_next_run = self
343-
.next_run(job.id.developer_id)
348+
.next_run(next_run.cron_job_id)
344349
.await?
345350
.context("No next run found")?;
346351
SystemMetadataModel::new(self.tx, self.component.into())
@@ -405,6 +410,19 @@ impl<'a, RT: Runtime> CronModel<'a, RT> {
405410
Ok(cron_jobs)
406411
}
407412

413+
pub async fn list_metadata(
414+
&mut self,
415+
) -> anyhow::Result<BTreeMap<CronIdentifier, ParsedDocument<CronJobMetadata>>> {
416+
let cron_query = Query::full_table_scan(CRON_JOBS_TABLE.clone(), Order::Asc);
417+
let mut query_stream = ResolvedQuery::new(self.tx, self.component.into(), cron_query)?;
418+
let mut cron_jobs = BTreeMap::new();
419+
while let Some(job) = query_stream.next(self.tx, None).await? {
420+
let cron: ParsedDocument<CronJobMetadata> = job.parse()?;
421+
cron_jobs.insert(cron.name.clone(), cron);
422+
}
423+
Ok(cron_jobs)
424+
}
425+
408426
fn runtime(&self) -> &RT {
409427
self.tx.runtime()
410428
}

crates/model/src/cron_jobs/types.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,22 @@ impl CronJob {
8686
next_ts: next_run.next_ts,
8787
}
8888
}
89+
90+
pub fn cron_metadata(&self) -> CronJobMetadata {
91+
CronJobMetadata {
92+
name: self.name.clone(),
93+
cron_spec: self.cron_spec.clone(),
94+
}
95+
}
96+
97+
pub fn cron_next_run(&self) -> CronNextRun {
98+
CronNextRun {
99+
cron_job_id: self.id.developer_id,
100+
state: self.state,
101+
prev_ts: self.prev_ts,
102+
next_ts: self.next_ts,
103+
}
104+
}
89105
}
90106

91107
#[derive(Clone, Debug, PartialEq)]

0 commit comments

Comments
 (0)