@@ -156,18 +156,10 @@ impl AsyncConnection for AsyncPgConnection {
156
156
T : AsQuery + ' query ,
157
157
T :: Query : QueryFragment < Self :: Backend > + QueryId + ' query ,
158
158
{
159
- let connection_future = self . connection_future . as_ref ( ) . map ( |rx| rx. resubscribe ( ) ) ;
160
159
let query = source. as_query ( ) ;
161
- let load_future = self . with_prepared_statement ( query, |conn, stmt, binds| async move {
162
- let res = conn. query_raw ( & stmt, binds) . await . map_err ( ErrorHelper ) ?;
163
-
164
- Ok ( res
165
- . map_err ( |e| diesel:: result:: Error :: from ( ErrorHelper ( e) ) )
166
- . map_ok ( PgRow :: new)
167
- . boxed ( ) )
168
- } ) ;
160
+ let load_future = self . with_prepared_statement ( query, load_prepared) ;
169
161
170
- drive_future ( connection_future , load_future ) . boxed ( )
162
+ self . run_with_connection_future ( load_future )
171
163
}
172
164
173
165
fn execute_returning_count < ' conn , ' query , T > (
@@ -177,19 +169,8 @@ impl AsyncConnection for AsyncPgConnection {
177
169
where
178
170
T : QueryFragment < Self :: Backend > + QueryId + ' query ,
179
171
{
180
- let connection_future = self . connection_future . as_ref ( ) . map ( |rx| rx. resubscribe ( ) ) ;
181
- let execute = self . with_prepared_statement ( source, |conn, stmt, binds| async move {
182
- let binds = binds
183
- . iter ( )
184
- . map ( |b| b as & ( dyn ToSql + Sync ) )
185
- . collect :: < Vec < _ > > ( ) ;
186
-
187
- let res = tokio_postgres:: Client :: execute ( & conn, & stmt, & binds as & [ _ ] )
188
- . await
189
- . map_err ( ErrorHelper ) ?;
190
- Ok ( res as usize )
191
- } ) ;
192
- drive_future ( connection_future, execute) . boxed ( )
172
+ let execute = self . with_prepared_statement ( source, execute_prepared) ;
173
+ self . run_with_connection_future ( execute)
193
174
}
194
175
195
176
fn transaction_state ( & mut self ) -> & mut AnsiTransactionManager {
@@ -212,6 +193,35 @@ impl Drop for AsyncPgConnection {
212
193
}
213
194
}
214
195
196
+ async fn load_prepared (
197
+ conn : Arc < tokio_postgres:: Client > ,
198
+ stmt : Statement ,
199
+ binds : Vec < ToSqlHelper > ,
200
+ ) -> QueryResult < BoxStream < ' static , QueryResult < PgRow > > > {
201
+ let res = conn. query_raw ( & stmt, binds) . await . map_err ( ErrorHelper ) ?;
202
+
203
+ Ok ( res
204
+ . map_err ( |e| diesel:: result:: Error :: from ( ErrorHelper ( e) ) )
205
+ . map_ok ( PgRow :: new)
206
+ . boxed ( ) )
207
+ }
208
+
209
+ async fn execute_prepared (
210
+ conn : Arc < tokio_postgres:: Client > ,
211
+ stmt : Statement ,
212
+ binds : Vec < ToSqlHelper > ,
213
+ ) -> QueryResult < usize > {
214
+ let binds = binds
215
+ . iter ( )
216
+ . map ( |b| b as & ( dyn ToSql + Sync ) )
217
+ . collect :: < Vec < _ > > ( ) ;
218
+
219
+ let res = tokio_postgres:: Client :: execute ( & conn, & stmt, & binds as & [ _ ] )
220
+ . await
221
+ . map_err ( ErrorHelper ) ?;
222
+ Ok ( res as usize )
223
+ }
224
+
215
225
#[ inline( always) ]
216
226
fn update_transaction_manager_status < T > (
217
227
query_result : QueryResult < T > ,
@@ -335,14 +345,22 @@ impl AsyncPgConnection {
335
345
Ok ( ( ) )
336
346
}
337
347
348
+ fn run_with_connection_future < ' a , R : ' a > (
349
+ & self ,
350
+ future : impl Future < Output = QueryResult < R > > + Send + ' a ,
351
+ ) -> BoxFuture < ' a , QueryResult < R > > {
352
+ let connection_future = self . connection_future . as_ref ( ) . map ( |rx| rx. resubscribe ( ) ) ;
353
+ drive_future ( connection_future, future) . boxed ( )
354
+ }
355
+
338
356
fn with_prepared_statement < ' a , T , F , R > (
339
357
& mut self ,
340
358
query : T ,
341
- callback : impl FnOnce ( Arc < tokio_postgres:: Client > , Statement , Vec < ToSqlHelper > ) -> F + Send + ' a ,
359
+ callback : fn ( Arc < tokio_postgres:: Client > , Statement , Vec < ToSqlHelper > ) -> F ,
342
360
) -> BoxFuture < ' a , QueryResult < R > >
343
361
where
344
362
T : QueryFragment < diesel:: pg:: Pg > + QueryId ,
345
- F : Future < Output = QueryResult < R > > + Send ,
363
+ F : Future < Output = QueryResult < R > > + Send + ' a ,
346
364
R : Send ,
347
365
{
348
366
// we explicilty descruct the query here before going into the async block
@@ -352,14 +370,9 @@ impl AsyncPgConnection {
352
370
// which both are `Send`.
353
371
// We also collect the query id (essentially an integer) and the safe_to_cache flag here
354
372
// so there is no need to even access the query in the async block below
355
- let is_safe_to_cache_prepared = query. is_safe_to_cache_prepared ( & diesel:: pg:: Pg ) ;
356
373
let mut query_builder = PgQueryBuilder :: default ( ) ;
357
- let sql = query
358
- . to_sql ( & mut query_builder, & Pg )
359
- . map ( |_| query_builder. finish ( ) ) ;
360
374
361
375
let mut bind_collector = RawBytesBindCollector :: < diesel:: pg:: Pg > :: new ( ) ;
362
- let query_id = T :: query_id ( ) ;
363
376
364
377
// we don't resolve custom types here yet, we do that later
365
378
// in the async block below as we might need to perform lookup
@@ -368,16 +381,42 @@ impl AsyncPgConnection {
368
381
// We apply this workaround to prevent requiring all the diesel
369
382
// serialization code to beeing async
370
383
let mut metadata_lookup = PgAsyncMetadataLookup :: new ( ) ;
371
- let collect_bind_result =
372
- query. collect_binds ( & mut bind_collector, & mut metadata_lookup, & Pg ) ;
373
384
385
+ // The code that doesn't need the `T` generic parameter is in a separate function to reduce LLVM IR lines
386
+ self . with_prepared_statement_after_sql_built (
387
+ callback,
388
+ query. is_safe_to_cache_prepared ( & Pg ) ,
389
+ T :: query_id ( ) ,
390
+ query. to_sql ( & mut query_builder, & Pg ) ,
391
+ query. collect_binds ( & mut bind_collector, & mut metadata_lookup, & Pg ) ,
392
+ query_builder,
393
+ bind_collector,
394
+ metadata_lookup,
395
+ )
396
+ }
397
+
398
+ fn with_prepared_statement_after_sql_built < ' a , F , R > (
399
+ & mut self ,
400
+ callback : fn ( Arc < tokio_postgres:: Client > , Statement , Vec < ToSqlHelper > ) -> F ,
401
+ is_safe_to_cache_prepared : QueryResult < bool > ,
402
+ query_id : Option < std:: any:: TypeId > ,
403
+ to_sql_result : QueryResult < ( ) > ,
404
+ collect_bind_result : QueryResult < ( ) > ,
405
+ query_builder : PgQueryBuilder ,
406
+ mut bind_collector : RawBytesBindCollector < Pg > ,
407
+ metadata_lookup : PgAsyncMetadataLookup ,
408
+ ) -> BoxFuture < ' a , QueryResult < R > >
409
+ where
410
+ F : Future < Output = QueryResult < R > > + Send + ' a ,
411
+ R : Send ,
412
+ {
374
413
let raw_connection = self . conn . clone ( ) ;
375
414
let stmt_cache = self . stmt_cache . clone ( ) ;
376
415
let metadata_cache = self . metadata_cache . clone ( ) ;
377
416
let tm = self . transaction_state . clone ( ) ;
378
417
379
418
async move {
380
- let sql = sql ?;
419
+ let sql = to_sql_result . map ( |_| query_builder . finish ( ) ) ?;
381
420
let is_safe_to_cache_prepared = is_safe_to_cache_prepared?;
382
421
collect_bind_result?;
383
422
// Check whether we need to resolve some types at all
0 commit comments