1
1
use deadpool_diesel:: postgres:: { Hook , HookError } ;
2
2
use diesel:: prelude:: * ;
3
- use diesel:: r2d2:: { self , ConnectionManager , CustomizeConnection , State } ;
4
- use prometheus:: Histogram ;
5
- use secrecy:: { ExposeSecret , SecretString } ;
6
- use std:: ops:: Deref ;
3
+ use secrecy:: ExposeSecret ;
7
4
use std:: time:: Duration ;
8
- use thiserror:: Error ;
9
5
use url:: Url ;
10
6
11
7
use crate :: config;
12
8
13
9
pub mod sql_types;
14
10
15
- pub type ConnectionPool = r2d2:: Pool < ConnectionManager < PgConnection > > ;
16
-
17
- #[ derive( Clone ) ]
18
- pub struct DieselPool {
19
- pool : ConnectionPool ,
20
- time_to_obtain_connection_metric : Option < Histogram > ,
21
- }
22
-
23
- impl DieselPool {
24
- pub ( crate ) fn new (
25
- url : & SecretString ,
26
- config : & config:: DatabasePools ,
27
- r2d2_config : r2d2:: Builder < ConnectionManager < PgConnection > > ,
28
- time_to_obtain_connection_metric : Histogram ,
29
- ) -> Result < DieselPool , PoolError > {
30
- let manager = ConnectionManager :: new ( connection_url ( config, url. expose_secret ( ) ) ) ;
31
-
32
- // For crates.io we want the behavior of creating a database pool to be slightly different
33
- // than the defaults of R2D2: the library's build() method assumes its consumers always
34
- // need a database connection to operate, so it blocks creating a pool until a minimum
35
- // number of connections is available.
36
- //
37
- // crates.io can actually operate in a limited capacity without a database connections,
38
- // especially by serving download requests to our users. Because of that we don't want to
39
- // block indefinitely waiting for a connection: we instead need to wait for a bit (to avoid
40
- // serving errors for the first connections until the pool is initialized) and if we can't
41
- // establish any connection continue booting up the application. The database pool will
42
- // automatically be marked as unhealthy and the rest of the application will adapt.
43
- let pool = DieselPool {
44
- pool : r2d2_config. build_unchecked ( manager) ,
45
- time_to_obtain_connection_metric : Some ( time_to_obtain_connection_metric) ,
46
- } ;
47
- match pool. wait_until_healthy ( Duration :: from_secs ( 5 ) ) {
48
- Ok ( ( ) ) => { }
49
- Err ( PoolError :: UnhealthyPool ) => { }
50
- Err ( err) => return Err ( err) ,
51
- }
52
-
53
- Ok ( pool)
54
- }
55
-
56
- pub fn new_background_worker ( pool : r2d2:: Pool < ConnectionManager < PgConnection > > ) -> Self {
57
- Self {
58
- pool,
59
- time_to_obtain_connection_metric : None ,
60
- }
61
- }
62
-
63
- #[ instrument( name = "db.connect" , skip_all) ]
64
- pub fn get ( & self ) -> Result < DieselPooledConn , PoolError > {
65
- match self . time_to_obtain_connection_metric . as_ref ( ) {
66
- Some ( time_to_obtain_connection_metric) => time_to_obtain_connection_metric
67
- . observe_closure_duration ( || {
68
- if let Some ( conn) = self . pool . try_get ( ) {
69
- Ok ( conn)
70
- } else if !self . is_healthy ( ) {
71
- Err ( PoolError :: UnhealthyPool )
72
- } else {
73
- Ok ( self . pool . get ( ) ?)
74
- }
75
- } ) ,
76
- None => Ok ( self . pool . get ( ) ?) ,
77
- }
78
- }
79
-
80
- pub fn state ( & self ) -> State {
81
- self . pool . state ( )
82
- }
83
-
84
- #[ instrument( skip_all) ]
85
- pub fn wait_until_healthy ( & self , timeout : Duration ) -> Result < ( ) , PoolError > {
86
- match self . pool . get_timeout ( timeout) {
87
- Ok ( _) => Ok ( ( ) ) ,
88
- Err ( _) if !self . is_healthy ( ) => Err ( PoolError :: UnhealthyPool ) ,
89
- Err ( err) => Err ( PoolError :: R2D2 ( err) ) ,
90
- }
91
- }
92
-
93
- fn is_healthy ( & self ) -> bool {
94
- self . state ( ) . connections > 0
95
- }
96
- }
97
-
98
- impl Deref for DieselPool {
99
- type Target = ConnectionPool ;
100
-
101
- fn deref ( & self ) -> & Self :: Target {
102
- & self . pool
103
- }
104
- }
105
-
106
- pub type DieselPooledConn = r2d2:: PooledConnection < ConnectionManager < PgConnection > > ;
107
-
108
11
pub fn oneoff_connection_with_config (
109
12
config : & config:: DatabasePools ,
110
13
) -> ConnectionResult < PgConnection > {
@@ -160,12 +63,6 @@ impl ConnectionConfig {
160
63
}
161
64
}
162
65
163
- impl CustomizeConnection < PgConnection , r2d2:: Error > for ConnectionConfig {
164
- fn on_acquire ( & self , conn : & mut PgConnection ) -> Result < ( ) , r2d2:: Error > {
165
- self . apply ( conn) . map_err ( r2d2:: Error :: QueryError )
166
- }
167
- }
168
-
169
66
impl From < ConnectionConfig > for Hook {
170
67
fn from ( config : ConnectionConfig ) -> Self {
171
68
Hook :: async_fn ( move |conn, _| {
@@ -178,11 +75,3 @@ impl From<ConnectionConfig> for Hook {
178
75
} )
179
76
}
180
77
}
181
-
182
- #[ derive( Debug , Error ) ]
183
- pub enum PoolError {
184
- #[ error( transparent) ]
185
- R2D2 ( #[ from] r2d2:: PoolError ) ,
186
- #[ error( "unhealthy database pool" ) ]
187
- UnhealthyPool ,
188
- }
0 commit comments