Skip to content

Commit e0bb004

Browse files
authored
Merge pull request #9161 from Turbo87/diesel-async
Use `diesel-async` database pools
2 parents 5b6cca9 + 4795d51 commit e0bb004

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+790
-491
lines changed

Cargo.lock

Lines changed: 122 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ derive_builder = "=0.20.0"
7373
derive_deref = "=1.1.1"
7474
dialoguer = "=0.11.0"
7575
diesel = { version = "=2.2.2", features = ["postgres", "serde_json", "chrono", "numeric"] }
76+
diesel-async = { version = "=0.5.0", features = ["async-connection-wrapper", "deadpool", "postgres"] }
7677
diesel_full_text_search = "=2.2.0"
7778
diesel_migrations = { version = "=2.2.0", features = ["postgres"] }
7879
dotenvy = "=0.15.7"

crates/crates_io_worker/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ workspace = true
1111
anyhow = "=1.0.86"
1212
deadpool-diesel = { version = "=0.6.1", features = ["postgres", "tracing"] }
1313
diesel = { version = "=2.2.2", features = ["postgres", "serde_json"] }
14+
diesel-async = { version = "=0.5.0", features = ["async-connection-wrapper", "deadpool", "postgres"] }
1415
futures-util = "=0.3.30"
1516
sentry-core = { version = "=0.34.0", features = ["client"] }
1617
serde = { version = "=1.0.204", features = ["derive"] }
@@ -21,4 +22,4 @@ tracing = "=0.1.40"
2122

2223
[dev-dependencies]
2324
crates_io_test_db = { path = "../crates_io_test_db" }
24-
tokio = { version = "=1.39.2", features = ["macros", "rt", "sync"]}
25+
tokio = { version = "=1.39.2", features = ["macros", "rt", "rt-multi-thread", "sync"]}

crates/crates_io_worker/src/background_job.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::errors::EnqueueError;
22
use crate::schema::background_jobs;
3+
use diesel::connection::LoadConnection;
4+
use diesel::pg::Pg;
35
use diesel::prelude::*;
4-
use diesel::PgConnection;
56
use serde::de::DeserializeOwned;
67
use serde::Serialize;
78
use std::future::Future;
@@ -29,14 +30,14 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
2930
/// Execute the task. This method should define its logic.
3031
fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;
3132

32-
fn enqueue(&self, conn: &mut PgConnection) -> Result<i64, EnqueueError> {
33+
fn enqueue(&self, conn: &mut impl LoadConnection<Backend = Pg>) -> Result<i64, EnqueueError> {
3334
self.enqueue_with_priority(conn, Self::PRIORITY)
3435
}
3536

3637
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
3738
fn enqueue_with_priority(
3839
&self,
39-
conn: &mut PgConnection,
40+
conn: &mut impl LoadConnection<Backend = Pg>,
4041
job_priority: i16,
4142
) -> Result<i64, EnqueueError> {
4243
let job_data = serde_json::to_value(self)?;

crates/crates_io_worker/src/runner.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,28 @@ use crate::job_registry::JobRegistry;
33
use crate::worker::Worker;
44
use crate::{storage, BackgroundJob};
55
use anyhow::anyhow;
6-
use deadpool_diesel::postgres::Pool;
6+
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
7+
use diesel_async::pooled_connection::deadpool::Pool;
8+
use diesel_async::AsyncPgConnection;
79
use futures_util::future::join_all;
810
use std::collections::HashMap;
911
use std::sync::Arc;
1012
use std::time::Duration;
11-
use tokio::task::JoinHandle;
13+
use tokio::task::{spawn_blocking, JoinHandle};
1214
use tracing::{info, info_span, warn, Instrument};
1315

1416
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1);
1517

1618
/// The core runner responsible for locking and running jobs
1719
pub struct Runner<Context> {
18-
connection_pool: Pool,
20+
connection_pool: Pool<AsyncPgConnection>,
1921
queues: HashMap<String, Queue<Context>>,
2022
context: Context,
2123
shutdown_when_queue_empty: bool,
2224
}
2325

2426
impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
25-
pub fn new(connection_pool: Pool, context: Context) -> Self {
27+
pub fn new(connection_pool: Pool<AsyncPgConnection>, context: Context) -> Self {
2628
Self {
2729
connection_pool,
2830
queues: HashMap::new(),
@@ -96,7 +98,9 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
9698
/// any jobs have failed.
9799
pub async fn check_for_failed_jobs(&self) -> anyhow::Result<()> {
98100
let conn = self.connection_pool.get().await?;
99-
conn.interact(move |conn| {
101+
spawn_blocking(move || {
102+
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
103+
100104
let failed_jobs = storage::failed_job_count(conn)?;
101105
if failed_jobs == 0 {
102106
Ok(())

crates/crates_io_worker/src/storage.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::schema::background_jobs;
2+
use diesel::connection::LoadConnection;
23
use diesel::dsl::now;
34
use diesel::pg::Pg;
45
use diesel::prelude::*;
@@ -26,7 +27,7 @@ fn retriable() -> Box<dyn BoxableExpression<background_jobs::table, Pg, SqlType
2627
/// Finds the next job that is unlocked, and ready to be retried. If a row is
2728
/// found, it will be locked.
2829
pub(super) fn find_next_unlocked_job(
29-
conn: &mut PgConnection,
30+
conn: &mut impl LoadConnection<Backend = Pg>,
3031
job_types: &[String],
3132
) -> QueryResult<BackgroundJob> {
3233
background_jobs::table
@@ -40,15 +41,18 @@ pub(super) fn find_next_unlocked_job(
4041
}
4142

4243
/// The number of jobs that have failed at least once
43-
pub(super) fn failed_job_count(conn: &mut PgConnection) -> QueryResult<i64> {
44+
pub(super) fn failed_job_count(conn: &mut impl LoadConnection<Backend = Pg>) -> QueryResult<i64> {
4445
background_jobs::table
4546
.count()
4647
.filter(background_jobs::retries.gt(0))
4748
.get_result(conn)
4849
}
4950

5051
/// Deletes a job that has successfully completed running
51-
pub(super) fn delete_successful_job(conn: &mut PgConnection, job_id: i64) -> QueryResult<()> {
52+
pub(super) fn delete_successful_job(
53+
conn: &mut impl LoadConnection<Backend = Pg>,
54+
job_id: i64,
55+
) -> QueryResult<()> {
5256
delete(background_jobs::table.find(job_id)).execute(conn)?;
5357
Ok(())
5458
}
@@ -57,7 +61,7 @@ pub(super) fn delete_successful_job(conn: &mut PgConnection, job_id: i64) -> Que
5761
///
5862
/// Ignores any database errors that may have occurred. If the DB has gone away,
5963
/// we assume that just trying again with a new connection will succeed.
60-
pub(super) fn update_failed_job(conn: &mut PgConnection, job_id: i64) {
64+
pub(super) fn update_failed_job(conn: &mut impl LoadConnection<Backend = Pg>, job_id: i64) {
6165
let _ = update(background_jobs::table.find(job_id))
6266
.set((
6367
background_jobs::retries.eq(background_jobs::retries + 1),

crates/crates_io_worker/src/worker.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,22 @@ use crate::job_registry::JobRegistry;
22
use crate::storage;
33
use crate::util::{try_to_extract_panic_info, with_sentry_transaction};
44
use anyhow::anyhow;
5-
use deadpool_diesel::postgres::Pool;
65
use diesel::prelude::*;
6+
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
7+
use diesel_async::pooled_connection::deadpool::Pool;
8+
use diesel_async::AsyncPgConnection;
79
use futures_util::FutureExt;
810
use sentry_core::{Hub, SentryFutureExt};
911
use std::panic::AssertUnwindSafe;
1012
use std::sync::Arc;
1113
use std::time::Duration;
1214
use tokio::runtime::Handle;
15+
use tokio::task::spawn_blocking;
1316
use tokio::time::sleep;
1417
use tracing::{debug, error, info_span, warn};
1518

1619
pub struct Worker<Context> {
17-
pub(crate) connection_pool: Pool,
20+
pub(crate) connection_pool: Pool<AsyncPgConnection>,
1821
pub(crate) context: Context,
1922
pub(crate) job_registry: Arc<JobRegistry<Context>>,
2023
pub(crate) shutdown_when_queue_empty: bool,
@@ -58,7 +61,9 @@ impl<Context: Clone + Send + Sync + 'static> Worker<Context> {
5861
let job_registry = self.job_registry.clone();
5962
let conn = self.connection_pool.get().await?;
6063

61-
conn.interact(move |conn| {
64+
spawn_blocking(move || {
65+
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
66+
6267
let job_types = job_registry.job_types();
6368
conn.transaction(|conn| {
6469
debug!("Looking for next background worker job…");

0 commit comments

Comments
 (0)