Skip to content

Commit b4d0c88

Browse files
Aditya SharmaAditya Sharma
authored andcommitted
Distribute PeerStorage from ChainMonitor
Everytime a new block is added we send PeerStorage to all of our channel partner. - Add our_peer_storage and our_peerstorage_encryption_key to ChainMonitor - Write send_peer_storage() and send it to all channel partners whenever a new block is added - Write utility function for ChannelMonitor::write() to give an option to write ChannelMonitor for peerstorage - Append some channel information to the serialized ChannelMonitor so that we can build ChannelSigner when we call handle_your_peer_storage()
1 parent c34dbef commit b4d0c88

File tree

4 files changed

+152
-83
lines changed

4 files changed

+152
-83
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ mod tests {
10981098
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
10991099
use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
11001100
use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1101-
use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
1101+
use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager, NodeSigner};
11021102
use lightning::util::config::UserConfig;
11031103
use lightning::util::persist::{
11041104
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1580,6 +1580,7 @@ mod tests {
15801580
logger.clone(),
15811581
fee_estimator.clone(),
15821582
kv_store.clone(),
1583+
keys_manager.get_peer_storage_key(),
15831584
));
15841585
let best_block = BestBlock::from_network(network);
15851586
let params = ChainParameters { network, best_block };

lightning/src/chain/chainmonitor.rs

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ use bitcoin::hash_types::{Txid, BlockHash};
2929
use crate::chain;
3030
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
3131
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
32-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
32+
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, write_util};
3333
use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
35+
use crate::ln::msgs;
3536
use crate::sign::ecdsa::EcdsaChannelSigner;
3637
use crate::events::{self, Event, EventHandler, ReplayEvent};
3738
use crate::util::logger::{Logger, WithContext};
3839
use crate::util::errors::APIError;
40+
use crate::util::ser::VecWriter;
3941
use crate::util::wakers::{Future, Notifier};
4042
use crate::ln::channel_state::ChannelDetails;
4143
use crate::ln::msgs::SendingOnlyMessageHandler;
@@ -46,6 +48,8 @@ use core::ops::Deref;
4648
use core::sync::atomic::{AtomicUsize, Ordering};
4749
use bitcoin::hashes::Hash;
4850
use bitcoin::secp256k1::PublicKey;
51+
use core::mem;
52+
use crate::ln::our_peer_storage::OurPeerStorage;
4953

5054
/// `Persist` defines behavior for persisting channel monitors: this could mean
5155
/// writing once to disk, and/or uploading to one or more backup services.
@@ -166,8 +170,8 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
166170
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
167171
}
168172

169-
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
170-
monitor: ChannelMonitor<ChannelSigner>,
173+
pub(crate) struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
174+
pub(crate) monitor: ChannelMonitor<ChannelSigner>,
171175
/// The full set of pending monitor updates for this Channel.
172176
///
173177
/// Note that this lock must be held from [`ChannelMonitor::update_monitor`] through to
@@ -182,7 +186,7 @@ struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
182186
/// could cause users to have a full [`ChannelMonitor`] on disk as well as a
183187
/// [`ChannelMonitorUpdate`] which was already applied. While this isn't an issue for the
184188
/// LDK-provided update-based [`Persist`], it is somewhat surprising for users so we avoid it.
185-
pending_monitor_updates: Mutex<Vec<u64>>,
189+
pub(crate) pending_monitor_updates: Mutex<Vec<u64>>,
186190
}
187191

188192
impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
@@ -196,8 +200,8 @@ impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
196200
/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
197201
/// released.
198202
pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
199-
lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
200-
funding_txo: OutPoint,
203+
pub(crate) lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
204+
pub(crate) funding_txo: OutPoint,
201205
}
202206

203207
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
@@ -246,6 +250,8 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
246250
/// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process).
247251
event_notifier: Notifier,
248252
pending_send_only_events: Mutex<Vec<MessageSendEvent>>,
253+
our_peer_storage: Mutex<OurPeerStorage>,
254+
our_peerstorage_encryption_key: [u8;32],
249255
}
250256

251257
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -395,6 +401,27 @@ where C::Target: chain::Filter,
395401
P::Target: Persist<ChannelSigner>,
396402
{
397403
fn send_peer_storage(&self, their_node_id: PublicKey) {
404+
let monitors: RwLockReadGuard<'_, hash_map::HashMap<OutPoint, MonitorHolder<ChannelSigner>, RandomState>> = self.monitors.read().unwrap();
405+
let mut ser_channels: Vec<u8> = Vec::new();
406+
log_debug!(self.logger, "Sending Peer Storage from chainmonitor");
407+
ser_channels.extend_from_slice(&(monitors.len() as u64).to_be_bytes());
408+
for (_, mon) in monitors.iter() {
409+
let mut ser_chan = VecWriter(Vec::new());
410+
411+
match write_util(&mon.monitor.inner.lock().unwrap(), true, &mut ser_chan) {
412+
Ok(_) => {
413+
ser_channels.extend_from_slice(&(ser_chan.0.len() as u64).to_be_bytes());
414+
ser_channels.extend(ser_chan.0.iter());
415+
}
416+
Err(_) => {
417+
panic!("Can not write monitor for {}", mon.monitor.channel_id())
418+
}
419+
}
420+
}
421+
self.our_peer_storage.lock().unwrap().stub_channels(ser_channels);
422+
423+
self.pending_send_only_events.lock().unwrap().push(events::MessageSendEvent::SendPeerStorageMessage { node_id: their_node_id
424+
, msg: msgs::PeerStorageMessage { data: self.our_peer_storage.lock().unwrap().encrypt_our_peer_storage(self.our_peerstorage_encryption_key) } })
398425
}
399426
}
400427

@@ -406,7 +433,7 @@ where C::Target: chain::Filter,
406433
/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
407434
/// always need to fetch full blocks absent another means for determining which blocks contain
408435
/// transactions relevant to the watched channels.
409-
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
436+
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P, our_peerstorage_encryption_key: [u8; 32]) -> Self {
410437
Self {
411438
monitors: RwLock::new(new_hash_map()),
412439
chain_source,
@@ -418,6 +445,8 @@ where C::Target: chain::Filter,
418445
highest_chain_height: AtomicUsize::new(0),
419446
event_notifier: Notifier::new(),
420447
pending_send_only_events: Mutex::new(Vec::new()),
448+
our_peer_storage: Mutex::new(OurPeerStorage::new()),
449+
our_peerstorage_encryption_key
421450
}
422451
}
423452

@@ -685,6 +714,18 @@ where C::Target: chain::Filter,
685714
});
686715
}
687716
}
717+
718+
/// Retrieves all node IDs associated with the monitors.
719+
///
720+
/// This function collects the counterparty node IDs from all monitors into a `HashSet`,
721+
/// ensuring unique IDs are returned.
722+
fn get_peer_node_ids(&self) -> HashSet<PublicKey> {
723+
let mon = self.monitors.read().unwrap();
724+
mon
725+
.values()
726+
.map(|monitor| monitor.monitor.get_counterparty_node_id().unwrap().clone())
727+
.collect()
728+
}
688729
}
689730

690731
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
@@ -753,6 +794,12 @@ where
753794
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
754795
)
755796
});
797+
798+
// Send peer storage everytime a new block arrives.
799+
for node_id in self.get_peer_node_ids() {
800+
self.send_peer_storage(node_id);
801+
}
802+
756803
// Assume we may have some new events and wake the event processor
757804
self.event_notifier.notify();
758805
}

0 commit comments

Comments
 (0)