Commit 0b03ae6

Ensure the update_downloads job doesn't run concurrently
If multiple instances of this job run concurrently, downloads can be overcounted, at least temporarily. The job first selects all matching `version_downloads` rows and later uses those values to calculate how many downloads to add to `versions` and `crates`. A second job running at the same time would select some rows from `version_downloads` that the first job has already queued for processing.

If an overcount did occur, the next run of the job should calculate a negative adjustment and correct the situation. Still, there is no point in doing extra work, and if we eventually need concurrency we should build that out intentionally.

Therefore, this commit wraps the entire job in a transaction and obtains a transaction-level advisory lock from the database. If the lock has already been taken, the job will fail and will be retried by swirl.

If the duration of this job begins to approach the scheduling interval, we will want to increase that interval to avoid triggering alerts.
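The overcount and its later self-correction can be illustrated with a small in-memory model. This is only a sketch under assumptions: the `downloads` and `counted` fields, the helper names, and the numbers are hypothetical stand-ins chosen for illustration, and nothing here touches the real tables or the job's actual queries.

```rust
/// Hypothetical, simplified stand-in for one `version_downloads` row.
struct VersionDownload {
    downloads: i64, // raw downloads recorded so far (assumed field)
    counted: i64,   // portion already added to the totals (assumed field)
}

/// Step 1 of the modeled job: read the row and compute the pending amount.
fn select_pending(row: &VersionDownload) -> i64 {
    row.downloads - row.counted
}

/// Step 2 of the modeled job: apply an amount that was computed earlier.
fn apply(row: &mut VersionDownload, version_total: &mut i64, amount: i64) {
    *version_total += amount;
    row.counted += amount;
}

/// Returns (total after two overlapping runs, adjustment computed by the
/// next run, total after that run).
fn interleaved_then_corrected() -> (i64, i64, i64) {
    let mut row = VersionDownload { downloads: 10, counted: 4 };
    let mut version_total = 4;

    // Two jobs overlap: both select before either one applies its result.
    let first = select_pending(&row);
    let second = select_pending(&row);
    apply(&mut row, &mut version_total, first);
    apply(&mut row, &mut version_total, second);
    let overcounted = version_total; // 16, although only 10 downloads exist

    // A later, non-overlapping run sees counted > downloads and computes a
    // negative adjustment, which brings the total back in line.
    let adjustment = select_pending(&row); // -6
    apply(&mut row, &mut version_total, adjustment);

    (overcounted, adjustment, version_total)
}
```

Calling `interleaved_then_corrected()` from a `main` or a test yields `(16, -6, 10)` in this model: a temporary overcount of 6 that the next run undoes.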
1 parent 5dcb80d commit 0b03ae6

2 files changed: 17 additions and 2 deletions

src/tasks.rs

Lines changed: 5 additions & 0 deletions
@@ -3,3 +3,8 @@ mod update_downloads;
 
 pub use dump_db::dump_db;
 pub use update_downloads::update_downloads;
+
+use diesel::sql_types::BigInt;
+sql_function!(fn pg_try_advisory_xact_lock(key: BigInt) -> Bool);
+
+const UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY: i64 = 1;

src/tasks/update_downloads.rs

Lines changed: 12 additions & 2 deletions
@@ -1,3 +1,5 @@
+use super::pg_try_advisory_xact_lock;
+use super::UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY as LOCK_KEY;
 use crate::{
     background_jobs::Environment,
     models::VersionDownload,
@@ -9,9 +11,17 @@ use swirl::PerformError;
 
 #[swirl::background_job]
 pub fn update_downloads(env: &Environment) -> Result<(), PerformError> {
+    use diesel::select;
+
     let conn = env.connection()?;
-    update(&conn)?;
-    Ok(())
+    conn.transaction::<_, PerformError, _>(|| {
+        // If this job runs concurrently with itself, it could result in an overcount
+        if !select(pg_try_advisory_xact_lock(LOCK_KEY)).get_result(&*conn)? {
+            return Err("The advisory lock for update_downloads is already taken".into());
+        }
+
+        update(&conn).map_err(Into::into)
+    })
 }
 
 fn update(conn: &PgConnection) -> QueryResult<()> {
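
The try-lock-or-fail shape of the change above can be sketched without a database. The following is a rough analogy only: a `std::sync::Mutex` stands in for the Postgres advisory lock, and `run_exclusively` is a hypothetical helper, not part of this commit, diesel, or swirl. The guard being dropped at the end of the function loosely mirrors a transaction-level lock being released when the transaction ends.

```rust
use std::sync::Mutex;

/// Runs `work` only if nobody else currently holds `lock`.
/// Instead of waiting, a failed attempt returns an error so that the caller
/// (in the real job, the job runner) can retry later.
fn run_exclusively<T>(lock: &Mutex<()>, work: impl FnOnce() -> T) -> Result<T, String> {
    // `try_lock` never blocks: it returns an error if the lock is held.
    let _guard = lock
        .try_lock()
        .map_err(|_| "The advisory lock for update_downloads is already taken".to_string())?;

    // The lock is held for the whole duration of `work` and released when
    // `_guard` goes out of scope at the end of this function.
    Ok(work())
}
```

In this sketch, a second attempt made while the first is still inside `work` gets the error immediately, and an attempt made after the first has returned succeeds again, which is the behaviour the commit message describes for overlapping job runs.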
