PeerStorage: Add feature and store peer storage in ChannelManager #3575
```diff
@@ -1404,6 +1404,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
+	/// Holds the peer storage data for the channel partner on a per-peer basis.
+	peer_storage: Vec<u8>,
 }

 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
```

Review thread on the new `peer_storage` field:

Can we encapsulate this in a dedicated (maybe even optional?) type? FWIW, such a type and its associated logic could live in a new module.

@TheBlueMatt, what's your opinion on this? Would love to hear your thoughts!

Right, I commented on Discord that we should store them in `ChannelManager`.

Well, IIRC we know that we want to considerably refactor and move things out of `ChannelManager` anyway. I imagine if we introduce a separate message handler and dedicated types, essentially almost all of the peer-storage-related logic (and tests) could live in a dedicated module.

In principle, sure, but looking at this PR we'd move about 10 LoC into the new module? Basically we'd take the handful of lines added here and relocate them.

Well, sure, but from a look at #2943 it seems there are quite a few more changes ahead, also to other huge files. But I don't want to hold this PR over this for too long; I guess at some point we'll have to go through the refactoring pain either way.

Hmm? That PR only adds about another 50 LoC in `channelmanager.rs`.

I think having a separate module for peer storage would be useful if, in the future, we decide to sell this as a service to our peers and increase our storage limit to 64 KiB. For now, I believe this approach is simple and good enough.

Alright, while I still think it would be nice to have at least a separate message handler, I don't want to delay this PR too much given there have been prior discussions. So it's probably fine to move forward now and possibly refactor later.
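As a rough illustration of the encapsulation discussed in this thread (the type name, module placement, and methods below are hypothetical, not something this PR adds):

```rust
// Hypothetical sketch of a dedicated wrapper type, e.g. in a new
// `peer_storage` module. All names here are illustrative only.
pub(crate) struct PeerStorage {
	data: Vec<u8>,
}

impl PeerStorage {
	pub(crate) fn new() -> Self {
		PeerStorage { data: Vec::new() }
	}

	/// Replaces the stored blob, rejecting anything over `limit` bytes.
	pub(crate) fn store(&mut self, data: Vec<u8>, limit: usize) -> Result<(), ()> {
		if data.len() > limit {
			return Err(());
		}
		self.data = data;
		Ok(())
	}

	pub(crate) fn is_empty(&self) -> bool {
		self.data.is_empty()
	}

	pub(crate) fn data(&self) -> &[u8] {
		&self.data
	}
}
```

Under such a split, the storage handler would call `store` with `MAX_PEER_STORAGE_SIZE` and the reconnect path would read via `data()`, keeping the size-limit policy out of `channelmanager.rs`.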
```diff
@@ -2872,6 +2874,13 @@ const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
 /// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
 const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;

+/// The maximum allowed size for peer storage, in bytes.
+///
+/// This constant defines the upper limit for the size of data
+/// that can be stored for a peer. It is set to 1024 bytes (1 KiB)
+/// to prevent excessive resource consumption.
+const MAX_PEER_STORAGE_SIZE: usize = 1024;
+
 /// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
 /// many peers we reject new (inbound) connections.
 const MAX_NO_CHANNEL_PEERS: usize = 250;
```
```diff
@@ -8245,6 +8254,53 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		}
 	}

+	fn internal_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
+		// TODO: Decrypt and check if we have any stale or missing ChannelMonitor.
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+		log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
+
+		Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+			"Invalid peer_storage_retrieval message received.".into(),
+		), ChannelId([0; 32])))
+	}
+
+	fn internal_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
+			.ok_or_else(|| {
+				debug_assert!(false);
+				MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), ChannelId([0; 32]))
+			})?;
+
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);
+
+		// Check if we have any channels with the peer (currently we only provide the service to
+		// peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
+			log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
+
+			return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+				"Ignoring peer_storage message, as peer storage is currently supported only for \
+				peers with an active funded channel.".into(),
+			), ChannelId([0; 32])));
+		}
+
+		#[cfg(not(test))]
+		if msg.data.len() > MAX_PEER_STORAGE_SIZE {
+			log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as it's over 1 KiB", log_pubkey!(counterparty_node_id));
+
+			return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
+				format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE)
+			), ChannelId([0; 32])));
+		}
+
+		log_trace!(logger, "Received peer_storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data;
+
+		Ok(())
+	}
+
 	fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
 		let best_block = *self.best_block.read().unwrap();
 		let per_peer_state = self.per_peer_state.read().unwrap();
```

Review thread on the funded-channel check:

Do we want to return a warning in this case to let the peer know we're not gonna store the data?

By protocol we don't need to, but yes, it's a good suggestion.
```diff
@@ -11443,6 +11499,16 @@ where
 		let _ = handle_error!(self, self.internal_funding_signed(&counterparty_node_id, msg), counterparty_node_id);
 	}

+	fn handle_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) {
+		let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
+		let _ = handle_error!(self, self.internal_peer_storage(counterparty_node_id, msg), counterparty_node_id);
+	}
+
+	fn handle_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorageRetrieval) {
+		let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
+		let _ = handle_error!(self, self.internal_peer_storage_retrieval(counterparty_node_id, msg), counterparty_node_id);
+	}
+
 	fn handle_channel_ready(&self, counterparty_node_id: PublicKey, msg: &msgs::ChannelReady) {
 		// Note that we never need to persist the updated ChannelManager for an inbound
 		// channel_ready message - while the channel's state will change, any channel_ready message
```
```diff
@@ -11684,6 +11750,10 @@ where
 				&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
 				&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
 				&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
+
+				// Peer Storage
+				&events::MessageSendEvent::SendPeerStorage { .. } => false,
+				&events::MessageSendEvent::SendPeerStorageRetrieval { .. } => false,
 			}
 		});
 		debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
```
```diff
@@ -11736,6 +11806,7 @@ where
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					closed_channel_monitor_update_ids: BTreeMap::new(),
 					is_connected: true,
+					peer_storage: Vec::new(),
 				}));
 			},
 			hash_map::Entry::Occupied(e) => {
```
```diff
@@ -11765,6 +11836,15 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;

+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::PeerStorageRetrieval {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
 			for (_, chan) in peer_state.channel_by_id.iter_mut() {
 				let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
 				match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
```
```diff
@@ -12451,6 +12531,7 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
 	features.set_scid_privacy_optional();
 	features.set_zero_conf_optional();
 	features.set_route_blinding_optional();
+	features.set_provide_storage_optional();
 	if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
 		features.set_anchors_zero_fee_htlc_tx_optional();
 	}
```
```diff
@@ -12972,6 +13053,8 @@ where
 			peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
 		}

+		let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
+
 		(serializable_peer_count).write(writer)?;
 		for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
 			// Peers which we have no channels to should be dropped once disconnected. As we
```
```diff
@@ -12981,6 +13064,8 @@ where
 			if !peer_state.ok_to_remove(false) {
 				peer_pubkey.write(writer)?;
 				peer_state.latest_features.write(writer)?;
+				peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
+
 				if !peer_state.monitor_update_blocked_actions.is_empty() {
 					monitor_update_blocked_actions_per_peer
 						.get_or_insert_with(Vec::new)
```
```diff
@@ -13102,6 +13187,7 @@ where
 			(14, decode_update_add_htlcs_opt, option),
 			(15, self.inbound_payment_id_secret, required),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});

 		Ok(())
```
```diff
@@ -13334,6 +13420,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				closed_channel_monitor_update_ids: BTreeMap::new(),
+				peer_storage: Vec::new(),
 				is_connected: false,
 			}
 		};
```
```diff
@@ -13629,6 +13716,7 @@ where
 	let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
 	let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
 	let mut inbound_payment_id_secret = None;
+	let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
 	read_tlv_fields!(reader, {
 		(1, pending_outbound_payments_no_retry, option),
 		(2, pending_intercepted_htlcs, option),
```
```diff
@@ -13645,8 +13733,10 @@ where
 		(14, decode_update_add_htlcs, option),
 		(15, inbound_payment_id_secret, option),
 		(17, in_flight_monitor_updates, required),
+		(19, peer_storage_dir, optional_vec),
 	});
 	let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+	let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
 	if fake_scid_rand_bytes.is_none() {
 		fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
 	}
```
```diff
@@ -13678,6 +13768,12 @@ where
 	}
 	let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());

+	for (peer_pubkey, peer_storage) in peer_storage_dir {
+		if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
+			peer_state.get_mut().unwrap().peer_storage = peer_storage;
+		}
+	}
+
 	// Handle transitioning from the legacy TLV to the new one on upgrades.
 	if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
 		// We should never serialize an empty map.
```
```diff
@@ -14752,6 +14848,70 @@ mod tests {
 		}
 	}

+	#[test]
+	fn test_peer_storage() {
+		let chanmon_cfgs = create_chanmon_cfgs(2);
+		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+		create_announced_chan_between_nodes(&nodes, 0, 1);
+
+		// Since we do not send peer storage, we manually simulate receiving a dummy
+		// `PeerStorage` from the channel partner.
+		nodes[0].node.handle_peer_storage(nodes[1].node.get_our_node_id(), msgs::PeerStorage { data: vec![0; 100] });
+
+		nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
+		nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());
+
+		nodes[0].node.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init {
+			features: nodes[1].node.init_features(), networks: None, remote_network_address: None
+		}, true).unwrap();
+		nodes[1].node.peer_connected(nodes[0].node.get_our_node_id(), &msgs::Init {
+			features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+		}, false).unwrap();
+
+		let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+		assert_eq!(node_0_events.len(), 2);
+
+		for msg in node_0_events {
+			if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
+				nodes[1].node.handle_channel_reestablish(nodes[0].node.get_our_node_id(), msg);
+				assert_eq!(*node_id, nodes[1].node.get_our_node_id());
+			} else if let MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } = msg {
+				nodes[1].node.handle_peer_storage_retrieval(nodes[0].node.get_our_node_id(), msg.clone());
+				assert_eq!(*node_id, nodes[1].node.get_our_node_id());
+			} else {
+				panic!("Unexpected event")
+			}
+		}
+
+		let msg_events_after_peer_storage_retrieval = nodes[1].node.get_and_clear_pending_msg_events();
+
+		// Check if we receive a warning message.
+		let peer_storage_warning: Vec<&MessageSendEvent> = msg_events_after_peer_storage_retrieval
+			.iter()
+			.filter(|event| match event {
+				MessageSendEvent::HandleError { .. } => true,
+				_ => false,
+			})
+			.collect();
+
+		assert_eq!(peer_storage_warning.len(), 1);
+
+		match peer_storage_warning[0] {
+			MessageSendEvent::HandleError { node_id, action } => {
+				assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+				match action {
+					ErrorAction::SendWarningMessage { msg, .. } =>
+						assert_eq!(msg.data, "Invalid peer_storage_retrieval message received.".to_owned()),
+					_ => panic!("Unexpected error action"),
+				}
+			}
+			_ => panic!("Unexpected event"),
+		}
+	}
+
 	#[test]
 	fn test_keysend_dup_payment_hash() {
 		// (1): Test that a keysend payment with a duplicate payment hash to an existing pending
```
Review comment on the `ChannelMessageHandler` changes:

Given that peer storage is not necessarily connected to channel operation (as, IIUC, the spec allows storing/retrieving for non-channel peers), I wonder if this should introduce a new message handler type rather than just adding onto `ChannelMessageHandler`? Splitting it out now might also make it easier to move the storage into a dedicated object rather than handling everything via `ChannelManager`.
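A minimal sketch of what such a split might look like, assuming the message types this PR adds; the trait name and exact signatures are hypothetical, not an API the PR defines:

```rust
use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::{PeerStorage, PeerStorageRetrieval};

// Hypothetical sketch only: a standalone handler trait would let peer storage
// be offered to peers we have no channel with, without routing everything
// through `ChannelMessageHandler`.
pub trait PeerStorageMessageHandler {
	/// Handle an incoming `peer_storage` message by persisting the peer's blob.
	fn handle_peer_storage(&self, their_node_id: PublicKey, msg: PeerStorage);

	/// Handle an incoming `peer_storage_retrieval` message carrying our own
	/// previously distributed backup blob.
	fn handle_peer_storage_retrieval(&self, their_node_id: PublicKey, msg: PeerStorageRetrieval);
}
```

A `ChannelManager` (or a future dedicated storage object) could then implement this trait alongside `ChannelMessageHandler`, mirroring how the two handler methods are wired up in the diff above.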