Skip to content

Commit b0bfb44

Browse files
committed
Auto merge of #4108 - Turbo87:worker-module, r=JohnTitor
Extract `worker` module This PR renames the `tasks` module to `worker`, to make it clearer that these things are running on the background worker. It also moves the other swirl tasks outside of the `tasks` module into the new `worker` module.
2 parents a601c0f + 62eb732 commit b0bfb44

19 files changed

+196
-185
lines changed

src/bin/enqueue-job.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#![warn(clippy::all, rust_2018_idioms)]
22

33
use anyhow::{anyhow, Result};
4-
use cargo_registry::{db, env, git, tasks};
4+
use cargo_registry::{db, env, worker};
55
use diesel::prelude::*;
66
use swirl::schema::background_jobs::dsl::*;
77
use swirl::Job;
@@ -25,18 +25,18 @@ fn main() -> Result<()> {
2525
println!("Did not enqueue update_downloads, existing job already in progress");
2626
Ok(())
2727
} else {
28-
Ok(tasks::update_downloads().enqueue(&conn)?)
28+
Ok(worker::update_downloads().enqueue(&conn)?)
2929
}
3030
}
3131
"dump_db" => {
3232
let database_url = args.next().unwrap_or_else(|| env("READ_ONLY_REPLICA_URL"));
3333
let target_name = args
3434
.next()
3535
.unwrap_or_else(|| String::from("db-dump.tar.gz"));
36-
Ok(tasks::dump_db(database_url, target_name).enqueue(&conn)?)
36+
Ok(worker::dump_db(database_url, target_name).enqueue(&conn)?)
3737
}
38-
"daily_db_maintenance" => Ok(tasks::daily_db_maintenance().enqueue(&conn)?),
39-
"squash_index" => Ok(git::squash_index().enqueue(&conn)?),
38+
"daily_db_maintenance" => Ok(worker::daily_db_maintenance().enqueue(&conn)?),
39+
"squash_index" => Ok(worker::squash_index().enqueue(&conn)?),
4040
other => Err(anyhow!("Unrecognized job type `{}`", other)),
4141
}
4242
}

src/controllers/krate/publish.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use crate::models::{
1313
insert_version_owner_action, Badge, Category, Crate, DependencyKind, Keyword, NewCrate,
1414
NewVersion, Rights, VersionAction,
1515
};
16+
use crate::worker;
1617

17-
use crate::render;
1818
use crate::schema::*;
1919
use crate::util::errors::{cargo_err, AppResult};
2020
use crate::util::{read_fill, read_le_u32, LimitErrorReader, Maximums};
@@ -197,7 +197,7 @@ pub fn publish(req: &mut dyn RequestExt) -> EndpointResult {
197197
verify_tarball(&pkg_name, &tarball, maximums.max_unpack_size)?;
198198

199199
if let Some(readme) = new_crate.readme {
200-
render::render_and_upload_readme(
200+
worker::render_and_upload_readme(
201201
version.id,
202202
readme,
203203
new_crate
@@ -223,7 +223,7 @@ pub fn publish(req: &mut dyn RequestExt) -> EndpointResult {
223223
yanked: Some(false),
224224
links,
225225
};
226-
git::add_crate(git_crate).enqueue(&conn)?;
226+
worker::add_crate(git_crate).enqueue(&conn)?;
227227

228228
// The `other` field on `PublishWarnings` was introduced to handle a temporary warning
229229
// that is no longer needed. As such, crates.io currently does not return any `other`

src/controllers/version/yank.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use swirl::Job;
44

55
use super::{extract_crate_name_and_semver, version_and_crate};
66
use crate::controllers::cargo_prelude::*;
7-
use crate::git;
87
use crate::models::Rights;
98
use crate::models::{insert_version_owner_action, VersionAction};
9+
use crate::worker;
1010

1111
/// Handles the `DELETE /crates/:crate_id/:version/yank` route.
1212
/// This does not delete a crate version, it makes the crate
@@ -50,7 +50,7 @@ fn modify_yank(req: &mut dyn RequestExt, yanked: bool) -> EndpointResult {
5050

5151
insert_version_owner_action(&conn, version.id, user.id, api_token_id, action)?;
5252

53-
git::yank(krate.name, version, yanked).enqueue(&conn)?;
53+
worker::yank(krate.name, version, yanked).enqueue(&conn)?;
5454

5555
ok_true()
5656
}

src/git.rs

Lines changed: 8 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
use std::collections::HashMap;
2-
use std::fs::{self, OpenOptions};
3-
use std::io::prelude::*;
42
use std::path::{Path, PathBuf};
53

6-
use chrono::Utc;
74
use swirl::PerformError;
85
use tempfile::TempDir;
96
use url::Url;
107

11-
use crate::background_jobs::Environment;
12-
use crate::models::{DependencyKind, Version};
13-
use crate::schema::versions;
8+
use crate::models::DependencyKind;
149

1510
static DEFAULT_GIT_SSH_USERNAME: &str = "git";
1611

@@ -142,9 +137,9 @@ impl RepositoryConfig {
142137
}
143138

144139
pub struct Repository {
145-
checkout_path: TempDir,
140+
pub checkout_path: TempDir,
146141
repository: git2::Repository,
147-
credentials: Credentials,
142+
pub credentials: Credentials,
148143
}
149144

150145
impl Repository {
@@ -179,13 +174,13 @@ impl Repository {
179174
})
180175
}
181176

182-
fn index_file(&self, name: &str) -> PathBuf {
177+
pub fn index_file(&self, name: &str) -> PathBuf {
183178
self.checkout_path
184179
.path()
185180
.join(self.relative_index_file(name))
186181
}
187182

188-
fn relative_index_file(&self, name: &str) -> PathBuf {
183+
pub fn relative_index_file(&self, name: &str) -> PathBuf {
189184
let name = name.to_lowercase();
190185
match name.len() {
191186
1 => Path::new("1").join(&name),
@@ -195,7 +190,7 @@ impl Repository {
195190
}
196191
}
197192

198-
fn head_oid(&self) -> Result<git2::Oid, PerformError> {
193+
pub fn head_oid(&self) -> Result<git2::Oid, PerformError> {
199194
Ok(self.repository.head()?.target().unwrap())
200195
}
201196

@@ -246,7 +241,7 @@ impl Repository {
246241
ref_status
247242
}
248243

249-
fn commit_and_push(&self, message: &str, modified_file: &Path) -> Result<(), PerformError> {
244+
pub fn commit_and_push(&self, message: &str, modified_file: &Path) -> Result<(), PerformError> {
250245
println!("Committing and pushing \"{}\"", message);
251246

252247
self.perform_commit_and_push(message, modified_file)
@@ -291,7 +286,7 @@ impl Repository {
291286
}
292287

293288
/// Reset `HEAD` to a single commit with all the index contents, but no parent
294-
fn squash_to_single_commit(&self, msg: &str) -> Result<(), PerformError> {
289+
pub fn squash_to_single_commit(&self, msg: &str) -> Result<(), PerformError> {
295290
let tree = self.repository.find_commit(self.head_oid()?)?.tree()?;
296291
let sig = self.repository.signature()?;
297292

@@ -308,154 +303,3 @@ impl Repository {
308303
Ok(())
309304
}
310305
}
311-
312-
#[swirl::background_job]
313-
pub fn add_crate(env: &Environment, krate: Crate) -> Result<(), PerformError> {
314-
use std::io::prelude::*;
315-
316-
let repo = env.lock_index()?;
317-
let dst = repo.index_file(&krate.name);
318-
319-
// Add the crate to its relevant file
320-
fs::create_dir_all(dst.parent().unwrap())?;
321-
let mut file = OpenOptions::new().append(true).create(true).open(&dst)?;
322-
serde_json::to_writer(&mut file, &krate)?;
323-
file.write_all(b"\n")?;
324-
325-
let message: String = format!("Updating crate `{}#{}`", krate.name, krate.vers);
326-
327-
repo.commit_and_push(&message, &repo.relative_index_file(&krate.name))
328-
}
329-
330-
/// Yanks or unyanks a crate version. This requires finding the index
331-
/// file, deserlialise the crate from JSON, change the yank boolean to
332-
/// `true` or `false`, write all the lines back out, and commit and
333-
/// push the changes.
334-
#[swirl::background_job]
335-
pub fn yank(
336-
conn: &PgConnection,
337-
env: &Environment,
338-
krate: String,
339-
version: Version,
340-
yanked: bool,
341-
) -> Result<(), PerformError> {
342-
use diesel::prelude::*;
343-
344-
let repo = env.lock_index()?;
345-
let dst = repo.index_file(&krate);
346-
347-
conn.transaction(|| {
348-
let yanked_in_db: bool = versions::table
349-
.find(version.id)
350-
.select(versions::yanked)
351-
.for_update()
352-
.first(&*conn)?;
353-
354-
if yanked_in_db == yanked {
355-
// The crate is alread in the state requested, nothing to do
356-
return Ok(());
357-
}
358-
359-
let prev = fs::read_to_string(&dst)?;
360-
let new = prev
361-
.lines()
362-
.map(|line| {
363-
let mut git_crate = serde_json::from_str::<Crate>(line)
364-
.map_err(|_| format!("couldn't decode: `{}`", line))?;
365-
if git_crate.name != krate || git_crate.vers != version.num {
366-
return Ok(line.to_string());
367-
}
368-
git_crate.yanked = Some(yanked);
369-
Ok(serde_json::to_string(&git_crate)?)
370-
})
371-
.collect::<Result<Vec<_>, PerformError>>();
372-
let new = new?.join("\n") + "\n";
373-
fs::write(&dst, new.as_bytes())?;
374-
375-
let message: String = format!(
376-
"{} crate `{}#{}`",
377-
if yanked { "Yanking" } else { "Unyanking" },
378-
krate,
379-
version.num
380-
);
381-
382-
repo.commit_and_push(&message, &repo.relative_index_file(&krate))?;
383-
384-
diesel::update(&version)
385-
.set(versions::yanked.eq(yanked))
386-
.execute(&*conn)?;
387-
388-
Ok(())
389-
})
390-
}
391-
392-
/// Collapse the index into a single commit, archiving the current history in a snapshot branch.
393-
#[swirl::background_job]
394-
pub fn squash_index(env: &Environment) -> Result<(), PerformError> {
395-
let repo = env.lock_index()?;
396-
println!("Squashing the index into a single commit.");
397-
398-
let now = Utc::now().format("%Y-%m-%d");
399-
let original_head = repo.head_oid()?.to_string();
400-
let msg = format!("Collapse index into one commit\n\n\
401-
Previous HEAD was {}, now on the `snapshot-{}` branch\n\n\
402-
More information about this change can be found [online] and on [this issue].\n\n\
403-
[online]: https://internals.rust-lang.org/t/cargos-crate-index-upcoming-squash-into-one-commit/8440\n\
404-
[this issue]: https://github.com/rust-lang/crates-io-cargo-teams/issues/47", original_head, now);
405-
406-
repo.squash_to_single_commit(&msg)?;
407-
408-
// Shell out to git because libgit2 does not currently support push leases
409-
410-
let key = match &repo.credentials {
411-
Credentials::Ssh { key } => key,
412-
Credentials::Http { .. } => {
413-
return Err(String::from("squash_index: Password auth not supported").into())
414-
}
415-
_ => return Err(String::from("squash_index: Could not determine credentials").into()),
416-
};
417-
418-
// When running on production, ensure the file is created in tmpfs and not persisted to disk
419-
#[cfg(target_os = "linux")]
420-
let mut temp_key_file = tempfile::Builder::new().tempfile_in("/dev/shm")?;
421-
422-
// For other platforms, default to std::env::tempdir()
423-
#[cfg(not(target_os = "linux"))]
424-
let mut temp_key_file = tempfile::Builder::new().tempfile()?;
425-
426-
temp_key_file.write_all(key.as_bytes())?;
427-
428-
let checkout_path = repo.checkout_path.path();
429-
let output = std::process::Command::new("git")
430-
.current_dir(checkout_path)
431-
.env(
432-
"GIT_SSH_COMMAND",
433-
format!(
434-
"ssh -o StrictHostKeyChecking=accept-new -i {}",
435-
temp_key_file.path().display()
436-
),
437-
)
438-
.args(&[
439-
"push",
440-
// Both updates should succeed or fail together
441-
"--atomic",
442-
"origin",
443-
// Overwrite master, but only if it server matches the expected value
444-
&format!("--force-with-lease=refs/heads/master:{}", original_head),
445-
// The new squashed commit is pushed to master
446-
"HEAD:refs/heads/master",
447-
// The previous value of HEAD is pushed to a snapshot branch
448-
&format!("{}:refs/heads/snapshot-{}", original_head, now),
449-
])
450-
.output()?;
451-
452-
if !output.status.success() {
453-
let stderr = String::from_utf8_lossy(&output.stderr);
454-
let message = format!("Running git command failed with: {}", stderr);
455-
return Err(message.into());
456-
}
457-
458-
println!("The index has been successfully squashed.");
459-
460-
Ok(())
461-
}

src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,12 @@ pub mod github;
4545
pub mod metrics;
4646
pub mod middleware;
4747
mod publish_rate_limit;
48-
pub mod render;
4948
pub mod schema;
5049
pub mod sql;
51-
pub mod tasks;
5250
mod test_util;
5351
pub mod uploaders;
5452
pub mod util;
53+
pub mod worker;
5554

5655
pub mod controllers;
5756
pub mod models;

src/tasks.rs

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/tests/dump_db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::util::FreshSchema;
2-
use cargo_registry::tasks::dump_db;
2+
use cargo_registry::worker::dump_db;
33

44
#[test]
55
fn dump_db_and_reimport_dump() {
File renamed without changes.
File renamed without changes.

src/tasks/dump_db/gen_scripts.rs renamed to src/worker/dump_db/gen_scripts.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{fs::File, path::Path};
22

3-
use crate::tasks::dump_db::configuration::{ColumnVisibility, TableConfig, VisibilityConfig};
3+
use crate::worker::dump_db::configuration::{ColumnVisibility, TableConfig, VisibilityConfig};
44
use swirl::PerformError;
55

66
pub fn gen_scripts(export_script: &Path, import_script: &Path) -> Result<(), PerformError> {

0 commit comments

Comments
 (0)