
Commit 9e3d52a

Aditya Sharma authored and committed
Handle PeerStorage Message and its Persistence
This commit introduces the handling and persistence of PeerStorage messages on a per-peer basis. The peer storage is kept inside PeerState to simplify management, so it does not need to be removed separately when there are no longer any active channels with the peer.

Key changes include:
- Add PeerStorage to PeerState for persistent storage.
- Implement internal_peer_storage to handle PeerStorage messages and their updates.
- Add logic in peer_connected() to resend PeerStorage before the channel reestablish messages upon reconnection.
- Update PeerState's write() and read() methods to support PeerStorage persistence.
1 parent a69cba5 commit 9e3d52a
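For orientation before the diff: the two wire messages it relies on are only ever touched through an opaque `data` field (`msg.data` is cloned into a `Vec<u8>`), so their shape can be sketched roughly as below. This is an illustrative sketch inferred from the use sites, not the actual definitions in `lightning::ln::msgs`, which may carry more detail.

// Illustrative shapes only, inferred from the use sites in the diff below.
#[derive(Clone, Debug, PartialEq)]
pub struct PeerStorageMessage {
	/// Opaque blob a peer asks us to store on its behalf.
	pub data: Vec<u8>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct YourPeerStorageMessage {
	/// The blob we stored for the peer, echoed back on reconnection.
	pub data: Vec<u8>,
}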


lightning/src/ln/channelmanager.rs

Lines changed: 79 additions & 0 deletions
@@ -1168,9 +1168,24 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
+	/// Holds the peer storage data for the channel partner on a per-peer basis.
+	peer_storage: Vec<u8>,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
+	pub fn new(features: &InitFeatures) -> Self {
+		Self {
+			channel_by_id: new_hash_map(),
+			inbound_channel_request_by_id: new_hash_map(),
+			latest_features: features.clone(),
+			pending_msg_events: Vec::new(),
+			in_flight_monitor_updates: BTreeMap::new(),
+			monitor_update_blocked_actions: BTreeMap::new(),
+			actions_blocking_raa_monitor_updates: BTreeMap::new(),
+			is_connected: true,
+			peer_storage: Vec::new(),
+		}
+	}
 	/// Indicates that a peer meets the criteria where we're ok to remove it from our storage.
 	/// If true is passed for `require_disconnected`, the function will return false if we haven't
 	/// disconnected from the node already, ie. `PeerState::is_connected` is set to `true`.
@@ -7001,6 +7016,7 @@ where
 			monitor_update_blocked_actions: BTreeMap::new(),
 			actions_blocking_raa_monitor_updates: BTreeMap::new(),
 			is_connected: false,
+			peer_storage: Vec::new(),
 		}));
 		let mut peer_state = peer_state_mutex.lock().unwrap();
 
@@ -7861,6 +7877,42 @@ where
 		}
 	}
 
+	fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
+			Some(peer_state_mutex) => peer_state_mutex,
+			None => return,
+		};
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None);
+
+		// Check if we have any channels with the peer (currently we only provide the service to peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| matches!(phase, ChannelPhase::Funded(_))) {
+			log_debug!(logger, "We do not have any channel with {}", log_pubkey!(counterparty_node_id));
+			return;
+		}
+
+		#[cfg(not(test))]
+		if msg.data.len() > 1024 {
+			log_debug!(logger, "We do not allow more than 1 KiB of data for each peer in peer storage. Sending warning to peer {}", log_pubkey!(counterparty_node_id));
+			peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+				node_id: counterparty_node_id.clone(),
+				action: msgs::ErrorAction::SendWarningMessage {
+					msg: msgs::WarningMessage {
+						channel_id: ChannelId([0; 32]),
+						data: "Supports only data up to 1 KiB in peer storage.".to_owned()
+					},
+					log_level: Level::Trace,
+				}
+			});
+			return;
+		}
+
+		log_trace!(logger, "Received Peer Storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data.clone();
+	}
+
 	fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
 		let best_block = *self.best_block.read().unwrap();
 		let per_peer_state = self.per_peer_state.read().unwrap();
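The 1 KiB cap above is easy to state as a standalone rule. A minimal sketch follows, with hypothetical names (MAX_PEER_STORAGE_SIZE, validate_peer_storage); in the commit itself the check is inlined in internal_peer_storage and compiled out of test builds via #[cfg(not(test))]:

// Standalone restatement of the size gate; names are hypothetical.
const MAX_PEER_STORAGE_SIZE: usize = 1024; // 1 KiB, per the diff

fn validate_peer_storage(data: &[u8]) -> Result<(), &'static str> {
	if data.len() > MAX_PEER_STORAGE_SIZE {
		// Mirrors the warning text sent back to the peer.
		return Err("Supports only data up to 1 KiB in peer storage.");
	}
	Ok(())
}

fn main() {
	assert!(validate_peer_storage(&[0u8; 1024]).is_ok());
	assert!(validate_peer_storage(&[0u8; 1025]).is_err());
}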
@@ -10801,6 +10853,7 @@ where
 					monitor_update_blocked_actions: BTreeMap::new(),
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					is_connected: true,
+					peer_storage: Vec::new(),
 				}));
 			},
 			hash_map::Entry::Occupied(e) => {
@@ -10830,6 +10883,16 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;
 
+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendYourPeerStorageMessage {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::YourPeerStorageMessage {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
+
 			for (_, phase) in peer_state.channel_by_id.iter_mut() {
 				match phase {
 					ChannelPhase::Funded(chan) => {
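Because pending_msg_events is a plain Vec handed out in push order, queuing the SendYourPeerStorageMessage before the per-channel loop guarantees the stored blob goes back out ahead of any channel_reestablish, which is the ordering the commit message calls out. A toy model of that guarantee, with simplified stand-in event names rather than LDK's MessageSendEvent variants:

// Toy model: events are delivered in the order they were pushed.
#[derive(Debug, PartialEq)]
enum Event {
	YourPeerStorage,
	ChannelReestablish,
}

fn main() {
	let mut pending_msg_events: Vec<Event> = Vec::new();
	// peer_connected(): resend the stored blob first...
	pending_msg_events.push(Event::YourPeerStorage);
	// ...then the per-channel loop queues reestablish messages.
	pending_msg_events.push(Event::ChannelReestablish);
	assert_eq!(pending_msg_events[0], Event::YourPeerStorage);
}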
@@ -11915,6 +11978,12 @@ where
 				if !peer_state.ok_to_remove(false) {
 					peer_pubkey.write(writer)?;
 					peer_state.latest_features.write(writer)?;
+
+					(peer_state.peer_storage.len() as u64).write(writer)?;
+					for p in peer_state.peer_storage.iter() {
+						p.write(writer)?;
+					}
+
 					if !peer_state.monitor_update_blocked_actions.is_empty() {
 						monitor_update_blocked_actions_per_peer
 							.get_or_insert_with(Vec::new)
@@ -12430,6 +12499,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
+				peer_storage: Vec::new(),
 			}
 		};
 
@@ -12440,6 +12510,15 @@ where
 			let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(new_hash_map());
 			let mut peer_state = peer_state_from_chans(peer_chans);
 			peer_state.latest_features = Readable::read(reader)?;
+
+			let peer_storage_count: u64 = Readable::read(reader)?;
+			let mut peer_storage: Vec<u8> = Vec::with_capacity(cmp::min(peer_storage_count as usize, MAX_ALLOC_SIZE / mem::size_of::<u8>()));
+			for i in 0..peer_storage_count {
+				let x = Readable::read(reader)?;
+				peer_storage.insert(i as usize, x);
+			}
+			peer_state.peer_storage = peer_storage;
+
 			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
 		}
 
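The write and read hunks above agree on a simple on-disk layout: a u64 count followed by the stored bytes, one per entry, with the reader capping its up-front allocation at MAX_ALLOC_SIZE rather than trusting the deserialized count. A standalone round-trip sketch under the assumption that LDK's Writeable/Readable encode a u64 big-endian; plain std::io stands in for those traits, and 64 KiB stands in for MAX_ALLOC_SIZE:

use std::io::{Cursor, Read, Write};

const MAX_ALLOC_SIZE: usize = 64 * 1024; // stand-in value, see above

fn write_peer_storage(w: &mut impl Write, storage: &[u8]) -> std::io::Result<()> {
	// Length prefix first, then each stored byte, as in the write() hunk.
	w.write_all(&(storage.len() as u64).to_be_bytes())?;
	w.write_all(storage)
}

fn read_peer_storage(r: &mut impl Read) -> std::io::Result<Vec<u8>> {
	let mut len_bytes = [0u8; 8];
	r.read_exact(&mut len_bytes)?;
	let count = u64::from_be_bytes(len_bytes);
	// Cap the pre-allocation instead of trusting the count, mirroring
	// the cmp::min(...) in the read() hunk, then fill entry by entry.
	let mut buf = Vec::with_capacity((count as usize).min(MAX_ALLOC_SIZE));
	for _ in 0..count {
		let mut byte = [0u8; 1];
		r.read_exact(&mut byte)?;
		buf.push(byte[0]);
	}
	Ok(buf)
}

fn main() -> std::io::Result<()> {
	let mut bytes = Vec::new();
	write_peer_storage(&mut bytes, b"encrypted backup blob")?;
	let round_trip = read_peer_storage(&mut Cursor::new(bytes))?;
	assert_eq!(round_trip, b"encrypted backup blob");
	Ok(())
}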