Skip to content

Commit e72996f

Browse files
Add BackgroundProcessor for ChannelManager persistence and other
Other tasks include calling `timer_chan_freshness_every_min()` and, in the future, possibly persisting channel graph data. This struct is suitable for tasks that need to happen periodically and can happen in the background.
1 parent 8b9bd7f commit e72996f

File tree

5 files changed

+400
-3
lines changed

5 files changed

+400
-3
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ members = [
55
"lightning-block-sync",
66
"lightning-net-tokio",
77
"lightning-persister",
8+
"background-processor",
89
]
910

1011
# Our tests do actual crypto and lots of work, the tradeoff for -O1 is well worth it.

background-processor/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "background-processor"
3+
version = "0.1.0"
4+
authors = ["Valentine Wallace <[email protected]>"]
5+
edition = "2018"
6+
7+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8+
9+
[dependencies]
10+
bitcoin = "0.24"
11+
lightning = { version = "0.0.12", path = "../lightning", features = ["allow_wallclock_use"] }
12+
lightning-persister = { version = "0.0.1", path = "../lightning-persister" }
13+
14+
[dev-dependencies]
15+
lightning = { version = "0.0.12", path = "../lightning", features = ["_test_utils"] }

background-processor/src/lib.rs

Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
use lightning::chain;
2+
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3+
use lightning::chain::keysinterface::{ChannelKeys, KeysInterface};
4+
use lightning::ln::channelmanager::ChannelManager;
5+
use lightning::{log_internal, log_trace};
6+
use lightning::util::logger::Logger;
7+
use lightning::util::ser::Writeable;
8+
use std::sync::Arc;
9+
use std::sync::atomic::{AtomicBool, Ordering};
10+
use std::thread;
11+
use std::thread::JoinHandle;
12+
use std::time::{Duration, Instant};
13+
14+
/// BackgroundProcessor takes care of tasks that (1) need to happen periodically to keep
15+
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
16+
/// responsibilities are:
17+
/// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so,
18+
/// writing it to disk/backups by invoking the callback given to it at startup.
19+
/// ChannelManager persistence should be done in the background.
20+
/// * Calling `ChannelManager::timer_chan_freshness_every_min()` every minute (can be done in the
21+
/// background).
22+
///
23+
/// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date,
24+
/// then there is a risk of channels force-closing on startup when the manager realizes it's
25+
/// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used
26+
/// for unilateral chain closure fees are at risk.
27+
pub struct BackgroundProcessor {
28+
stop_thread: Arc<AtomicBool>,
29+
/// May be used to retrieve and handle the error if `BackgroundProcessor`'s thread
30+
/// panics.
31+
pub thread_handle: JoinHandle<()>
32+
}
33+
34+
// Re-export these modules for the logging macros.
35+
mod util {
36+
pub(crate) mod logger {
37+
pub(crate) use lightning::util::logger::Record;
38+
}
39+
}
40+
41+
#[cfg(not(test))]
42+
const CHAN_FRESHNESS_TIMER: u64 = 60;
43+
#[cfg(test)]
44+
const CHAN_FRESHNESS_TIMER: u64 = 1;
45+
46+
impl BackgroundProcessor {
47+
/// Start the background thread that takes care of responsibilities (enumerated in the top-level
48+
/// documentation). Marked as `must_use` because otherwise the result is dropped immediately,
49+
/// resulting in the thread being terminated.
50+
/// Important note: this thread will panic if invoking `persist_manager` results in an error (and
51+
/// `start()` will need to be called again to restart the `BackgroundProcessor`).
52+
/// There are 3 main options for handling this panic:
53+
/// * wait on [`thread_handle`]'s `join()`, handle the error
54+
/// * [configure] to abort on panic
55+
/// * write a custom `persist_manager` to handle the error so it never gets returned to
56+
/// `BackgroundProcessor`.
57+
///
58+
/// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading
59+
/// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`.
60+
/// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation.
61+
///
62+
/// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle
63+
/// [configure]: https://doc.rust-lang.org/edition-guide/rust-2018/error-handling-and-panics/aborting-on-panic.html
64+
/// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write
65+
/// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl
66+
#[must_use]
67+
pub fn start<PM, ChanSigner, M, T, K, F, L>(persist_manager: PM, manager: Arc<ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>, logger: Arc<L>) -> Self
68+
where ChanSigner: 'static + ChannelKeys + Writeable,
69+
M: 'static + chain::Watch<Keys=ChanSigner>,
70+
T: 'static + BroadcasterInterface,
71+
K: 'static + KeysInterface<ChanKeySigner=ChanSigner>,
72+
F: 'static + FeeEstimator,
73+
L: 'static + Logger,
74+
PM: 'static + Send + Fn(Arc<ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>) -> Result<(), std::io::Error>,
75+
{
76+
let stop_thread = Arc::new(AtomicBool::new(false));
77+
let stop_thread_clone = stop_thread.clone();
78+
let handle = thread::spawn(move || {
79+
let mut current_time = Instant::now();
80+
loop {
81+
let updates_available = manager.wait_timeout(Duration::from_millis(100));
82+
if updates_available {
83+
if let Err(e) = persist_manager(manager.clone()) {
84+
panic!("Errored persisting manager: {}", e);
85+
};
86+
}
87+
// If we see that the thread has been stopped, exit now.
88+
if stop_thread.load(Ordering::Acquire) == true {
89+
log_trace!(logger, "Terminating background processor.");
90+
break;
91+
}
92+
if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER {
93+
log_trace!(logger, "Calling manager's timer_chan_freshness_every_min");
94+
manager.timer_chan_freshness_every_min();
95+
current_time = Instant::now();
96+
}
97+
}
98+
});
99+
Self {
100+
stop_thread: stop_thread_clone,
101+
thread_handle: handle
102+
}
103+
}
104+
105+
/// Stop `BackgroundProcessor`'s thread.
106+
pub fn stop(&self) {
107+
self.stop_thread.store(true, Ordering::Release)
108+
}
109+
}
110+
111+
#[cfg(test)]
112+
mod tests {
113+
use bitcoin::blockdata::constants::genesis_block;
114+
use bitcoin::blockdata::transaction::{Transaction, TxOut};
115+
use bitcoin::network::constants::Network;
116+
use lightning::chain;
117+
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
118+
use lightning::chain::chainmonitor;
119+
use lightning::chain::keysinterface::{ChannelKeys, InMemoryChannelKeys, KeysInterface, KeysManager};
120+
use lightning::chain::transaction::OutPoint;
121+
use lightning::get_event_msg;
122+
use lightning::ln::channelmanager::{ChannelManager, SimpleArcChannelManager};
123+
use lightning::ln::features::InitFeatures;
124+
use lightning::ln::msgs::ChannelMessageHandler;
125+
use lightning::util::config::UserConfig;
126+
use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
127+
use lightning::util::logger::Logger;
128+
use lightning::util::ser::Writeable;
129+
use lightning::util::test_utils;
130+
use lightning_persister::FilesystemPersister;
131+
use std::fs;
132+
use std::path::PathBuf;
133+
use std::sync::{Arc, Mutex};
134+
use std::time::Duration;
135+
use super::BackgroundProcessor;
136+
137+
type ChainMonitor = chainmonitor::ChainMonitor<InMemoryChannelKeys, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
138+
139+
struct Node {
140+
node: SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>,
141+
persister: Arc<FilesystemPersister>,
142+
logger: Arc<test_utils::TestLogger>,
143+
}
144+
145+
impl Drop for Node {
146+
fn drop(&mut self) {
147+
let data_dir = self.persister.get_data_dir();
148+
match fs::remove_dir_all(data_dir.clone()) {
149+
Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
150+
_ => {}
151+
}
152+
}
153+
}
154+
155+
fn get_full_filepath(filepath: String, filename: String) -> String {
156+
let mut path = PathBuf::from(filepath);
157+
path.push(filename);
158+
path.to_str().unwrap().to_string()
159+
}
160+
161+
fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
162+
let mut nodes = Vec::new();
163+
for i in 0..num_nodes {
164+
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
165+
let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
166+
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
167+
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
168+
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
169+
let seed = [i as u8; 32];
170+
let network = Network::Testnet;
171+
let now = Duration::from_secs(genesis_block(network).header.time as u64);
172+
let keys_manager = Arc::new(KeysManager::new(&seed, network, now.as_secs(), now.subsec_nanos()));
173+
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
174+
let manager = Arc::new(ChannelManager::new(Network::Testnet, fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), i));
175+
let node = Node { node: manager, persister, logger };
176+
nodes.push(node);
177+
}
178+
nodes
179+
}
180+
181+
macro_rules! open_channel {
182+
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
183+
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
184+
$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
185+
$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
186+
let events = $node_a.node.get_and_clear_pending_events();
187+
assert_eq!(events.len(), 1);
188+
let (temporary_channel_id, tx, funding_output) = match events[0] {
189+
Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => {
190+
assert_eq!(*channel_value_satoshis, $channel_value);
191+
assert_eq!(user_channel_id, 42);
192+
193+
let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
194+
value: *channel_value_satoshis, script_pubkey: output_script.clone(),
195+
}]};
196+
let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 };
197+
(*temporary_channel_id, tx, funding_outpoint)
198+
},
199+
_ => panic!("Unexpected event"),
200+
};
201+
202+
$node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output);
203+
$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
204+
$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
205+
tx
206+
}}
207+
}
208+
209+
#[test]
210+
fn test_background_processor() {
211+
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
212+
// updates. Also test that when new updates are available, the manager signals that it needs
213+
// re-persistence and is successfully re-persisted.
214+
let nodes = create_nodes(2, "test_background_processor".to_string());
215+
216+
// Initiate the background processors to watch each node.
217+
let data_dir_0 = nodes[0].persister.get_data_dir();
218+
let data_dir_1 = nodes[1].persister.get_data_dir();
219+
let callback_0 = move |node| FilesystemPersister::persist_manager(data_dir_0.clone(), node);
220+
let callback_1 = move |node| FilesystemPersister::persist_manager(data_dir_1.clone(), node);
221+
let _processor_0 = BackgroundProcessor::start(callback_0, nodes[0].node.clone(), nodes[0].logger.clone());
222+
let _processor_1 = BackgroundProcessor::start(callback_1, nodes[1].node.clone(), nodes[1].logger.clone());
223+
224+
// Go through the channel creation process until each node should have something persisted.
225+
let tx = open_channel!(nodes[0], nodes[1], 100000);
226+
227+
let mut done_persisting = false;
228+
macro_rules! check_persisted_data {
229+
($node: expr, $filepath: expr) => {
230+
let bytes = loop {
231+
match std::fs::read($filepath) {
232+
Ok(bytes) => break bytes,
233+
Err(_) => continue
234+
}
235+
};
236+
let mut expected_bytes = Vec::new();
237+
assert!($node.write(&mut expected_bytes).is_ok());
238+
if bytes == expected_bytes {
239+
done_persisting = true;
240+
}
241+
}
242+
}
243+
244+
// Check that the initial channel manager data is persisted as expected.
245+
let filepath_0 = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
246+
let filepath_1 = get_full_filepath("test_background_processor_persister_1".to_string(), "manager".to_string());
247+
loop {
248+
check_persisted_data!(nodes[0].node, filepath_0.clone());
249+
if done_persisting {
250+
// Check that eventually BackgroundProcessor resets the condvar when everything's done persisting.
251+
loop {
252+
if !nodes[0].node.get_persistence_condvar_value() { break }
253+
}
254+
break
255+
}
256+
}
257+
done_persisting = false;
258+
loop {
259+
check_persisted_data!(nodes[1].node, filepath_1.clone());
260+
if done_persisting {
261+
loop {
262+
if !nodes[1].node.get_persistence_condvar_value() { break }
263+
}
264+
break
265+
}
266+
}
267+
268+
// Force-close the channel.
269+
nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id());
270+
271+
// Check that the force-close updates are persisted.
272+
done_persisting = false;
273+
loop {
274+
check_persisted_data!(nodes[0].node, filepath_0.clone());
275+
if done_persisting {
276+
loop {
277+
if !nodes[0].node.get_persistence_condvar_value() { break }
278+
}
279+
break
280+
}
281+
}
282+
}
283+
284+
#[test]
285+
fn test_chan_freshness_called() {
286+
// Test that ChannelManager's `timer_chan_freshness_every_min` is called every
287+
// `CHAN_FRESHNESS_TIMER`.
288+
let nodes = create_nodes(1, "test_chan_freshness_called".to_string());
289+
let data_dir = nodes[0].persister.get_data_dir();
290+
let callback = move |node| FilesystemPersister::persist_manager(data_dir.clone(), node);
291+
let processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
292+
loop {
293+
let log_entries = nodes[0].logger.lines.lock().unwrap();
294+
let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string();
295+
if log_entries.get(&("background_processor".to_string(), desired_log)).is_some() {
296+
break
297+
}
298+
}
299+
processor.stop();
300+
processor.thread_handle.join().unwrap();
301+
}
302+
303+
#[test]
304+
fn test_persist_error() {
305+
// Test that if we encounter an error during manager persistence, the thread panics.
306+
fn persist_manager<ChanSigner, M, T, K, F, L>(_data: Arc<ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>) -> Result<(), std::io::Error>
307+
where ChanSigner: 'static + ChannelKeys + Writeable,
308+
M: 'static + chain::Watch<Keys=ChanSigner>,
309+
T: 'static + BroadcasterInterface,
310+
K: 'static + KeysInterface<ChanKeySigner=ChanSigner>,
311+
F: 'static + FeeEstimator,
312+
L: 'static + Logger,
313+
{
314+
Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
315+
}
316+
317+
let nodes = create_nodes(2, "test_persist_error".to_string());
318+
let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone());
319+
open_channel!(nodes[0], nodes[1], 100000);
320+
321+
match bg_processor.thread_handle.join() {
322+
Err(e) => {
323+
let error = e.downcast_ref::<String>().unwrap();
324+
assert_eq!("Errored persisting manager: test", error);
325+
}
326+
_ => panic!("Unexpected result")
327+
}
328+
}
329+
330+
#[test]
331+
fn test_stop_bg_thread() {
332+
// Test that we can successfully stop the background thread through BackgroundProcessor::stop().
333+
let nodes = create_nodes(1, "test_stop_bg_thread".to_string());
334+
let data_dir = nodes[0].persister.get_data_dir();
335+
let callback = move |node| FilesystemPersister::persist_manager(data_dir.clone(), node);
336+
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
337+
338+
// Stop the background processor and check that it terminated.
339+
bg_processor.stop();
340+
loop {
341+
let log_entries = nodes[0].logger.lines.lock().unwrap();
342+
if log_entries.get(&("background_processor".to_string(), "Terminating background processor.".to_string())).is_some() {
343+
break
344+
}
345+
}
346+
}
347+
}

0 commit comments

Comments
 (0)