Skip to content

Commit 8f80e7e

Browse files
Aditya SharmaAditya Sharma
authored and committed
lightning: Handle peer storage message, its persistence, and send it to the respective peer upon reconnection.
1 parent 8c6854a commit 8f80e7e

File tree

4 files changed

+108
-7
lines changed

4 files changed

+108
-7
lines changed

lightning/src/chain/channelmonitor.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,9 @@ pub(crate) enum ChannelMonitorUpdateStep {
560560
ShutdownScript {
561561
scriptpubkey: ScriptBuf,
562562
},
563+
LatestPeerStorage {
564+
data: Vec<u8>,
565+
},
563566
}
564567

565568
impl ChannelMonitorUpdateStep {
@@ -571,6 +574,7 @@ impl ChannelMonitorUpdateStep {
571574
ChannelMonitorUpdateStep::CommitmentSecret { .. } => "CommitmentSecret",
572575
ChannelMonitorUpdateStep::ChannelForceClosed { .. } => "ChannelForceClosed",
573576
ChannelMonitorUpdateStep::ShutdownScript { .. } => "ShutdownScript",
577+
ChannelMonitorUpdateStep::LatestPeerStorage { .. } => "LatestPeerStorage",
574578
}
575579
}
576580
}
@@ -604,6 +608,9 @@ impl_writeable_tlv_based_enum_upgradable!(ChannelMonitorUpdateStep,
604608
(5, ShutdownScript) => {
605609
(0, scriptpubkey, required),
606610
},
611+
(6, LatestPeerStorage) => {
612+
(0, data, required_vec),
613+
},
607614
);
608615

609616
/// Details about the balance(s) available for spending once the channel appears on chain.
@@ -851,6 +858,8 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
851858
/// revoked.
852859
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
853860

861+
peer_storage: Vec<u8>,
862+
854863
// Note that `MonitorEvent`s MUST NOT be generated during update processing, only generated
855864
// during chain data processing. This prevents a race in `ChainMonitor::update_channel` (and
856865
// presumably user implementations thereof as well) where we update the in-memory channel
@@ -1144,6 +1153,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
11441153
(15, self.counterparty_fulfilled_htlcs, required),
11451154
(17, self.initial_counterparty_commitment_info, option),
11461155
(19, self.channel_id, required),
1156+
(21, self.peer_storage, required_vec),
11471157
});
11481158

11491159
Ok(())
@@ -1323,7 +1333,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
13231333
confirmed_commitment_tx_counterparty_output: None,
13241334
htlcs_resolved_on_chain: Vec::new(),
13251335
spendable_txids_confirmed: Vec::new(),
1326-
1336+
peer_storage: Vec::new(),
13271337
best_block,
13281338
counterparty_node_id: Some(counterparty_node_id),
13291339
initial_counterparty_commitment_info: None,
@@ -1440,6 +1450,11 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
14401450
self.inner.lock().unwrap().channel_id()
14411451
}
14421452

1453+
/// Gets the peer_storage of this peer.
1454+
pub fn get_peer_storage(&self) -> Vec<u8> {
1455+
self.inner.lock().unwrap().peer_storage()
1456+
}
1457+
14431458
/// Gets a list of txids, with their output scripts (in the order they appear in the
14441459
/// transaction), which we must learn about spends of via block_connected().
14451460
pub fn get_outputs_to_watch(&self) -> Vec<(Txid, Vec<(u32, ScriptBuf)>)> {
@@ -2761,6 +2776,10 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
27612776
}
27622777
}
27632778

2779+
fn update_peer_storage(&mut self, new_data: Vec<u8>) {
2780+
self.peer_storage = new_data;
2781+
}
2782+
27642783
fn generate_claimable_outpoints_and_watch_outputs(&mut self, reason: ClosureReason) -> (Vec<PackageTemplate>, Vec<TransactionOutputs>) {
27652784
let funding_outp = HolderFundingOutput::build(
27662785
self.funding_redeemscript.clone(),
@@ -2936,6 +2955,10 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
29362955
panic!("Attempted to replace shutdown script {} with {}", shutdown_script, scriptpubkey);
29372956
}
29382957
},
2958+
ChannelMonitorUpdateStep::LatestPeerStorage { data } => {
2959+
log_trace!(logger, "Updating ChannelMonitor with latest recieved PeerStorage");
2960+
self.update_peer_storage(data.clone());
2961+
}
29392962
}
29402963
}
29412964

@@ -2967,6 +2990,10 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
29672990
&self.funding_info
29682991
}
29692992

2993+
pub fn peer_storage(&self) -> Vec<u8> {
2994+
self.peer_storage.clone()
2995+
}
2996+
29702997
pub fn channel_id(&self) -> ChannelId {
29712998
self.channel_id
29722999
}
@@ -4632,6 +4659,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
46324659
let mut counterparty_fulfilled_htlcs = Some(new_hash_map());
46334660
let mut initial_counterparty_commitment_info = None;
46344661
let mut channel_id = None;
4662+
let mut peer_storage = Some(Vec::new());
46354663
read_tlv_fields!(reader, {
46364664
(1, funding_spend_confirmed, option),
46374665
(3, htlcs_resolved_on_chain, optional_vec),
@@ -4643,6 +4671,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
46434671
(15, counterparty_fulfilled_htlcs, option),
46444672
(17, initial_counterparty_commitment_info, option),
46454673
(19, channel_id, option),
4674+
(21, peer_storage, optional_vec),
46464675
});
46474676

46484677
// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both
@@ -4717,7 +4746,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
47174746
confirmed_commitment_tx_counterparty_output,
47184747
htlcs_resolved_on_chain: htlcs_resolved_on_chain.unwrap(),
47194748
spendable_txids_confirmed: spendable_txids_confirmed.unwrap(),
4720-
4749+
peer_storage: peer_storage.unwrap(),
47214750
best_block,
47224751
counterparty_node_id,
47234752
initial_counterparty_commitment_info,

lightning/src/chain/transaction.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ impl OutPoint {
6666
vout: self.index as u32,
6767
}
6868
}
69+
70+
pub fn get_txid(self) -> Txid {
71+
self.txid
72+
}
6973
}
7074

7175
impl core::fmt::Display for OutPoint {

lightning/src/ln/channel.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,6 +2101,11 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
21012101
self.update_time_counter
21022102
}
21032103

2104+
pub fn increment_and_fetch_monitor_update_id(&mut self) -> u64 {
2105+
self.latest_monitor_update_id +=1;
2106+
self.latest_monitor_update_id
2107+
}
2108+
21042109
pub fn get_latest_monitor_update_id(&self) -> u64 {
21052110
self.latest_monitor_update_id
21062111
}

lightning/src/ln/channelmanager.rs

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
904904
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
905905
/// [`ChannelMessageHandler::peer_disconnected`].
906906
is_connected: bool,
907+
peer_storage: Vec<u8>,
907908
}
908909

909910
impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -2526,7 +2527,6 @@ where
25262527
entropy_source,
25272528
node_signer,
25282529
signer_provider,
2529-
25302530
logger,
25312531
}
25322532
}
@@ -6685,6 +6685,53 @@ where
66856685
}
66866686
}
66876687

6688+
fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
6689+
let per_peer_state = self.per_peer_state.write().unwrap();
6690+
let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
6691+
Some(peer_state_mutex) => peer_state_mutex,
6692+
None => return,
6693+
};
6694+
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
6695+
let peer_state = &mut *peer_state_lock;
6696+
let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None);
6697+
6698+
// Check if we have any channels with the peer (currently we only provide the service to peers we have a channel with).
6699+
if peer_state.total_channel_count() == 0 {
6700+
log_debug!(logger, "We do not have any channel with {}", log_pubkey!(counterparty_node_id));
6701+
return;
6702+
}
6703+
6704+
let mut funded_channels: Vec<&mut Channel<SP>> = peer_state.channel_by_id.values_mut()
6705+
.filter_map(|phase| {
6706+
if let ChannelPhase::Funded(channel) = phase {
6707+
Some(channel)
6708+
} else {
6709+
None
6710+
}
6711+
})
6712+
.collect();
6713+
6714+
let min_funded_chan = funded_channels
6715+
.iter_mut()
6716+
.min_by_key(|s| s.context.get_funding_txo().unwrap().get_txid())
6717+
.unwrap();
6718+
6719+
// Send ChannelMonitor Update.
6720+
let peer_storage_update = ChannelMonitorUpdate {
6721+
update_id: min_funded_chan.context.increment_and_fetch_monitor_update_id(),
6722+
counterparty_node_id: None,
6723+
updates: vec![ChannelMonitorUpdateStep::LatestPeerStorage {
6724+
data: msg.data.clone(),
6725+
}],
6726+
channel_id: Some(min_funded_chan.context.channel_id()),
6727+
};
6728+
6729+
// Update the store.
6730+
peer_state.peer_storage = msg.data.clone();
6731+
6732+
handle_new_monitor_update!(self, min_funded_chan.context.get_funding_txo().unwrap(), peer_storage_update, peer_state_lock, peer_state, per_peer_state, min_funded_chan);
6733+
}
6734+
66886735
fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
66896736
let best_block = *self.best_block.read().unwrap();
66906737
let per_peer_state = self.per_peer_state.read().unwrap();
@@ -9069,6 +9116,8 @@ where
90699116
}
90709117

90719118
fn handle_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
9119+
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
9120+
self.internal_peer_storage(counterparty_node_id, msg);
90729121
}
90739122

90749123
fn handle_your_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::YourPeerStorageMessage) {
@@ -9384,6 +9433,7 @@ where
93849433
monitor_update_blocked_actions: BTreeMap::new(),
93859434
actions_blocking_raa_monitor_updates: BTreeMap::new(),
93869435
is_connected: true,
9436+
peer_storage: Vec::new(),
93879437
}));
93889438
},
93899439
hash_map::Entry::Occupied(e) => {
@@ -9456,6 +9506,15 @@ where
94569506
},
94579507
}
94589508
}
9509+
9510+
let peer_storage = peer_state.peer_storage.clone();
9511+
9512+
pending_msg_events.push(events::MessageSendEvent::SendYourPeerStorageMessage {
9513+
node_id: counterparty_node_id.clone(),
9514+
msg: msgs::YourPeerStorageMessage {
9515+
data: peer_storage
9516+
},
9517+
});
94599518
}
94609519

94619520
return NotifyOption::SkipPersistHandleEvents;
@@ -10783,6 +10842,7 @@ where
1078310842
let mut channel_closures = VecDeque::new();
1078410843
let mut close_background_events = Vec::new();
1078510844
let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize);
10845+
let mut peer_storage_dir: HashMap<PublicKey, Vec<u8>> = HashMap::new();
1078610846
for _ in 0..channel_count {
1078710847
let mut channel: Channel<SP> = Channel::read(reader, (
1078810848
&args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
@@ -10792,6 +10852,9 @@ where
1079210852
funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id());
1079310853
funding_txo_set.insert(funding_txo.clone());
1079410854
if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
10855+
// Load peer storage from the ChannelMonitor into memory.
10856+
peer_storage_dir.insert(channel.context.get_counterparty_node_id(), monitor.get_peer_storage());
10857+
1079510858
if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() ||
1079610859
channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() ||
1079710860
channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
@@ -10938,7 +11001,7 @@ where
1093811001
claimable_htlcs_list.push((payment_hash, previous_hops));
1093911002
}
1094011003

10941-
let peer_state_from_chans = |channel_by_id| {
11004+
let peer_state_from_chans = |channel_by_id, peer_storage_blob| {
1094211005
PeerState {
1094311006
channel_by_id,
1094411007
inbound_channel_request_by_id: new_hash_map(),
@@ -10948,6 +11011,7 @@ where
1094811011
monitor_update_blocked_actions: BTreeMap::new(),
1094911012
actions_blocking_raa_monitor_updates: BTreeMap::new(),
1095011013
is_connected: false,
11014+
peer_storage: peer_storage_blob,
1095111015
}
1095211016
};
1095311017

@@ -10956,7 +11020,7 @@ where
1095611020
for _ in 0..peer_count {
1095711021
let peer_pubkey = Readable::read(reader)?;
1095811022
let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(new_hash_map());
10959-
let mut peer_state = peer_state_from_chans(peer_chans);
11023+
let mut peer_state = peer_state_from_chans(peer_chans, peer_storage_dir.get(&peer_pubkey).cloned().unwrap_or_default());
1096011024
peer_state.latest_features = Readable::read(reader)?;
1096111025
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
1096211026
}
@@ -11167,7 +11231,7 @@ where
1116711231
// still open, we need to replay any monitor updates that are for closed channels,
1116811232
// creating the neccessary peer_state entries as we go.
1116911233
let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| {
11170-
Mutex::new(peer_state_from_chans(new_hash_map()))
11234+
Mutex::new(peer_state_from_chans(new_hash_map(), peer_storage_dir.get(&counterparty_id).cloned().unwrap_or_default()))
1117111235
});
1117211236
let mut peer_state = peer_state_mutex.lock().unwrap();
1117311237
handle_in_flight_updates!(counterparty_id, chan_in_flight_updates,
@@ -11617,7 +11681,6 @@ where
1161711681
entropy_source: args.entropy_source,
1161811682
node_signer: args.node_signer,
1161911683
signer_provider: args.signer_provider,
11620-
1162111684
logger: args.logger,
1162211685
default_configuration: args.default_config,
1162311686
};

0 commit comments

Comments
 (0)