
Commit 948a2ad

Aditya Sharma (adi2011) authored and committed
Handle PeerStorage Message and its Persistence
This commit introduces the handling and persistence of PeerStorage messages on a per-peer basis. The peer storage is stored within the PeerState to simplify management, ensuring we do not need to remove it when there are no active channels with the peer.

Key changes include:

- Add PeerStorage to PeerState for persistent storage.
- Implement internal_peer_storage to manage PeerStorage and its updates.
- Add resend logic in peer_connected() to resend PeerStorage before sending the channel reestablish message upon reconnection.
- Update PeerState's write() and read() methods to support PeerStorage persistence.
1 parent 3fb91c1 commit 948a2ad
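
As a rough model of the two acceptance checks the new internal_peer_storage handler performs (see the diff below), here is a minimal self-contained sketch. It is not LDK's actual API: PeerStorageCheck, has_funded_channel, and validate_peer_storage are hypothetical names used only to illustrate the funded-channel requirement and the MAX_PEER_STORAGE_SIZE cap in isolation.

    // Hypothetical standalone sketch of the checks internal_peer_storage applies
    // to an incoming peer_storage message; not LDK's real API.
    const MAX_PEER_STORAGE_SIZE: usize = 1024;

    #[derive(Debug, PartialEq)]
    enum PeerStorageCheck {
        NoFundedChannel, // no funded channel with the peer: warn and ignore
        DataTooLarge,    // blob exceeds MAX_PEER_STORAGE_SIZE: warn and ignore
    }

    fn validate_peer_storage(has_funded_channel: bool, data: &[u8]) -> Result<(), PeerStorageCheck> {
        if !has_funded_channel {
            return Err(PeerStorageCheck::NoFundedChannel);
        }
        if data.len() > MAX_PEER_STORAGE_SIZE {
            return Err(PeerStorageCheck::DataTooLarge);
        }
        Ok(()) // accepted: the real handler then stores the data in PeerState::peer_storage
    }

    fn main() {
        assert_eq!(validate_peer_storage(false, &[0u8; 16]), Err(PeerStorageCheck::NoFundedChannel));
        assert_eq!(validate_peer_storage(true, &[0u8; 2048]), Err(PeerStorageCheck::DataTooLarge));
        assert!(validate_peer_storage(true, &[0u8; 512]).is_ok());
    }

In the actual diff, both rejection paths additionally queue a SendWarningMessage to the peer rather than failing silently.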


lightning/src/ln/channelmanager.rs

Lines changed: 102 additions & 2 deletions
@@ -1380,6 +1380,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
+	/// Holds the peer storage data for the channel partner on a per-peer basis.
+	peer_storage: Vec<u8>,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -2848,6 +2850,13 @@ const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
 /// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
 const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;
 
+/// The maximum allowed size for peer storage, in bytes.
+///
+/// This constant defines the upper limit for the size of data
+/// that can be stored for a peer. It is set to 1024 bytes (1 kilobyte)
+/// to prevent excessive resource consumption.
+const MAX_PEER_STORAGE_SIZE: usize = 1024;
+
 /// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
 /// many peers we reject new (inbound) connections.
 const MAX_NO_CHANNEL_PEERS: usize = 250;
@@ -8221,9 +8230,75 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		}
 	}
 
-	fn internal_peer_storage_retrieval(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) {}
+	fn internal_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) {
+		// TODO: Decrypt and check if have any stale or missing ChannelMonitor.
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = match per_peer_state.get(&counterparty_node_id) {
+			Some(peer_state_mutex) => peer_state_mutex,
+			None => return,
+		};
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
 
-	fn internal_peer_storage(&self, _counterparty_node_id: PublicKey, _msg: msgs::PeerStorage) {}
+		log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
+		peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+			node_id: counterparty_node_id.clone(),
+			action: msgs::ErrorAction::SendWarningMessage {
+				msg: msgs::WarningMessage {
+					channel_id: ChannelId([0; 32]),
+					data: "Invalid peer_storage_retrieval message received.".to_owned()
+				},
+				log_level: Level::Trace,
+			}
+		});
+	}
+
+	fn internal_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = match per_peer_state.get(&counterparty_node_id) {
+			Some(peer_state_mutex) => peer_state_mutex,
+			None => return,
+		};
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+		// Check if we have any channels with the peer (Currently we only provide the service to peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
+			log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
+			peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+				node_id: counterparty_node_id.clone(),
+				action: msgs::ErrorAction::SendWarningMessage {
+					msg: msgs::WarningMessage {
+						channel_id: ChannelId([0; 32]),
+						data: "Ignoring peer_storage message, as peer storage is currently supported only for peers with an active funded channel.".to_owned()
+					},
+					log_level: Level::Trace,
+				}
+			});
+			return;
+		}
+
+		#[cfg(not(test))]
+		if msg.data.len() > MAX_PEER_STORAGE_SIZE {
+			log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as its over 1KiB", log_pubkey!(counterparty_node_id));
+			peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+				node_id: counterparty_node_id.clone(),
+				action: msgs::ErrorAction::SendWarningMessage {
+					msg: msgs::WarningMessage {
+						channel_id: ChannelId([0; 32]),
+						data: format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE)
+					},
+					log_level: Level::Trace,
+				}
+			});
+			return;
+		}
+
+		log_trace!(logger, "Received peer_storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data;
+	}
 
 	fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
 		let best_block = *self.best_block.read().unwrap();
@@ -11728,6 +11803,7 @@ where
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					closed_channel_monitor_update_ids: BTreeMap::new(),
 					is_connected: true,
+					peer_storage: Vec::new(),
 				}));
 			},
 			hash_map::Entry::Occupied(e) => {
@@ -11757,6 +11833,15 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;
 
+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::PeerStorageRetrieval {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
 			for (_, chan) in peer_state.channel_by_id.iter_mut() {
 				let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
 				match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
@@ -12928,6 +13013,8 @@ where
 			peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
 		}
 
+		let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
+
 		(serializable_peer_count).write(writer)?;
 		for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
 			// Peers which we have no channels to should be dropped once disconnected. As we
@@ -12937,6 +13024,8 @@ where
 			if !peer_state.ok_to_remove(false) {
 				peer_pubkey.write(writer)?;
 				peer_state.latest_features.write(writer)?;
+				peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
+
 				if !peer_state.monitor_update_blocked_actions.is_empty() {
 					monitor_update_blocked_actions_per_peer
 						.get_or_insert_with(Vec::new)
@@ -13058,6 +13147,7 @@ where
 			(14, decode_update_add_htlcs_opt, option),
 			(15, self.inbound_payment_id_secret, required),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});
 
 		Ok(())
@@ -13290,6 +13380,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				closed_channel_monitor_update_ids: BTreeMap::new(),
+				peer_storage: Vec::new(),
 				is_connected: false,
 			}
 		};
@@ -13585,6 +13676,7 @@ where
 		let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
 		let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
 		let mut inbound_payment_id_secret = None;
+		let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
 			(2, pending_intercepted_htlcs, option),
@@ -13601,8 +13693,10 @@ where
 			(14, decode_update_add_htlcs, option),
 			(15, inbound_payment_id_secret, option),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});
 		let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+		let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
 		if fake_scid_rand_bytes.is_none() {
 			fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
 		}
@@ -13634,6 +13728,12 @@ where
 		}
 		let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());
 
+		for (peer_pubkey, peer_storage) in peer_storage_dir {
+			if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
+				peer_state.get_mut().unwrap().peer_storage = peer_storage;
+			}
+		}
+
 		// Handle transitioning from the legacy TLV to the new one on upgrades.
 		if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
 			// We should never serialize an empty map.
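
A note on the persistence flow added above: during write(), each surviving peer's blob is collected into peer_storage_dir and serialized under TLV type 19; during read(), the list is folded back into the matching PeerState entries, and entries for peers that were not deserialized are skipped. The following standalone sketch models that round trip using only standard-library types; PeerId and both helper functions are hypothetical stand-ins for illustration, not LDK code.

    // Hypothetical model of the write()/read() round trip for peer storage.
    // PeerId stands in for the real secp256k1 PublicKey type.
    use std::collections::BTreeMap;

    type PeerId = [u8; 33];

    // write() side: flatten per-peer blobs into the list serialized as TLV 19.
    fn collect_peer_storage_dir(per_peer: &BTreeMap<PeerId, Vec<u8>>) -> Vec<(PeerId, Vec<u8>)> {
        per_peer.iter().map(|(id, blob)| (*id, blob.clone())).collect()
    }

    // read() side: restore each blob, but only for peers that were deserialized,
    // mirroring the `if let Some(peer_state)` guard in the diff above.
    fn restore_peer_storage(per_peer: &mut BTreeMap<PeerId, Vec<u8>>, dir: Vec<(PeerId, Vec<u8>)>) {
        for (id, blob) in dir {
            if let Some(slot) = per_peer.get_mut(&id) {
                *slot = blob;
            }
        }
    }

    fn main() {
        let mut before: BTreeMap<PeerId, Vec<u8>> = BTreeMap::new();
        before.insert([2u8; 33], b"peer backup blob".to_vec());
        let dir = collect_peer_storage_dir(&before);

        let mut after: BTreeMap<PeerId, Vec<u8>> = BTreeMap::new();
        after.insert([2u8; 33], Vec::new()); // peer known after reload, blob not yet set
        restore_peer_storage(&mut after, dir);
        assert_eq!(after.get(&[2u8; 33]).unwrap(), &b"peer backup blob".to_vec());
    }

Using an even TLV type with optional_vec keeps the field backwards compatible: older serializations simply decode to an empty list, leaving every peer's peer_storage as Vec::new().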
