Skip to content

Commit d224f04

Browse files
Add persistence_lock to ChannelManager
This will allow the ChannelManager to signal when it has new updates to persist.
1 parent 811795f commit d224f04

File tree

1 file changed

+66
-2
lines changed

1 file changed

+66
-2
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use util::errors::APIError;
5858
use std::{cmp, mem};
5959
use std::collections::{HashMap, hash_map, HashSet};
6060
use std::io::{Cursor, Read};
61-
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
61+
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Condvar};
6262
use std::sync::atomic::{AtomicUsize, Ordering};
6363
use std::time::Duration;
6464
use std::marker::{Sync, Send};
@@ -439,6 +439,41 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
439439
/// Taken first everywhere where we are making changes before any other locks.
440440
total_consistency_lock: RwLock<()>,
441441

442+
/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
443+
/// disk/backups.
444+
/// Example usage:
445+
/// ```
446+
/// use std::sync::Arc;
447+
/// type TxBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface;
448+
/// type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
449+
/// type Logger = dyn lightning::util::logger::Logger;
450+
/// type ChainAccess = dyn lightning::chain::Access;
451+
/// type ChainFilter = dyn lightning::chain::Filter;
452+
/// type DataPersister = dyn lightning::chain::channelmonitor::Persist<lightning::chain::keysinterface::InMemoryChannelKeys>;
453+
/// type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
454+
/// type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>;
455+
/// fn start(manager: ChannelManager) {
456+
/// let mutcond = Arc::clone(&manager.persistence_lock);
457+
/// // ChannelManager persistence should generally be run in a background thread since it can be
458+
/// // time-consuming to persist.
459+
/// std::thread::spawn(move || {
460+
/// let &(ref mtx, ref cnd) = &*mutcond;
461+
/// loop {
462+
/// let mut guard = mtx.lock().unwrap();
463+
/// // If there's a new update, we break out of the while loop, reset the condition variable,
464+
/// // and persist the ChannelManager to disk/backups.
465+
/// while !*guard {
466+
/// guard = cnd.wait(guard).unwrap();
467+
/// }
468+
/// *guard = false;
469+
/// std::mem::drop(guard); // Don't hold the lock while persisting the ChannelManager.
470+
/// // Persist the ChannelManager here.
471+
/// }
472+
/// });
473+
/// }
474+
/// ```
475+
pub persistence_lock: Arc<(Mutex<bool>, Condvar)>,
476+
442477
keys_manager: K,
443478

444479
logger: L,
@@ -760,6 +795,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
760795
pending_events: Mutex::new(Vec::new()),
761796
total_consistency_lock: RwLock::new(()),
762797

798+
persistence_lock: Arc::new((Mutex::new(false), Condvar::new())),
799+
763800
keys_manager,
764801

765802
logger,
@@ -913,6 +950,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
913950
// the latest local state, which is the best we can do anyway. Thus, it is safe to
914951
// ignore the result here.
915952
let _ = self.chain_monitor.update_channel(funding_txo, monitor_update);
953+
self.persist_updates();
916954
}
917955
}
918956

@@ -1313,6 +1351,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
13131351
commitment_signed,
13141352
},
13151353
});
1354+
self.persist_updates();
13161355
},
13171356
None => {},
13181357
}
@@ -1707,6 +1746,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
17071746
commitment_signed: commitment_msg,
17081747
},
17091748
});
1749+
self.persist_updates();
17101750
}
17111751
} else {
17121752
unreachable!();
@@ -2126,6 +2166,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
21262166
}
21272167
});
21282168
}
2169+
self.persist_updates();
21292170
return Ok(())
21302171
},
21312172
Err(e) => {
@@ -2340,6 +2381,15 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
23402381
Ok(())
23412382
}
23422383

2384+
// Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
2385+
fn persist_updates(&self) {
2386+
let &(ref persist_mtx, ref cnd) = &*self.persistence_lock;
2387+
let mut persistence_lock = persist_mtx.lock().unwrap();
2388+
*persistence_lock = true;
2389+
cnd.notify_all();
2390+
2391+
}
2392+
23432393
fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
23442394
let ((funding_msg, monitor), mut chan) = {
23452395
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2379,6 +2429,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
23792429
},
23802430
}
23812431
}
2432+
2433+
self.persist_updates();
2434+
23822435
let mut channel_state_lock = self.channel_state.lock().unwrap();
23832436
let channel_state = &mut *channel_state_lock;
23842437
match channel_state.by_id.entry(funding_msg.channel_id) {
@@ -2417,6 +2470,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
24172470
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
24182471
}
24192472
};
2473+
2474+
self.persist_updates();
2475+
24202476
let mut pending_events = self.pending_events.lock().unwrap();
24212477
pending_events.push(events::Event::FundingBroadcastSafe {
24222478
funding_txo,
@@ -2712,6 +2768,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27122768
msg,
27132769
});
27142770
}
2771+
self.persist_updates();
27152772
Ok(())
27162773
},
27172774
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -2803,9 +2860,13 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
28032860
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
28042861
}
28052862
self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
2863+
self.persist_updates();
28062864
Ok(())
28072865
},
2808-
Err(e) => Err(e)
2866+
Err(e) => {
2867+
self.persist_updates();
2868+
Err(e)
2869+
}
28092870
}
28102871
}
28112872

@@ -2946,6 +3007,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29463007
msg,
29473008
});
29483009
}
3010+
self.persist_updates();
29493011
Ok(())
29503012
},
29513013
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -2995,6 +3057,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29953057
commitment_signed,
29963058
},
29973059
});
3060+
self.persist_updates();
29983061
}
29993062
},
30003063
}
@@ -3994,6 +4057,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
39944057

39954058
pending_events: Mutex::new(pending_events_read),
39964059
total_consistency_lock: RwLock::new(()),
4060+
persistence_lock: Arc::new((Mutex::new(false), Condvar::new())),
39974061
keys_manager: args.keys_manager,
39984062
logger: args.logger,
39994063
default_configuration: args.default_config,

0 commit comments

Comments
 (0)