1
1
use crate :: stmt_cache:: { PrepareCallback , StmtCache } ;
2
2
use crate :: { AnsiTransactionManager , AsyncConnection , SimpleAsyncConnection } ;
3
3
use diesel:: connection:: statement_cache:: { MaybeCached , StatementCacheKey } ;
4
+ use diesel:: connection:: Instrumentation ;
5
+ use diesel:: connection:: InstrumentationEvent ;
6
+ use diesel:: connection:: StrQueryHelper ;
4
7
use diesel:: mysql:: { Mysql , MysqlQueryBuilder , MysqlType } ;
5
8
use diesel:: query_builder:: QueryBuilder ;
6
9
use diesel:: query_builder:: { bind_collector:: RawBytesBindCollector , QueryFragment , QueryId } ;
@@ -26,12 +29,28 @@ pub struct AsyncMysqlConnection {
26
29
conn : mysql_async:: Conn ,
27
30
stmt_cache : StmtCache < Mysql , Statement > ,
28
31
transaction_manager : AnsiTransactionManager ,
32
+ instrumentation : std:: sync:: Mutex < Option < Box < dyn Instrumentation > > > ,
29
33
}
30
34
31
35
#[ async_trait:: async_trait]
32
36
impl SimpleAsyncConnection for AsyncMysqlConnection {
33
37
async fn batch_execute ( & mut self , query : & str ) -> diesel:: QueryResult < ( ) > {
34
- Ok ( self . conn . query_drop ( query) . await . map_err ( ErrorHelper ) ?)
38
+ self . instrumentation ( )
39
+ . on_connection_event ( InstrumentationEvent :: start_query ( & StrQueryHelper :: new (
40
+ query,
41
+ ) ) ) ;
42
+ let result = self
43
+ . conn
44
+ . query_drop ( query)
45
+ . await
46
+ . map_err ( ErrorHelper )
47
+ . map_err ( Into :: into) ;
48
+ self . instrumentation ( )
49
+ . on_connection_event ( InstrumentationEvent :: finish_query (
50
+ & StrQueryHelper :: new ( query) ,
51
+ result. as_ref ( ) . err ( ) ,
52
+ ) ) ;
53
+ result
35
54
}
36
55
}
37
56
@@ -53,20 +72,18 @@ impl AsyncConnection for AsyncMysqlConnection {
53
72
type TransactionManager = AnsiTransactionManager ;
54
73
55
74
async fn establish ( database_url : & str ) -> diesel:: ConnectionResult < Self > {
56
- let opts = Opts :: from_url ( database_url)
57
- . map_err ( |e| diesel:: result:: ConnectionError :: InvalidConnectionUrl ( e. to_string ( ) ) ) ?;
58
- let builder = OptsBuilder :: from_opts ( opts)
59
- . init ( CONNECTION_SETUP_QUERIES . to_vec ( ) )
60
- . stmt_cache_size ( 0 ) // We have our own cache
61
- . client_found_rows ( true ) ; // This allows a consistent behavior between MariaDB/MySQL and PostgreSQL (and is already set in `diesel`)
62
-
63
- let conn = mysql_async:: Conn :: new ( builder) . await . map_err ( ErrorHelper ) ?;
64
-
65
- Ok ( AsyncMysqlConnection {
66
- conn,
67
- stmt_cache : StmtCache :: new ( ) ,
68
- transaction_manager : AnsiTransactionManager :: default ( ) ,
69
- } )
75
+ let mut instrumentation = diesel:: connection:: get_default_instrumentation ( ) ;
76
+ instrumentation. on_connection_event ( InstrumentationEvent :: start_establish_connection (
77
+ database_url,
78
+ ) ) ;
79
+ let r = Self :: establish_connection_inner ( database_url) . await ;
80
+ instrumentation. on_connection_event ( InstrumentationEvent :: finish_establish_connection (
81
+ database_url,
82
+ r. as_ref ( ) . err ( ) ,
83
+ ) ) ;
84
+ let mut conn = r?;
85
+ conn. instrumentation = std:: sync:: Mutex :: new ( instrumentation) ;
86
+ Ok ( conn)
70
87
}
71
88
72
89
fn load < ' conn , ' query , T > ( & ' conn mut self , source : T ) -> Self :: LoadFuture < ' conn , ' query >
@@ -80,7 +97,10 @@ impl AsyncConnection for AsyncMysqlConnection {
80
97
let stmt_for_exec = match stmt {
81
98
MaybeCached :: Cached ( ref s) => ( * s) . clone ( ) ,
82
99
MaybeCached :: CannotCache ( ref s) => s. clone ( ) ,
83
- _ => todo ! ( ) ,
100
+ _ => unreachable ! (
101
+ "Diesel has only two variants here at the time of writing.\n \
102
+ If you ever see this error message please open in issue in the diesel-async issue tracker"
103
+ ) ,
84
104
} ;
85
105
86
106
let ( tx, rx) = futures_channel:: mpsc:: channel ( 0 ) ;
@@ -152,6 +172,19 @@ impl AsyncConnection for AsyncMysqlConnection {
152
172
fn transaction_state ( & mut self ) -> & mut AnsiTransactionManager {
153
173
& mut self . transaction_manager
154
174
}
175
+
176
+ fn instrumentation ( & mut self ) -> & mut dyn Instrumentation {
177
+ self . instrumentation
178
+ . get_mut ( )
179
+ . unwrap_or_else ( |p| p. into_inner ( ) )
180
+ }
181
+
182
+ fn set_instrumentation ( & mut self , instrumentation : impl Instrumentation ) {
183
+ * self
184
+ . instrumentation
185
+ . get_mut ( )
186
+ . unwrap_or_else ( |p| p. into_inner ( ) ) = Some ( Box :: new ( instrumentation) ) ;
187
+ }
155
188
}
156
189
157
190
#[ inline( always) ]
@@ -195,6 +228,9 @@ impl AsyncMysqlConnection {
195
228
conn,
196
229
stmt_cache : StmtCache :: new ( ) ,
197
230
transaction_manager : AnsiTransactionManager :: default ( ) ,
231
+ instrumentation : std:: sync:: Mutex :: new (
232
+ diesel:: connection:: get_default_instrumentation ( ) ,
233
+ ) ,
198
234
} ;
199
235
200
236
for stmt in CONNECTION_SETUP_QUERIES {
@@ -219,6 +255,10 @@ impl AsyncMysqlConnection {
219
255
T : QueryFragment < Mysql > + QueryId ,
220
256
F : Future < Output = QueryResult < R > > + Send ,
221
257
{
258
+ self . instrumentation ( )
259
+ . on_connection_event ( InstrumentationEvent :: start_query ( & diesel:: debug_query (
260
+ & query,
261
+ ) ) ) ;
222
262
let mut bind_collector = RawBytesBindCollector :: < Mysql > :: new ( ) ;
223
263
let bind_collector = query
224
264
. collect_binds ( & mut bind_collector, & mut ( ) , & Mysql )
@@ -228,6 +268,7 @@ impl AsyncMysqlConnection {
228
268
ref mut conn,
229
269
ref mut stmt_cache,
230
270
ref mut transaction_manager,
271
+ ref mut instrumentation,
231
272
..
232
273
} = self ;
233
274
@@ -242,28 +283,37 @@ impl AsyncMysqlConnection {
242
283
} = bind_collector?;
243
284
let is_safe_to_cache_prepared = is_safe_to_cache_prepared?;
244
285
let sql = sql?;
245
- let cache_key = if let Some ( query_id) = query_id {
246
- StatementCacheKey :: Type ( query_id)
247
- } else {
248
- StatementCacheKey :: Sql {
249
- sql : sql. clone ( ) ,
250
- bind_types : metadata. clone ( ) ,
251
- }
286
+ let inner = async {
287
+ let cache_key = if let Some ( query_id) = query_id {
288
+ StatementCacheKey :: Type ( query_id)
289
+ } else {
290
+ StatementCacheKey :: Sql {
291
+ sql : sql. clone ( ) ,
292
+ bind_types : metadata. clone ( ) ,
293
+ }
294
+ } ;
295
+
296
+ let ( stmt, conn) = stmt_cache
297
+ . cached_prepared_statement (
298
+ cache_key,
299
+ sql. clone ( ) ,
300
+ is_safe_to_cache_prepared,
301
+ & metadata,
302
+ conn,
303
+ instrumentation,
304
+ )
305
+ . await ?;
306
+ callback ( conn, stmt, ToSqlHelper { metadata, binds } ) . await
252
307
} ;
253
-
254
- let ( stmt, conn) = stmt_cache
255
- . cached_prepared_statement (
256
- cache_key,
257
- sql,
258
- is_safe_to_cache_prepared,
259
- & metadata,
260
- conn,
261
- )
262
- . await ?;
263
- update_transaction_manager_status (
264
- callback ( conn, stmt, ToSqlHelper { metadata, binds } ) . await ,
265
- transaction_manager,
266
- )
308
+ let r = update_transaction_manager_status ( inner. await , transaction_manager) ;
309
+ instrumentation
310
+ . get_mut ( )
311
+ . unwrap_or_else ( |p| p. into_inner ( ) )
312
+ . on_connection_event ( InstrumentationEvent :: finish_query (
313
+ & StrQueryHelper :: new ( & sql) ,
314
+ r. as_ref ( ) . err ( ) ,
315
+ ) ) ;
316
+ r
267
317
}
268
318
. boxed ( )
269
319
}
@@ -300,6 +350,26 @@ impl AsyncMysqlConnection {
300
350
301
351
Ok ( ( ) )
302
352
}
353
+
354
+ async fn establish_connection_inner (
355
+ database_url : & str ,
356
+ ) -> Result < AsyncMysqlConnection , ConnectionError > {
357
+ let opts = Opts :: from_url ( database_url)
358
+ . map_err ( |e| diesel:: result:: ConnectionError :: InvalidConnectionUrl ( e. to_string ( ) ) ) ?;
359
+ let builder = OptsBuilder :: from_opts ( opts)
360
+ . init ( CONNECTION_SETUP_QUERIES . to_vec ( ) )
361
+ . stmt_cache_size ( 0 ) // We have our own cache
362
+ . client_found_rows ( true ) ; // This allows a consistent behavior between MariaDB/MySQL and PostgreSQL (and is already set in `diesel`)
363
+
364
+ let conn = mysql_async:: Conn :: new ( builder) . await . map_err ( ErrorHelper ) ?;
365
+
366
+ Ok ( AsyncMysqlConnection {
367
+ conn,
368
+ stmt_cache : StmtCache :: new ( ) ,
369
+ transaction_manager : AnsiTransactionManager :: default ( ) ,
370
+ instrumentation : std:: sync:: Mutex :: new ( None ) ,
371
+ } )
372
+ }
303
373
}
304
374
305
375
#[ cfg( any(
0 commit comments