Skip to content

Commit 1af5ce3

Browse files
authored
Merge pull request #8442 from Turbo87/bye-bye-r2d2
Remove `r2d2` database connection pools
2 parents a5c5065 + a0382fe commit 1af5ce3

File tree

5 files changed

+5
-272
lines changed

5 files changed

+5
-272
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ deadpool-diesel = { version = "=0.6.0", features = ["postgres", "tracing"] }
7272
derive_builder = "=0.20.0"
7373
derive_deref = "=1.1.1"
7474
dialoguer = "=0.11.0"
75-
diesel = { version = "=2.1.5", features = ["postgres", "serde_json", "chrono", "r2d2", "numeric"] }
75+
diesel = { version = "=2.1.5", features = ["postgres", "serde_json", "chrono", "numeric"] }
7676
diesel_full_text_search = "=2.1.1"
7777
diesel_migrations = { version = "=2.1.0", features = ["postgres"] }
7878
dotenvy = "=0.15.7"
@@ -126,6 +126,7 @@ crates_io_index = { path = "crates/crates_io_index", features = ["testing"] }
126126
crates_io_tarball = { path = "crates/crates_io_tarball", features = ["builder"] }
127127
crates_io_test_db = { path = "crates/crates_io_test_db" }
128128
claims = "=0.7.1"
129+
diesel = { version = "=2.1.5", features = ["r2d2"] }
129130
googletest = "=0.11.0"
130131
insta = { version = "=1.38.0", features = ["json", "redactions"] }
131132
regex = "=1.10.4"

src/app.rs

Lines changed: 1 addition & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Application-wide components in a struct accessible from each request
22
33
use crate::config;
4-
use crate::db::{connection_url, ConnectionConfig, DieselPool, DieselPooledConn, PoolError};
4+
use crate::db::{connection_url, ConnectionConfig};
55
use std::ops::Deref;
66
use std::sync::atomic::AtomicUsize;
77
use std::sync::Arc;
@@ -14,25 +14,17 @@ use axum::extract::{FromRef, FromRequestParts, State};
1414
use crates_io_github::GitHubClient;
1515
use deadpool_diesel::postgres::{Manager as DeadpoolManager, Pool as DeadpoolPool};
1616
use deadpool_diesel::Runtime;
17-
use diesel::r2d2;
1817
use oauth2::basic::BasicClient;
19-
use scheduled_thread_pool::ScheduledThreadPool;
2018

2119
type DeadpoolResult = Result<deadpool_diesel::postgres::Connection, deadpool_diesel::PoolError>;
2220

2321
/// The `App` struct holds the main components of the application like
2422
/// the database connection pool and configurations
2523
pub struct App {
26-
/// The primary database connection pool
27-
pub primary_database: DieselPool,
28-
2924
/// Async database connection pool based on `deadpool` connected
3025
/// to the primary database
3126
pub deadpool_primary: DeadpoolPool,
3227

33-
/// The read-only replica database connection pool
34-
pub read_only_replica_database: Option<DieselPool>,
35-
3628
/// Async database connection pool based on `deadpool` connected
3729
/// to the read-only replica database
3830
pub deadpool_replica: Option<DeadpoolPool>,
@@ -88,32 +80,6 @@ impl App {
8880
),
8981
);
9082

91-
let thread_pool = Arc::new(ScheduledThreadPool::new(config.db.helper_threads));
92-
93-
let primary_database = {
94-
let primary_db_connection_config = ConnectionConfig {
95-
statement_timeout: config.db.statement_timeout,
96-
read_only: config.db.primary.read_only_mode,
97-
};
98-
99-
let primary_db_config = r2d2::Pool::builder()
100-
.max_size(config.db.primary.pool_size)
101-
.min_idle(config.db.primary.min_idle)
102-
.connection_timeout(config.db.connection_timeout)
103-
.connection_customizer(Box::new(primary_db_connection_config))
104-
.thread_pool(thread_pool.clone());
105-
106-
DieselPool::new(
107-
&config.db.primary.url,
108-
&config.db,
109-
primary_db_config,
110-
instance_metrics
111-
.database_time_to_obtain_connection
112-
.with_label_values(&["primary"]),
113-
)
114-
.unwrap()
115-
};
116-
11783
let primary_database_async = {
11884
use secrecy::ExposeSecret;
11985

@@ -134,34 +100,6 @@ impl App {
134100
.unwrap()
135101
};
136102

137-
let replica_database = if let Some(pool_config) = config.db.replica.as_ref() {
138-
let replica_db_connection_config = ConnectionConfig {
139-
statement_timeout: config.db.statement_timeout,
140-
read_only: pool_config.read_only_mode,
141-
};
142-
143-
let replica_db_config = r2d2::Pool::builder()
144-
.max_size(pool_config.pool_size)
145-
.min_idle(pool_config.min_idle)
146-
.connection_timeout(config.db.connection_timeout)
147-
.connection_customizer(Box::new(replica_db_connection_config))
148-
.thread_pool(thread_pool);
149-
150-
Some(
151-
DieselPool::new(
152-
&pool_config.url,
153-
&config.db,
154-
replica_db_config,
155-
instance_metrics
156-
.database_time_to_obtain_connection
157-
.with_label_values(&["follower"]),
158-
)
159-
.unwrap(),
160-
)
161-
} else {
162-
None
163-
};
164-
165103
let replica_database_async = if let Some(pool_config) = config.db.replica.as_ref() {
166104
use secrecy::ExposeSecret;
167105

@@ -187,9 +125,7 @@ impl App {
187125
};
188126

189127
App {
190-
primary_database,
191128
deadpool_primary: primary_database_async,
192-
read_only_replica_database: replica_database,
193129
deadpool_replica: replica_database_async,
194130
github,
195131
github_oauth,
@@ -208,48 +144,12 @@ impl App {
208144
&self.config.session_key
209145
}
210146

211-
/// Obtain a read/write database connection from the primary pool
212-
#[instrument(skip_all)]
213-
pub fn db_write(&self) -> Result<DieselPooledConn, PoolError> {
214-
self.primary_database.get()
215-
}
216-
217147
/// Obtain a read/write database connection from the async primary pool
218148
#[instrument(skip_all)]
219149
pub async fn db_write_async(&self) -> DeadpoolResult {
220150
self.deadpool_primary.get().await
221151
}
222152

223-
/// Obtain a readonly database connection from the replica pool
224-
///
225-
/// If the replica pool is disabled or unavailable, the primary pool is used instead.
226-
#[instrument(skip_all)]
227-
pub fn db_read(&self) -> Result<DieselPooledConn, PoolError> {
228-
let Some(read_only_pool) = self.read_only_replica_database.as_ref() else {
229-
// Replica is disabled, but primary might be available
230-
return self.primary_database.get();
231-
};
232-
233-
match read_only_pool.get() {
234-
// Replica is available
235-
Ok(connection) => Ok(connection),
236-
237-
// Replica is not available, but primary might be available
238-
Err(PoolError::UnhealthyPool) => {
239-
let _ = self
240-
.instance_metrics
241-
.database_fallback_used
242-
.get_metric_with_label_values(&["follower"])
243-
.map(|metric| metric.inc());
244-
245-
self.primary_database.get()
246-
}
247-
248-
// Replica failed
249-
Err(error) => Err(error),
250-
}
251-
}
252-
253153
/// Obtain a readonly database connection from the replica pool
254154
///
255155
/// If the replica pool is disabled or unavailable, the primary pool is used instead.
@@ -281,35 +181,6 @@ impl App {
281181
}
282182
}
283183

284-
/// Obtain a readonly database connection from the primary pool
285-
///
286-
/// If the primary pool is unavailable, the replica pool is used instead, if not disabled.
287-
#[instrument(skip_all)]
288-
pub fn db_read_prefer_primary(&self) -> Result<DieselPooledConn, PoolError> {
289-
let Some(read_only_pool) = self.read_only_replica_database.as_ref() else {
290-
return self.primary_database.get();
291-
};
292-
293-
match self.primary_database.get() {
294-
// Primary is available
295-
Ok(connection) => Ok(connection),
296-
297-
// Primary is not available, but replica might be available
298-
Err(PoolError::UnhealthyPool) => {
299-
let _ = self
300-
.instance_metrics
301-
.database_fallback_used
302-
.get_metric_with_label_values(&["primary"])
303-
.map(|metric| metric.inc());
304-
305-
read_only_pool.get()
306-
}
307-
308-
// Primary failed
309-
Err(error) => Err(error),
310-
}
311-
}
312-
313184
/// Obtain a readonly database connection from the primary pool
314185
///
315186
/// If the primary pool is unavailable, the replica pool is used instead, if not disabled.

src/db.rs

Lines changed: 1 addition & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,110 +1,13 @@
11
use deadpool_diesel::postgres::{Hook, HookError};
22
use diesel::prelude::*;
3-
use diesel::r2d2::{self, ConnectionManager, CustomizeConnection, State};
4-
use prometheus::Histogram;
5-
use secrecy::{ExposeSecret, SecretString};
6-
use std::ops::Deref;
3+
use secrecy::ExposeSecret;
74
use std::time::Duration;
8-
use thiserror::Error;
95
use url::Url;
106

117
use crate::config;
128

139
pub mod sql_types;
1410

15-
pub type ConnectionPool = r2d2::Pool<ConnectionManager<PgConnection>>;
16-
17-
#[derive(Clone)]
18-
pub struct DieselPool {
19-
pool: ConnectionPool,
20-
time_to_obtain_connection_metric: Option<Histogram>,
21-
}
22-
23-
impl DieselPool {
24-
pub(crate) fn new(
25-
url: &SecretString,
26-
config: &config::DatabasePools,
27-
r2d2_config: r2d2::Builder<ConnectionManager<PgConnection>>,
28-
time_to_obtain_connection_metric: Histogram,
29-
) -> Result<DieselPool, PoolError> {
30-
let manager = ConnectionManager::new(connection_url(config, url.expose_secret()));
31-
32-
// For crates.io we want the behavior of creating a database pool to be slightly different
33-
// than the defaults of R2D2: the library's build() method assumes its consumers always
34-
// need a database connection to operate, so it blocks creating a pool until a minimum
35-
// number of connections is available.
36-
//
37-
// crates.io can actually operate in a limited capacity without a database connections,
38-
// especially by serving download requests to our users. Because of that we don't want to
39-
// block indefinitely waiting for a connection: we instead need to wait for a bit (to avoid
40-
// serving errors for the first connections until the pool is initialized) and if we can't
41-
// establish any connection continue booting up the application. The database pool will
42-
// automatically be marked as unhealthy and the rest of the application will adapt.
43-
let pool = DieselPool {
44-
pool: r2d2_config.build_unchecked(manager),
45-
time_to_obtain_connection_metric: Some(time_to_obtain_connection_metric),
46-
};
47-
match pool.wait_until_healthy(Duration::from_secs(5)) {
48-
Ok(()) => {}
49-
Err(PoolError::UnhealthyPool) => {}
50-
Err(err) => return Err(err),
51-
}
52-
53-
Ok(pool)
54-
}
55-
56-
pub fn new_background_worker(pool: r2d2::Pool<ConnectionManager<PgConnection>>) -> Self {
57-
Self {
58-
pool,
59-
time_to_obtain_connection_metric: None,
60-
}
61-
}
62-
63-
#[instrument(name = "db.connect", skip_all)]
64-
pub fn get(&self) -> Result<DieselPooledConn, PoolError> {
65-
match self.time_to_obtain_connection_metric.as_ref() {
66-
Some(time_to_obtain_connection_metric) => time_to_obtain_connection_metric
67-
.observe_closure_duration(|| {
68-
if let Some(conn) = self.pool.try_get() {
69-
Ok(conn)
70-
} else if !self.is_healthy() {
71-
Err(PoolError::UnhealthyPool)
72-
} else {
73-
Ok(self.pool.get()?)
74-
}
75-
}),
76-
None => Ok(self.pool.get()?),
77-
}
78-
}
79-
80-
pub fn state(&self) -> State {
81-
self.pool.state()
82-
}
83-
84-
#[instrument(skip_all)]
85-
pub fn wait_until_healthy(&self, timeout: Duration) -> Result<(), PoolError> {
86-
match self.pool.get_timeout(timeout) {
87-
Ok(_) => Ok(()),
88-
Err(_) if !self.is_healthy() => Err(PoolError::UnhealthyPool),
89-
Err(err) => Err(PoolError::R2D2(err)),
90-
}
91-
}
92-
93-
fn is_healthy(&self) -> bool {
94-
self.state().connections > 0
95-
}
96-
}
97-
98-
impl Deref for DieselPool {
99-
type Target = ConnectionPool;
100-
101-
fn deref(&self) -> &Self::Target {
102-
&self.pool
103-
}
104-
}
105-
106-
pub type DieselPooledConn = r2d2::PooledConnection<ConnectionManager<PgConnection>>;
107-
10811
pub fn oneoff_connection_with_config(
10912
config: &config::DatabasePools,
11013
) -> ConnectionResult<PgConnection> {
@@ -160,12 +63,6 @@ impl ConnectionConfig {
16063
}
16164
}
16265

163-
impl CustomizeConnection<PgConnection, r2d2::Error> for ConnectionConfig {
164-
fn on_acquire(&self, conn: &mut PgConnection) -> Result<(), r2d2::Error> {
165-
self.apply(conn).map_err(r2d2::Error::QueryError)
166-
}
167-
}
168-
16966
impl From<ConnectionConfig> for Hook {
17067
fn from(config: ConnectionConfig) -> Self {
17168
Hook::async_fn(move |conn, _| {
@@ -178,11 +75,3 @@ impl From<ConnectionConfig> for Hook {
17875
})
17976
}
18077
}
181-
182-
#[derive(Debug, Error)]
183-
pub enum PoolError {
184-
#[error(transparent)]
185-
R2D2(#[from] r2d2::PoolError),
186-
#[error("unhealthy database pool")]
187-
UnhealthyPool,
188-
}

src/metrics/instance.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
//! As a rule of thumb, if the metric requires a database query to be updated it's probably a
1818
//! service-level metric, and you should add it to `src/metrics/service.rs` instead.
1919
20+
use crate::app::App;
2021
use crate::metrics::macros::metrics;
21-
use crate::{app::App, db::DieselPool};
2222
use deadpool_diesel::postgres::Pool;
2323
use prometheus::{
2424
proto::MetricFamily, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
@@ -53,11 +53,6 @@ metrics! {
5353
impl InstanceMetrics {
5454
pub fn gather(&self, app: &App) -> prometheus::Result<Vec<MetricFamily>> {
5555
// Database pool stats
56-
self.refresh_pool_stats("primary", &app.primary_database)?;
57-
if let Some(follower) = &app.read_only_replica_database {
58-
self.refresh_pool_stats("follower", follower)?;
59-
}
60-
6156
self.refresh_async_pool_stats("async_primary", &app.deadpool_primary)?;
6257
if let Some(follower) = &app.deadpool_replica {
6358
self.refresh_async_pool_stats("async_follower", follower)?;
@@ -66,19 +61,6 @@ impl InstanceMetrics {
6661
Ok(self.registry.gather())
6762
}
6863

69-
fn refresh_pool_stats(&self, name: &str, pool: &DieselPool) -> prometheus::Result<()> {
70-
let state = pool.state();
71-
72-
self.database_idle_conns
73-
.get_metric_with_label_values(&[name])?
74-
.set(state.idle_connections as i64);
75-
self.database_used_conns
76-
.get_metric_with_label_values(&[name])?
77-
.set((state.connections - state.idle_connections) as i64);
78-
79-
Ok(())
80-
}
81-
8264
fn refresh_async_pool_stats(&self, name: &str, pool: &Pool) -> prometheus::Result<()> {
8365
let status = pool.status();
8466

0 commit comments

Comments
 (0)