Skip to content

Use interior mutability in ChannelMonitor #813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
hash_map::Entry::Occupied(entry) => entry,
hash_map::Entry::Vacant(_) => panic!("Didn't have monitor on update call"),
};
let mut deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::
let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::
read(&mut Cursor::new(&map_entry.get().1), &OnlyReadsKeysInterface {}).unwrap().1;
deserialized_monitor.update_monitor(&update, &&TestBroadcaster{}, &&FuzzEstimator{}, &self.logger).unwrap();
let mut ser = VecWriter(Vec::new());
Expand Down
5 changes: 2 additions & 3 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use lightning::chain;
///
/// use lightning_block_sync::*;
///
/// use std::cell::RefCell;
/// use std::io::Cursor;
///
/// async fn init_sync<
Expand Down Expand Up @@ -83,7 +82,7 @@ use lightning::chain;
///
/// // Synchronize any channel monitors and the channel manager to be on the best block.
/// let mut cache = UnboundedCache::new();
/// let mut monitor_listener = (RefCell::new(monitor), &*tx_broadcaster, &*fee_estimator, &*logger);
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
/// let listeners = vec![
/// (monitor_block_hash, &mut monitor_listener as &mut dyn chain::Listen),
/// (manager_block_hash, &mut manager as &mut dyn chain::Listen),
Expand All @@ -92,7 +91,7 @@ use lightning::chain;
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
///
/// // Allow the chain monitor to watch any channels.
/// let monitor = monitor_listener.0.into_inner();
/// let monitor = monitor_listener.0;
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
///
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
Expand Down
36 changes: 18 additions & 18 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use util::events;
use util::events::Event;

use std::collections::{HashMap, hash_map};
use std::sync::Mutex;
use std::sync::RwLock;
use std::ops::Deref;

/// An implementation of [`chain::Watch`] for monitoring channels.
Expand All @@ -64,7 +64,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
P::Target: channelmonitor::Persist<ChannelSigner>,
{
/// The monitors
pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
chain_source: Option<C>,
broadcaster: T,
logger: L,
Expand Down Expand Up @@ -93,8 +93,8 @@ where C::Target: chain::Filter,
/// [`chain::Watch::release_pending_monitor_events`]: ../trait.Watch.html#tymethod.release_pending_monitor_events
/// [`chain::Filter`]: ../trait.Filter.html
pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);

if let Some(ref chain_source) = self.chain_source {
Expand All @@ -113,8 +113,8 @@ where C::Target: chain::Filter,
///
/// [`ChannelMonitor::block_disconnected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_disconnected
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
let mut monitors = self.monitors.lock().unwrap();
for monitor in monitors.values_mut() {
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
monitor.block_disconnected(header, disconnected_height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
}
}
Expand All @@ -130,7 +130,7 @@ where C::Target: chain::Filter,
/// [`chain::Filter`]: ../trait.Filter.html
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
Self {
monitors: Mutex::new(HashMap::new()),
monitors: RwLock::new(HashMap::new()),
chain_source,
broadcaster,
logger,
Expand Down Expand Up @@ -177,7 +177,7 @@ where C::Target: chain::Filter,
///
/// [`chain::Filter`]: ../trait.Filter.html
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
let mut monitors = self.monitors.lock().unwrap();
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(funding_outpoint) {
hash_map::Entry::Occupied(_) => {
log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
Expand Down Expand Up @@ -209,8 +209,8 @@ where C::Target: chain::Filter,
/// `ChainMonitor` monitors lock.
fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
// Update the monitor that watches the channel referred to by the given outpoint.
let mut monitors = self.monitors.lock().unwrap();
match monitors.get_mut(&funding_txo) {
let monitors = self.monitors.read().unwrap();
match monitors.get(&funding_txo) {
None => {
log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");

Expand All @@ -222,15 +222,15 @@ where C::Target: chain::Filter,
#[cfg(not(any(test, feature = "fuzztarget")))]
Err(ChannelMonitorUpdateErr::PermanentFailure)
},
Some(orig_monitor) => {
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor));
let update_res = orig_monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
Some(monitor) => {
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
if let Err(e) = &update_res {
log_error!(self.logger, "Failed to update channel monitor: {:?}", e);
}
// Even if updating the monitor returns an error, the monitor's state will
// still be changed. So, persist the updated monitor despite the error.
let persist_res = self.persister.update_persisted_channel(funding_txo, &update, orig_monitor);
let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor);
if let Err(ref e) = persist_res {
log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
}
Expand All @@ -245,8 +245,8 @@ where C::Target: chain::Filter,

fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
let mut pending_monitor_events = Vec::new();
for chan in self.monitors.lock().unwrap().values_mut() {
pending_monitor_events.append(&mut chan.get_and_clear_pending_monitor_events());
for monitor in self.monitors.read().unwrap().values() {
pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
}
pending_monitor_events
}
Expand All @@ -261,8 +261,8 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
{
fn get_and_clear_pending_events(&self) -> Vec<Event> {
let mut pending_events = Vec::new();
for chan in self.monitors.lock().unwrap().values_mut() {
pending_events.append(&mut chan.get_and_clear_pending_events());
for monitor in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor.get_and_clear_pending_events());
}
pending_events
}
Expand Down
Loading