Skip to content

Commit 7f92788

Browse files
Add block_until_needs_persist to ChannelManager
This will allow the ChannelManager to signal when it has new updates to persist.
1 parent 661ea26 commit 7f92788

File tree

1 file changed

+61
-2
lines changed

1 file changed

+61
-2
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use util::errors::APIError;
5858
use std::{cmp, mem};
5959
use std::collections::{HashMap, hash_map, HashSet};
6060
use std::io::{Cursor, Read};
61-
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
61+
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Condvar};
6262
use std::sync::atomic::{AtomicUsize, Ordering};
6363
use std::time::Duration;
6464
use std::marker::{Sync, Send};
@@ -439,6 +439,14 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
439439
/// Taken first everywhere where we are making changes before any other locks.
440440
total_consistency_lock: RwLock<()>,
441441

442+
/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
443+
/// disk/backups. Users won't access the persistence_lock directly, but rather wait on its bool
444+
/// using block_until_needs_persist.
445+
#[cfg(any(test, feature = "_test_utils"))]
446+
pub persistence_lock: Arc<(Mutex<bool>, Condvar)>,
447+
#[cfg(not(any(test, feature = "_test_utils")))]
448+
persistence_lock: Arc<(Mutex<bool>, Condvar)>,
449+
442450
keys_manager: K,
443451

444452
logger: L,
@@ -760,12 +768,36 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
760768
pending_events: Mutex::new(Vec::new()),
761769
total_consistency_lock: RwLock::new(()),
762770

771+
persistence_lock: Arc::new((Mutex::new(false), Condvar::new())),
772+
763773
keys_manager,
764774

765775
logger,
766776
}
767777
}
768778

779+
/// Used to signal to users when the ChannelManager needs to be persisted to disk. This function
780+
/// blocks either for the given Duration or indefinitely until the ChannelManager has updates and
781+
/// needs persistence. It returns a bool indicating whether persistence is necessary (which will
782+
/// always be true if max_wait is None).
783+
pub fn block_until_needs_persist(&self, max_wait: Option<Duration>) -> bool {
784+
loop {
785+
let mutcond = Arc::clone(&self.persistence_lock);
786+
let &(ref mtx, ref cvar) = &*mutcond;
787+
let mut guard = mtx.lock().unwrap();
788+
let result = match max_wait {
789+
Some(wait) => cvar.wait_timeout(guard, wait).unwrap().0,
790+
None => cvar.wait(guard).unwrap(),
791+
};
792+
guard = result;
793+
let saved_res = *guard;
794+
*guard = false;
795+
if saved_res || max_wait.is_some() {
796+
return saved_res;
797+
}
798+
}
799+
}
800+
769801
/// Creates a new outbound channel to the given remote node and with the given value.
770802
///
771803
/// user_id will be provided back as user_channel_id in FundingGenerationReady and
@@ -913,6 +945,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
913945
// the latest local state, which is the best we can do anyway. Thus, it is safe to
914946
// ignore the result here.
915947
let _ = self.chain_monitor.update_channel(funding_txo, monitor_update);
948+
self.persist_updates();
916949
}
917950
}
918951

@@ -1313,6 +1346,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
13131346
commitment_signed,
13141347
},
13151348
});
1349+
self.persist_updates();
13161350
},
13171351
None => {},
13181352
}
@@ -1707,6 +1741,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
17071741
commitment_signed: commitment_msg,
17081742
},
17091743
});
1744+
self.persist_updates();
17101745
}
17111746
} else {
17121747
unreachable!();
@@ -2126,6 +2161,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
21262161
}
21272162
});
21282163
}
2164+
self.persist_updates();
21292165
return Ok(())
21302166
},
21312167
Err(e) => {
@@ -2340,6 +2376,15 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
23402376
Ok(())
23412377
}
23422378

2379+
// Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
2380+
fn persist_updates(&self) {
2381+
let &(ref persist_mtx, ref cnd) = &*self.persistence_lock;
2382+
let mut persistence_lock = persist_mtx.lock().unwrap();
2383+
*persistence_lock = true;
2384+
cnd.notify_all();
2385+
2386+
}
2387+
23432388
fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
23442389
let ((funding_msg, monitor), mut chan) = {
23452390
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2379,6 +2424,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
23792424
},
23802425
}
23812426
}
2427+
2428+
self.persist_updates();
2429+
23822430
let mut channel_state_lock = self.channel_state.lock().unwrap();
23832431
let channel_state = &mut *channel_state_lock;
23842432
match channel_state.by_id.entry(funding_msg.channel_id) {
@@ -2417,6 +2465,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
24172465
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
24182466
}
24192467
};
2468+
2469+
self.persist_updates();
2470+
24202471
let mut pending_events = self.pending_events.lock().unwrap();
24212472
pending_events.push(events::Event::FundingBroadcastSafe {
24222473
funding_txo,
@@ -2712,6 +2763,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27122763
msg,
27132764
});
27142765
}
2766+
self.persist_updates();
27152767
Ok(())
27162768
},
27172769
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -2803,9 +2855,13 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
28032855
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
28042856
}
28052857
self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
2858+
self.persist_updates();
28062859
Ok(())
28072860
},
2808-
Err(e) => Err(e)
2861+
Err(e) => {
2862+
self.persist_updates();
2863+
Err(e)
2864+
}
28092865
}
28102866
}
28112867

@@ -2946,6 +3002,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29463002
msg,
29473003
});
29483004
}
3005+
self.persist_updates();
29493006
Ok(())
29503007
},
29513008
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -2995,6 +3052,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29953052
commitment_signed,
29963053
},
29973054
});
3055+
self.persist_updates();
29983056
}
29993057
},
30003058
}
@@ -3994,6 +4052,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
39944052

39954053
pending_events: Mutex::new(pending_events_read),
39964054
total_consistency_lock: RwLock::new(()),
4055+
persistence_lock: Arc::new((Mutex::new(false), Condvar::new())),
39974056
keys_manager: args.keys_manager,
39984057
logger: args.logger,
39994058
default_configuration: args.default_config,

0 commit comments

Comments
 (0)