Skip to content

Commit 444d45d

Browse files
authored
Merge pull request #160 from WattSense/sync_wrapper_clear_mutex_poison
sync wrapper: Clear mutex poison and provide underlying mutex guar
2 parents b85a943 + a5342a9 commit 444d45d

File tree

3 files changed

+74
-16
lines changed

3 files changed

+74
-16
lines changed

examples/sync-wrapper/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
diesel = { version = "2.1.0", default-features = false }
9+
diesel = { version = "2.1.0", default-features = false, features = ["returning_clauses_for_sqlite_3_35"] }
1010
diesel-async = { version = "0.4.0", path = "../../", features = ["sync-connection-wrapper", "async-connection-wrapper"] }
1111
diesel_migrations = "2.1.0"
1212
futures-util = "0.3.21"

examples/sync-wrapper/src/main.rs

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use diesel::prelude::*;
22
use diesel::sqlite::{Sqlite, SqliteConnection};
33
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
44
use diesel_async::sync_connection_wrapper::SyncConnectionWrapper;
5-
use diesel_async::{AsyncConnection, RunQueryDsl, SimpleAsyncConnection};
5+
use diesel_async::{AsyncConnection, RunQueryDsl};
66
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
77

88
// ordinary diesel model setup
@@ -15,7 +15,7 @@ table! {
1515
}
1616

1717
#[allow(dead_code)]
18-
#[derive(Debug, Queryable, Selectable)]
18+
#[derive(Debug, Queryable, QueryableByName, Selectable)]
1919
#[diesel(table_name = users)]
2020
struct User {
2121
id: i32,
@@ -47,6 +47,27 @@ where
4747
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
4848
}
4949

50+
async fn transaction(
51+
async_conn: &mut SyncConnectionWrapper<InnerConnection>,
52+
old_name: &str,
53+
new_name: &str,
54+
) -> Result<Vec<User>, diesel::result::Error> {
55+
async_conn
56+
.transaction::<Vec<User>, diesel::result::Error, _>(|c| {
57+
Box::pin(async {
58+
if old_name.is_empty() {
59+
Ok(Vec::new())
60+
} else {
61+
diesel::update(users::table.filter(users::name.eq(old_name)))
62+
.set(users::name.eq(new_name))
63+
.load(c)
64+
.await
65+
}
66+
})
67+
})
68+
.await
69+
}
70+
5071
#[tokio::main]
5172
async fn main() -> Result<(), Box<dyn std::error::Error>> {
5273
let db_url = std::env::var("DATABASE_URL").expect("Env var `DATABASE_URL` not set");
@@ -57,10 +78,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5778

5879
let mut sync_wrapper: SyncConnectionWrapper<InnerConnection> = establish(&db_url).await?;
5980

60-
sync_wrapper.batch_execute("DELETE FROM users").await?;
81+
diesel::delete(users::table)
82+
.execute(&mut sync_wrapper)
83+
.await?;
6184

62-
sync_wrapper
63-
.batch_execute("INSERT INTO users(id, name) VALUES (3, 'toto')")
85+
diesel::insert_into(users::table)
86+
.values((users::id.eq(3), users::name.eq("toto")))
87+
.execute(&mut sync_wrapper)
6488
.await?;
6589

6690
let data: Vec<User> = users::table
@@ -86,5 +110,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
86110
.await?;
87111
println!("{data:?}");
88112

113+
// a quick test to check if we correctly handle transactions
114+
let mut conn_a: SyncConnectionWrapper<InnerConnection> = establish(&db_url).await?;
115+
let mut conn_b: SyncConnectionWrapper<InnerConnection> = establish(&db_url).await?;
116+
117+
let handle_1 = tokio::spawn(async move {
118+
loop {
119+
let changed = transaction(&mut conn_a, "iLuke", "JustLuke").await;
120+
println!("Changed {changed:?}");
121+
std::thread::sleep(std::time::Duration::from_secs(1));
122+
}
123+
});
124+
125+
let handle_2 = tokio::spawn(async move {
126+
loop {
127+
let changed = transaction(&mut conn_b, "JustLuke", "iLuke").await;
128+
println!("Changed {changed:?}");
129+
std::thread::sleep(std::time::Duration::from_secs(1));
130+
}
131+
});
132+
133+
let _ = handle_2.await;
134+
let _ = handle_1.await;
135+
89136
Ok(())
90137
}

src/sync_connection_wrapper.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,17 @@ where
129129
let mut cache = <<<C as LoadConnection>::Row<'_, '_> as IntoOwnedRow<
130130
<C as Connection>::Backend,
131131
>>::Cache as Default>::default();
132-
conn.load(&query).map(|c| {
133-
c.map(|row| row.map(|r| IntoOwnedRow::into_owned(r, &mut cache)))
134-
.collect::<Vec<QueryResult<O>>>()
135-
})
132+
let cursor = conn.load(&query)?;
133+
134+
let size_hint = cursor.size_hint();
135+
let mut out = Vec::with_capacity(size_hint.1.unwrap_or(size_hint.0));
136+
// we use an explicit loop here to easily propagate possible errors
137+
// as early as possible
138+
for row in cursor {
139+
out.push(Ok(IntoOwnedRow::into_owned(row?, &mut cache)));
140+
}
141+
142+
Ok(out)
136143
})
137144
.map_ok(|rows| futures_util::stream::iter(rows).boxed())
138145
.boxed()
@@ -235,9 +242,11 @@ impl<C> SyncConnectionWrapper<C> {
235242
{
236243
let inner = self.inner.clone();
237244
tokio::task::spawn_blocking(move || {
238-
let mut inner = inner
239-
.lock()
240-
.expect("Mutex is poisoned, a thread must have panicked holding it.");
245+
let mut inner = inner.lock().unwrap_or_else(|poison| {
246+
// try to be resilient by providing the guard
247+
inner.clear_poison();
248+
poison.into_inner()
249+
});
241250
task(&mut inner)
242251
})
243252
.unwrap_or_else(|err| QueryResult::Err(from_tokio_join_error(err)))
@@ -268,9 +277,11 @@ impl<C> SyncConnectionWrapper<C> {
268277

269278
let (collect_bind_result, collector_data) = {
270279
let exclusive = self.inner.clone();
271-
let mut inner = exclusive
272-
.lock()
273-
.expect("Mutex is poisoned, a thread must have panicked holding it.");
280+
let mut inner = exclusive.lock().unwrap_or_else(|poison| {
281+
// try to be resilient by providing the guard
282+
exclusive.clear_poison();
283+
poison.into_inner()
284+
});
274285
let mut bind_collector =
275286
<<C::Backend as Backend>::BindCollector<'_> as Default>::default();
276287
let metadata_lookup = inner.metadata_lookup();

0 commit comments

Comments
 (0)