Skip to content

Commit d49cb7e

Browse files
committed
Add SqliteStore implementation
We upstream our `SqliteStore` implementation that allows persistence towards an SQLite database backend.
1 parent af2bd74 commit d49cb7e

File tree

4 files changed

+288
-0
lines changed

4 files changed

+288
-0
lines changed

bench/benches/bench.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ criterion_group!(benches,
1616
lightning::sign::benches::bench_get_secure_random_bytes,
1717
lightning::ln::channelmanager::bench::bench_sends,
1818
lightning_persister::fs_store::bench::bench_sends,
19+
lightning_persister::sqlite_store::bench::bench_sends,
1920
lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
2021
lightning::routing::gossip::benches::read_network_graph,
2122
lightning::routing::gossip::benches::write_network_graph);

lightning-persister/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1616
[dependencies]
1717
bitcoin = "0.29.0"
1818
lightning = { version = "0.0.116", path = "../lightning" }
19+
rusqlite = { version = "0.28.0", features = ["bundled"] }
1920

2021
[target.'cfg(windows)'.dependencies]
2122
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }

lightning-persister/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#[cfg(ldk_bench)] extern crate criterion;
1212

1313
pub mod fs_store;
14+
pub mod sqlite_store;
1415

1516
mod utils;
1617

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
//! Objects related to [`SqliteStore`] live here.
2+
use lightning::util::persist::KVStore;
3+
use lightning::util::string::PrintableString;
4+
5+
use rusqlite::{named_params, Connection};
6+
7+
use std::fs;
8+
use std::path::PathBuf;
9+
use std::sync::{Arc, Mutex};
10+
11+
/// The default database file name.
12+
pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";
13+
14+
/// The default table in which we store all data.
15+
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";
16+
17+
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
18+
const SCHEMA_USER_VERSION: u16 = 1;
19+
20+
/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
21+
///
22+
/// [SQLite]: https://sqlite.org
23+
pub struct SqliteStore {
24+
connection: Arc<Mutex<Connection>>,
25+
data_dir: PathBuf,
26+
kv_table_name: String,
27+
}
28+
29+
impl SqliteStore {
30+
/// Constructs a new [`SqliteStore`].
31+
///
32+
/// If not already existing, a new SQLite database will be created in the given `data_dir` under the
33+
/// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`).
34+
///
35+
/// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`].
36+
pub fn new(data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>) -> Self {
37+
let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
38+
let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
39+
40+
fs::create_dir_all(data_dir.clone()).unwrap_or_else(|_| {
41+
panic!("Failed to create database destination directory: {}", data_dir.display())
42+
});
43+
let mut db_file_path = data_dir.clone();
44+
db_file_path.push(db_file_name);
45+
46+
let connection = Connection::open(db_file_path.clone()).unwrap_or_else(|_| {
47+
panic!("Failed to open/create database file: {}", db_file_path.display())
48+
});
49+
50+
connection
51+
.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| {
52+
Ok(())
53+
})
54+
.unwrap_or_else(|_| panic!("Failed to set PRAGMA user_version"));
55+
56+
let sql = format!(
57+
"CREATE TABLE IF NOT EXISTS {} (
58+
namespace TEXT NOT NULL,
59+
key TEXT NOT NULL CHECK (key <> ''),
60+
value BLOB, PRIMARY KEY ( namespace, key )
61+
);",
62+
kv_table_name
63+
);
64+
connection
65+
.execute(&sql, [])
66+
.unwrap_or_else(|_| panic!("Failed to create table: {}", kv_table_name));
67+
68+
let connection = Arc::new(Mutex::new(connection));
69+
Self { connection, data_dir, kv_table_name }
70+
}
71+
72+
/// Returns the data directory.
73+
pub fn get_data_dir(&self) -> PathBuf {
74+
self.data_dir.clone()
75+
}
76+
}
77+
78+
impl KVStore for SqliteStore {
79+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
80+
if key.is_empty() {
81+
debug_assert!(false, "Failed to read {}/{}: key may not be empty.",
82+
PrintableString(namespace), PrintableString(key));
83+
let msg = format!("Failed to read {}/{}: key may not be empty.",
84+
PrintableString(namespace), PrintableString(key));
85+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
86+
}
87+
88+
if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
89+
key.chars().any(|c| !c.is_ascii() || c.is_control()) {
90+
debug_assert!(false, "Failed to read {}/{}: namespace and key must be valid ASCII
91+
strings.", PrintableString(namespace), PrintableString(key));
92+
let msg = format!("Failed to read {}/{}: namespace and key must be valid ASCII strings.",
93+
PrintableString(namespace), PrintableString(key));
94+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
95+
}
96+
97+
let locked_conn = self.connection.lock().unwrap();
98+
let sql =
99+
format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", self.kv_table_name);
100+
101+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
102+
let msg = format!("Failed to prepare statement: {}", e);
103+
std::io::Error::new(std::io::ErrorKind::Other, msg)
104+
})?;
105+
106+
let res = stmt
107+
.query_row(
108+
named_params! {
109+
":namespace": namespace,
110+
":key": key,
111+
},
112+
|row| row.get(0),
113+
)
114+
.map_err(|e| match e {
115+
rusqlite::Error::QueryReturnedNoRows => {
116+
let msg =
117+
format!("Failed to read as key could not be found: {}/{}", namespace, key);
118+
std::io::Error::new(std::io::ErrorKind::NotFound, msg)
119+
}
120+
e => {
121+
let msg = format!("Failed to read from key {}/{}: {}", namespace, key, e);
122+
std::io::Error::new(std::io::ErrorKind::Other, msg)
123+
}
124+
})?;
125+
Ok(res)
126+
}
127+
128+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
129+
if key.is_empty() {
130+
debug_assert!(false, "Failed to write {}/{}: key may not be empty.",
131+
PrintableString(namespace), PrintableString(key));
132+
let msg = format!("Failed to write {}/{}: key may not be empty.",
133+
PrintableString(namespace), PrintableString(key));
134+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
135+
}
136+
137+
if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
138+
key.chars().any(|c| !c.is_ascii() || c.is_control()) {
139+
debug_assert!(false, "Failed to write {}/{}: namespace and key must be valid ASCII
140+
strings.", PrintableString(namespace), PrintableString(key));
141+
let msg = format!("Failed to write {}/{}: namespace and key must be valid ASCII strings.",
142+
PrintableString(namespace), PrintableString(key));
143+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
144+
}
145+
146+
let locked_conn = self.connection.lock().unwrap();
147+
148+
let sql = format!(
149+
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
150+
self.kv_table_name
151+
);
152+
153+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
154+
let msg = format!("Failed to prepare statement: {}", e);
155+
std::io::Error::new(std::io::ErrorKind::Other, msg)
156+
})?;
157+
158+
stmt.execute(
159+
named_params! {
160+
":namespace": namespace,
161+
":key": key,
162+
":value": buf,
163+
},
164+
)
165+
.map(|_| ())
166+
.map_err(|e| {
167+
let msg = format!("Failed to write to key {}/{}: {}", namespace, key, e);
168+
std::io::Error::new(std::io::ErrorKind::Other, msg)
169+
})
170+
}
171+
172+
fn remove(&self, namespace: &str, key: &str) -> lightning::io::Result<()> {
173+
if key.is_empty() {
174+
debug_assert!(false, "Failed to remove {}/{}: key may not be empty.",
175+
PrintableString(namespace), PrintableString(key));
176+
let msg = format!("Failed to remove {}/{}: key may not be empty.",
177+
PrintableString(namespace), PrintableString(key));
178+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
179+
}
180+
181+
if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
182+
key.chars().any(|c| !c.is_ascii() || c.is_control()) {
183+
debug_assert!(false, "Failed to remove {}/{}: namespace and key must be valid ASCII
184+
strings.", PrintableString(namespace), PrintableString(key));
185+
let msg = format!("Failed to remove {}/{}: namespace and key must be valid ASCII strings.",
186+
PrintableString(namespace), PrintableString(key));
187+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
188+
}
189+
190+
let locked_conn = self.connection.lock().unwrap();
191+
192+
let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", self.kv_table_name);
193+
194+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
195+
let msg = format!("Failed to prepare statement: {}", e);
196+
std::io::Error::new(std::io::ErrorKind::Other, msg)
197+
})?;
198+
199+
stmt.execute(
200+
named_params! {
201+
":namespace": namespace,
202+
":key": key,
203+
},
204+
)
205+
.map_err(|e| {
206+
let msg = format!("Failed to delete key {}/{}: {}", namespace, key, e);
207+
std::io::Error::new(std::io::ErrorKind::Other, msg)
208+
})?;
209+
Ok(())
210+
}
211+
212+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
213+
let locked_conn = self.connection.lock().unwrap();
214+
215+
let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", self.kv_table_name);
216+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
217+
let msg = format!("Failed to prepare statement: {}", e);
218+
std::io::Error::new(std::io::ErrorKind::Other, msg)
219+
})?;
220+
221+
let mut keys = Vec::new();
222+
223+
let rows_iter = stmt
224+
.query_map(named_params! {":namespace": namespace, }, |row| row.get(0))
225+
.map_err(|e| {
226+
let msg = format!("Failed to retrieve queried rows: {}", e);
227+
std::io::Error::new(std::io::ErrorKind::Other, msg)
228+
})?;
229+
230+
for k in rows_iter {
231+
keys.push(k.map_err(|e| {
232+
let msg = format!("Failed to retrieve queried rows: {}", e);
233+
std::io::Error::new(std::io::ErrorKind::Other, msg)
234+
})?);
235+
}
236+
237+
Ok(keys)
238+
}
239+
}
240+
241+
#[cfg(test)]
242+
mod tests {
243+
use super::*;
244+
use crate::test_utils::{do_read_write_remove_list_persist,do_test_store};
245+
246+
impl Drop for SqliteStore {
247+
fn drop(&mut self) {
248+
match fs::remove_dir_all(&self.data_dir) {
249+
Err(e) => println!("Failed to remove test store directory: {}", e),
250+
_ => {}
251+
}
252+
}
253+
}
254+
255+
#[test]
256+
fn read_write_remove_list_persist() {
257+
let mut temp_path = std::env::temp_dir();
258+
temp_path.push("read_write_remove_list_persist");
259+
let store = SqliteStore::new(temp_path, Some("test_db".to_string()), Some("test_table".to_string()));
260+
do_read_write_remove_list_persist(&store);
261+
}
262+
263+
#[test]
264+
fn test_sqlite_store() {
265+
let mut temp_path = std::env::temp_dir();
266+
temp_path.push("test_sqlite_store");
267+
let store_0 = SqliteStore::new(temp_path.clone(), Some("test_db_0".to_string()), Some("test_table".to_string()));
268+
let store_1 = SqliteStore::new(temp_path, Some("test_db_1".to_string()), Some("test_table".to_string()));
269+
do_test_store(&store_0, &store_1)
270+
}
271+
}
272+
273+
#[cfg(ldk_bench)]
274+
/// Benches
275+
pub mod bench {
276+
use criterion::Criterion;
277+
278+
/// Bench!
279+
pub fn bench_sends(bench: &mut Criterion) {
280+
let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None);
281+
let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None);
282+
lightning::ln::channelmanager::bench::bench_two_sends(
283+
bench, "bench_sqlite_persisted_sends", store_a, store_b);
284+
}
285+
}

0 commit comments

Comments
 (0)