Skip to content

Commit 6123531

Browse files
committed
Add SqliteStore implementation
We upstream our `SqliteStore` implementation that allows persistence towards an SQLite database backend.
1 parent 1798097 commit 6123531

File tree

4 files changed

+277
-0
lines changed

4 files changed

+277
-0
lines changed

bench/benches/bench.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ criterion_group!(benches,
1616
lightning::sign::benches::bench_get_secure_random_bytes,
1717
lightning::ln::channelmanager::bench::bench_sends,
1818
lightning_persister::fs_store::bench::bench_sends,
19+
lightning_persister::sqlite_store::bench::bench_sends,
1920
lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
2021
lightning::routing::gossip::benches::read_network_graph,
2122
lightning::routing::gossip::benches::write_network_graph);

lightning-persister/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1616
[dependencies]
1717
bitcoin = "0.29.0"
1818
lightning = { version = "0.0.116", path = "../lightning" }
19+
rusqlite = { version = "0.28.0", features = ["bundled"] }
1920

2021
[target.'cfg(windows)'.dependencies]
2122
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }

lightning-persister/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#[cfg(ldk_bench)] extern crate criterion;
1212

1313
pub mod fs_store;
14+
pub mod sqlite_store;
1415

1516
#[cfg(test)]
1617
mod test_utils;
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
//! Objects related to [`SqliteStore`] live here.
2+
use lightning::util::persist::KVStore;
3+
use lightning::util::string::PrintableString;
4+
5+
use rusqlite::{named_params, Connection};
6+
7+
use std::fs;
8+
use std::path::PathBuf;
9+
use std::sync::{Arc, Mutex};
10+
11+
/// The default database file name.
12+
pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";
13+
14+
/// The default table in which we store all data.
15+
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";
16+
17+
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
18+
const SCHEMA_USER_VERSION: u16 = 1;
19+
20+
/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
21+
///
22+
/// [SQLite]: https://sqlite.org
23+
pub struct SqliteStore {
24+
connection: Arc<Mutex<Connection>>,
25+
data_dir: PathBuf,
26+
kv_table_name: String,
27+
}
28+
29+
impl SqliteStore {
30+
/// Constructs a new [`SqliteStore`].
31+
///
32+
/// If not already existing, a new SQLite database will be created in the given `data_dir` under the
33+
/// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`).
34+
///
35+
/// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`].
36+
pub fn new(data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>) -> Self {
37+
let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
38+
let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
39+
40+
fs::create_dir_all(data_dir.clone()).unwrap_or_else(|_| {
41+
panic!("Failed to create database destination directory: {}", data_dir.display())
42+
});
43+
let mut db_file_path = data_dir.clone();
44+
db_file_path.push(db_file_name);
45+
46+
let connection = Connection::open(db_file_path.clone()).unwrap_or_else(|_| {
47+
panic!("Failed to open/create database file: {}", db_file_path.display())
48+
});
49+
50+
connection
51+
.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| {
52+
Ok(())
53+
})
54+
.unwrap_or_else(|_| panic!("Failed to set PRAGMA user_version"));
55+
56+
let sql = format!(
57+
"CREATE TABLE IF NOT EXISTS {} (
58+
namespace TEXT NOT NULL,
59+
key TEXT NOT NULL CHECK (key <> ''),
60+
value BLOB, PRIMARY KEY ( namespace, key )
61+
);",
62+
kv_table_name
63+
);
64+
connection
65+
.execute(&sql, [])
66+
.unwrap_or_else(|_| panic!("Failed to create table: {}", kv_table_name));
67+
68+
let connection = Arc::new(Mutex::new(connection));
69+
Self { connection, data_dir, kv_table_name }
70+
}
71+
72+
/// Returns the data directory.
73+
pub fn get_data_dir(&self) -> PathBuf {
74+
self.data_dir.clone()
75+
}
76+
}
77+
78+
impl KVStore for SqliteStore {
79+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
80+
if key.is_empty() {
81+
debug_assert!(false, "Failed to read {}/{}: key may not be empty.",
82+
PrintableString(namespace), PrintableString(key));
83+
let msg = format!("Failed to read {}/{}: key may not be empty.",
84+
PrintableString(namespace), PrintableString(key));
85+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
86+
}
87+
88+
if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
89+
key.chars().any(|c| !c.is_ascii() || c.is_control()) {
90+
debug_assert!(false, "Failed to read {}/{}: namespace and key must be valid ASCII
91+
strings.", PrintableString(namespace), PrintableString(key));
92+
let msg = format!("Failed to read {}/{}: namespace and key must be valid ASCII strings.",
93+
PrintableString(namespace), PrintableString(key));
94+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
95+
}
96+
97+
let locked_conn = self.connection.lock().unwrap();
98+
let sql =
99+
format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", self.kv_table_name);
100+
101+
let res = locked_conn
102+
.query_row(
103+
&sql,
104+
named_params! {
105+
":namespace": namespace,
106+
":key": key,
107+
},
108+
|row| row.get(0),
109+
)
110+
.map_err(|e| match e {
111+
rusqlite::Error::QueryReturnedNoRows => {
112+
let msg =
113+
format!("Failed to read as key could not be found: {}/{}", namespace, key);
114+
std::io::Error::new(std::io::ErrorKind::NotFound, msg)
115+
}
116+
e => {
117+
let msg = format!("Failed to read from key {}/{}: {}", namespace, key, e);
118+
std::io::Error::new(std::io::ErrorKind::Other, msg)
119+
}
120+
})?;
121+
Ok(res)
122+
}
123+
124+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
125+
if key.is_empty() {
126+
debug_assert!(false, "Failed to write {}/{}: key may not be empty.",
127+
PrintableString(namespace), PrintableString(key));
128+
let msg = format!("Failed to write {}/{}: key may not be empty.",
129+
PrintableString(namespace), PrintableString(key));
130+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
131+
}
132+
133+
if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
134+
key.chars().any(|c| !c.is_ascii() || c.is_control()) {
135+
debug_assert!(false, "Failed to write {}/{}: namespace and key must be valid ASCII
136+
strings.", PrintableString(namespace), PrintableString(key));
137+
let msg = format!("Failed to write {}/{}: namespace and key must be valid ASCII strings.",
138+
PrintableString(namespace), PrintableString(key));
139+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
140+
}
141+
142+
let locked_conn = self.connection.lock().unwrap();
143+
144+
let sql = format!(
145+
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
146+
self.kv_table_name
147+
);
148+
149+
locked_conn
150+
.execute(
151+
&sql,
152+
named_params! {
153+
":namespace": namespace,
154+
":key": key,
155+
":value": buf,
156+
},
157+
)
158+
.map(|_| ())
159+
.map_err(|e| {
160+
let msg = format!("Failed to write to key {}/{}: {}", namespace, key, e);
161+
std::io::Error::new(std::io::ErrorKind::Other, msg)
162+
})
163+
}
164+
165+
fn remove(&self, namespace: &str, key: &str) -> lightning::io::Result<()> {
166+
if key.is_empty() {
167+
debug_assert!(false, "Failed to remove {}/{}: key may not be empty.",
168+
PrintableString(namespace), PrintableString(key));
169+
let msg = format!("Failed to remove {}/{}: key may not be empty.",
170+
PrintableString(namespace), PrintableString(key));
171+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
172+
}
173+
174+
if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
175+
key.chars().any(|c| !c.is_ascii() || c.is_control()) {
176+
debug_assert!(false, "Failed to remove {}/{}: namespace and key must be valid ASCII
177+
strings.", PrintableString(namespace), PrintableString(key));
178+
let msg = format!("Failed to remove {}/{}: namespace and key must be valid ASCII strings.",
179+
PrintableString(namespace), PrintableString(key));
180+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
181+
}
182+
183+
let locked_conn = self.connection.lock().unwrap();
184+
185+
let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", self.kv_table_name);
186+
locked_conn
187+
.execute(
188+
&sql,
189+
named_params! {
190+
":namespace": namespace,
191+
":key": key,
192+
},
193+
)
194+
.map_err(|e| {
195+
let msg = format!("Failed to delete key {}/{}: {}", namespace, key, e);
196+
std::io::Error::new(std::io::ErrorKind::Other, msg)
197+
})?;
198+
Ok(())
199+
}
200+
201+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
202+
let locked_conn = self.connection.lock().unwrap();
203+
204+
let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", self.kv_table_name);
205+
let mut stmt = locked_conn.prepare(&sql).map_err(|e| {
206+
let msg = format!("Failed to prepare statement: {}", e);
207+
std::io::Error::new(std::io::ErrorKind::Other, msg)
208+
})?;
209+
210+
let mut keys = Vec::new();
211+
212+
let rows_iter = stmt
213+
.query_map(named_params! {":namespace": namespace, }, |row| row.get(0))
214+
.map_err(|e| {
215+
let msg = format!("Failed to retrieve queried rows: {}", e);
216+
std::io::Error::new(std::io::ErrorKind::Other, msg)
217+
})?;
218+
219+
for k in rows_iter {
220+
keys.push(k.map_err(|e| {
221+
let msg = format!("Failed to retrieve queried rows: {}", e);
222+
std::io::Error::new(std::io::ErrorKind::Other, msg)
223+
})?);
224+
}
225+
226+
Ok(keys)
227+
}
228+
}
229+
230+
#[cfg(test)]
231+
mod tests {
232+
use super::*;
233+
use crate::test_utils::{do_read_write_remove_list_persist,do_test_store};
234+
235+
impl Drop for SqliteStore {
236+
fn drop(&mut self) {
237+
match fs::remove_dir_all(&self.data_dir) {
238+
Err(e) => println!("Failed to remove test store directory: {}", e),
239+
_ => {}
240+
}
241+
}
242+
}
243+
244+
#[test]
245+
fn read_write_remove_list_persist() {
246+
let mut temp_path = std::env::temp_dir();
247+
temp_path.push("read_write_remove_list_persist");
248+
let store = SqliteStore::new(temp_path, Some("test_db".to_string()), Some("test_table".to_string()));
249+
do_read_write_remove_list_persist(&store);
250+
}
251+
252+
#[test]
253+
fn test_sqlite_store() {
254+
let mut temp_path = std::env::temp_dir();
255+
temp_path.push("test_sqlite_store");
256+
let store_0 = SqliteStore::new(temp_path.clone(), Some("test_db_0".to_string()), Some("test_table".to_string()));
257+
let store_1 = SqliteStore::new(temp_path, Some("test_db_1".to_string()), Some("test_table".to_string()));
258+
do_test_store(&store_0, &store_1)
259+
}
260+
}
261+
262+
#[cfg(ldk_bench)]
263+
/// Benches
264+
pub mod bench {
265+
use criterion::Criterion;
266+
267+
/// Bench!
268+
pub fn bench_sends(bench: &mut Criterion) {
269+
let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None);
270+
let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None);
271+
lightning::ln::channelmanager::bench::bench_two_sends(
272+
bench, "bench_sqlite_persisted_sends", store_a, store_b);
273+
}
274+
}

0 commit comments

Comments
 (0)