
PeerStorage: Add feature and store peer storage in ChannelManager #3575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
5 changes: 5 additions & 0 deletions lightning-net-tokio/src/lib.rs
Contributor:
Given that peer storage is not necessarily connected to channel operation (as, IIUC, the spec allows storing/retrieving for non-channel peers), I wonder if this should introduce a new message handler type rather than just adding onto ChannelMessageHandler? Splitting it out now might also make it easier to move the storage into a dedicated object rather than handling everything via ChannelManager?
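
(For illustration only, not part of this PR: a rough sketch of what such a split-out handler could look like. The trait name and wiring are assumptions; as merged, the PR keeps these methods on `ChannelMessageHandler`.)

```rust
// Hypothetical sketch of a dedicated peer-storage handler; names are illustrative only.
use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::{PeerStorage, PeerStorageRetrieval};

/// A message handler for the peer-storage protocol, independent of channel
/// operation, so it could in principle also serve peers we have no channel with.
pub trait PeerStorageMessageHandler {
	/// Called when a peer asks us to persist a blob on their behalf.
	fn handle_peer_storage(&self, their_node_id: PublicKey, msg: PeerStorage);
	/// Called when a peer returns the blob we previously asked them to store.
	fn handle_peer_storage_retrieval(&self, their_node_id: PublicKey, msg: PeerStorageRetrieval);
}
```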

@@ -761,6 +761,11 @@ mod tests {
fn handle_tx_init_rbf(&self, _their_node_id: PublicKey, _msg: &TxInitRbf) {}
fn handle_tx_ack_rbf(&self, _their_node_id: PublicKey, _msg: &TxAckRbf) {}
fn handle_tx_abort(&self, _their_node_id: PublicKey, _msg: &TxAbort) {}
fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: PeerStorage) {}
fn handle_peer_storage_retrieval(
&self, _their_node_id: PublicKey, _msg: PeerStorageRetrieval,
) {
}
fn peer_disconnected(&self, their_node_id: PublicKey) {
if their_node_id == self.expected_pubkey {
self.disconnected_flag.store(true, Ordering::SeqCst);
24 changes: 22 additions & 2 deletions lightning-types/src/features.rs
@@ -72,6 +72,8 @@
//! (see the [`Trampoline` feature proposal](https://github.com/lightning/bolts/pull/836) for more information).
//! - `DnsResolver` - supports resolving DNS names to TXT DNSSEC proofs for BIP 353 payments
//! (see [bLIP 32](https://github.com/lightning/blips/blob/master/blip-0032.md) for more information).
//! - `ProvideStorage` - Indicates that we offer the capability to store data of our peers
//! (see https://github.com/lightning/bolts/pull/1110 for more info).
//!
//! LDK knows about the following features, but does not support them:
//! - `AnchorsNonzeroFeeHtlcTx` - the initial version of anchor outputs, which was later found to be
@@ -152,7 +154,7 @@ mod sealed {
// Byte 4
OnionMessages,
// Byte 5
ChannelType | SCIDPrivacy,
ProvideStorage | ChannelType | SCIDPrivacy,
// Byte 6
ZeroConf,
// Byte 7
@@ -173,7 +175,7 @@ mod sealed {
// Byte 4
OnionMessages,
// Byte 5
ChannelType | SCIDPrivacy,
ProvideStorage | ChannelType | SCIDPrivacy,
// Byte 6
ZeroConf | Keysend,
// Byte 7
@@ -544,6 +546,16 @@ mod sealed {
supports_onion_messages,
requires_onion_messages
);
define_feature!(
43,
ProvideStorage,
[InitContext, NodeContext],
"Feature flags for `option_provide_storage`.",
set_provide_storage_optional,
set_provide_storage_required,
supports_provide_storage,
requires_provide_storage
);
define_feature!(
45,
ChannelType,
@@ -1126,6 +1138,14 @@ mod tests {
assert!(!features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));

features1.set_provide_storage_required();
assert!(features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));

features2.set_provide_storage_optional();
assert!(!features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));

features1.set_data_loss_protect_required();
assert!(features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));
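(A hedged aside, not part of the diff: `define_feature!` above generates the usual accessors for `ProvideStorage`, so callers can gate peer-storage behaviour roughly as sketched below. The helper name is made up for illustration.)

```rust
// Illustrative helper, assuming the accessors generated by the define_feature! call above.
use lightning_types::features::InitFeatures;

/// Returns true if the connected peer advertised `option_provide_storage`
/// (feature bits 42/43) in its `init` message.
fn peer_offers_storage(their_features: &InitFeatures) -> bool {
	their_features.supports_provide_storage()
}
```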
17 changes: 17 additions & 0 deletions lightning/src/events/mod.rs
@@ -2636,6 +2636,23 @@ pub enum MessageSendEvent {
/// The gossip_timestamp_filter which should be sent.
msg: msgs::GossipTimestampFilter,
},
/// Sends a channel partner Peer Storage of our backup which they should store.
/// This should be sent on each new connection to the channel partner or whenever we want
/// them to update the backup that they store.
SendPeerStorage {
/// The node_id of this message recipient
node_id: PublicKey,
/// The peer_storage which should be sent.
msg: msgs::PeerStorage,
},
/// Sends a channel partner their own peer storage which we store and update when they send
/// a [`msgs::PeerStorage`].
SendPeerStorageRetrieval {
/// The node_id of this message recipient
node_id: PublicKey,
/// The peer_storage_retrieval which should be sent.
msg: msgs::PeerStorageRetrieval,
}
}

/// A trait indicating an object may generate message send events
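(Illustrative only: a sketch of how a consumer of `MessageSendEvent`s, such as a peer-message forwarding loop, might dispatch the two new variants. The callback parameters are stand-ins, not an LDK API.)

```rust
use bitcoin::secp256k1::PublicKey;
use lightning::events::MessageSendEvent;
use lightning::ln::msgs;

/// Forward the two new peer-storage events to some wire-level sender.
/// `send_storage` and `send_retrieval` are hypothetical callbacks.
fn dispatch_peer_storage_events(
	events: Vec<MessageSendEvent>,
	send_storage: &mut dyn FnMut(&PublicKey, &msgs::PeerStorage),
	send_retrieval: &mut dyn FnMut(&PublicKey, &msgs::PeerStorageRetrieval),
) {
	for event in events {
		match event {
			// Our backup blob which the counterparty should persist for us.
			MessageSendEvent::SendPeerStorage { node_id, msg } => send_storage(&node_id, &msg),
			// The counterparty's own blob, returned to them on reconnection.
			MessageSendEvent::SendPeerStorageRetrieval { node_id, msg } => send_retrieval(&node_id, &msg),
			// All other variants are handled by the existing machinery.
			_ => {},
		}
	}
}
```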
160 changes: 160 additions & 0 deletions lightning/src/ln/channelmanager.rs
@@ -1404,6 +1404,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
/// [`ChannelMessageHandler::peer_disconnected`].
pub is_connected: bool,
/// Holds the peer storage data for the channel partner on a per-peer basis.
peer_storage: Vec<u8>,
Contributor:
Can we encapsulate this in a (maybe even optional?) PeerStorageManager or similar type that we can transition to in the future, even if for now we're storing it in ChannelManager?

FWIW, such a type and its associated logic could live in a new src/ln/peer_store.rs module, instead of adding to the (already humongous) channelmanager.rs (which we're planning to split up going forward anyways).
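
(Purely illustrative of the suggestion above, not code from the PR: a minimal wrapper of the kind a `peer_store.rs` module might hold. The type name and API are assumptions.)

```rust
/// Hypothetical per-peer storage wrapper; holds the latest blob a counterparty
/// asked us to persist for them. Not part of this PR.
pub struct PeerStorageManager {
	data: Vec<u8>,
}

impl PeerStorageManager {
	/// Mirrors the 1024-byte cap this PR enforces in ChannelManager.
	const MAX_SIZE: usize = 1024;

	pub fn new() -> Self {
		Self { data: Vec::new() }
	}

	/// Replace the stored blob, rejecting anything over the size cap.
	pub fn store(&mut self, blob: Vec<u8>) -> Result<(), ()> {
		if blob.len() > Self::MAX_SIZE {
			return Err(());
		}
		self.data = blob;
		Ok(())
	}

	/// The blob to hand back in a `peer_storage_retrieval` on reconnection.
	pub fn retrieve(&self) -> &[u8] {
		&self.data
	}
}
```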

Contributor Author:
@TheBlueMatt, what's your opinion on this? Would love to hear your thoughts!

Collaborator:
Right, I commented on discord that we should store them in ChannelManager because we should include them in the PeerState, whether that means a Vec or another struct doesn't matter much, but I imagine it wouldn't actually encapsulate very much (this PR is only +300, including the test), so I'm not quite sure I understand the desire for it. To be clear, @tnull, the stuff for actually storing our own stuff (rather than storing stuff for our peers) is relatively encapsulated.

Contributor:
> Right, I commented on discord that we should store them in ChannelManager because we should include them in the PeerState, whether that means a Vec or another struct doesn't matter much, but I imagine it wouldn't actually encapsulate very much (this PR is only +300, including the test), so I'm not quite sure I understand the desire for it.

Well, IIRC we know that we want to considerably refactor and move things out of channelmanager.rs at some point in the future, and we ofc will have to eventually run the formatter on it. So from my perspective, anything that could be added in separate modules should be, as otherwise we'll only have to move the code anyways going forward. Essentially, by starting the modularization now we not only make it easier to keep context on certain parts of the code (e.g., peer storage), but also make our future selves' jobs easier.

I imagine if we introduce a separate message handler and dedicated types, essentially almost all of the peer storage related logic (and tests) could live in a peer_storage.rs, even if the data itself is kept as part of PeerState?

Collaborator:
In principle, sure, but looking at this PR we'd move about 10 LoC into the new module? Basically we'd take the peer_state lock in channelmanager.rs, look up the peer entry, then pass the message off to the module which would only check if the message is too long and then store it? I don't see how that makes for cleaner code.

Contributor:
Well, sure, but from a look at #2943 it seems there are quite a few more changes ahead, also to other huge files. But I don't want to hold this PR up over this for too long; I guess at some point we'll have to go through refactoring pain either way.

Collaborator:
Hmm? That PR only adds about another 50 LoC in channelmanager.rs beyond what this one adds. And it's all working with channels in the ChannelManager, so I'm not sure it's easily extractable.

Contributor Author:
I think having a separate module for peer storage would be useful if, in the future, we decide to sell this as a service to our peers and increase our storage limit to 64 KiB. For now, I believe this approach is simple and good enough.

Contributor (@tnull, Feb 10, 2025):
Alright, while I still think it would be nice to have at least a separate message handler, I don't want to delay this PR too much given there have been prior discussions. So probably fine to move forward now, and possibly refactor later.

}

impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -2872,6 +2874,13 @@ const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
/// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;

/// The maximum allowed size for peer storage, in bytes.
///
/// This constant defines the upper limit for the size of data
/// that can be stored for a peer. It is set to 1024 bytes (1 kilobyte)
/// to prevent excessive resource consumption.
const MAX_PEER_STORAGE_SIZE: usize = 1024;

/// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
/// many peers we reject new (inbound) connections.
const MAX_NO_CHANNEL_PEERS: usize = 250;
@@ -8245,6 +8254,53 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
}
}

fn internal_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, _msg: msgs::PeerStorageRetrieval) -> Result<(), MsgHandleErrInternal> {
// TODO: Decrypt and check if have any stale or missing ChannelMonitor.
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);

log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));

Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
"Invalid peer_storage_retrieval message received.".into(),
), ChannelId([0; 32])))
}

fn internal_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) -> Result<(), MsgHandleErrInternal> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
.ok_or_else(|| {
debug_assert!(false);
MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), ChannelId([0; 32]))
})?;

let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None);

// Check if we have any channels with the peer (Currently we only provide the service to peers we have a channel with).
if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
Contributor:
Do we want to return a warning in this case to let the peer know we're not gonna store the data?

Contributor Author:
The protocol doesn't require it, but yes, it's a good suggestion.

return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
"Ignoring peer_storage message, as peer storage is currently supported only for \
peers with an active funded channel.".into(),
), ChannelId([0; 32])));
}

#[cfg(not(test))]
if msg.data.len() > MAX_PEER_STORAGE_SIZE {
log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as its over 1KiB", log_pubkey!(counterparty_node_id));

return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(
format!("Supports only data up to {} bytes in peer storage.", MAX_PEER_STORAGE_SIZE)
), ChannelId([0; 32])));
}

log_trace!(logger, "Received peer_storage from {}", log_pubkey!(counterparty_node_id));
peer_state.peer_storage = msg.data;

Ok(())
}

fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
let best_block = *self.best_block.read().unwrap();
let per_peer_state = self.per_peer_state.read().unwrap();
@@ -11443,6 +11499,16 @@ where
let _ = handle_error!(self, self.internal_funding_signed(&counterparty_node_id, msg), counterparty_node_id);
}

fn handle_peer_storage(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorage) {
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
let _ = handle_error!(self, self.internal_peer_storage(counterparty_node_id, msg), counterparty_node_id);
}

fn handle_peer_storage_retrieval(&self, counterparty_node_id: PublicKey, msg: msgs::PeerStorageRetrieval) {
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
let _ = handle_error!(self, self.internal_peer_storage_retrieval(counterparty_node_id, msg), counterparty_node_id);
}

fn handle_channel_ready(&self, counterparty_node_id: PublicKey, msg: &msgs::ChannelReady) {
// Note that we never need to persist the updated ChannelManager for an inbound
// channel_ready message - while the channel's state will change, any channel_ready message
@@ -11684,6 +11750,10 @@ where
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,

// Peer Storage
&events::MessageSendEvent::SendPeerStorage { .. } => false,
&events::MessageSendEvent::SendPeerStorageRetrieval { .. } => false,
}
});
debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
@@ -11736,6 +11806,7 @@ where
actions_blocking_raa_monitor_updates: BTreeMap::new(),
closed_channel_monitor_update_ids: BTreeMap::new(),
is_connected: true,
peer_storage: Vec::new(),
}));
},
hash_map::Entry::Occupied(e) => {
@@ -11765,6 +11836,15 @@ where
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;

if !peer_state.peer_storage.is_empty() {
pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrieval {
node_id: counterparty_node_id.clone(),
msg: msgs::PeerStorageRetrieval {
data: peer_state.peer_storage.clone()
},
});
}

for (_, chan) in peer_state.channel_by_id.iter_mut() {
let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
@@ -12451,6 +12531,7 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
features.set_scid_privacy_optional();
features.set_zero_conf_optional();
features.set_route_blinding_optional();
features.set_provide_storage_optional();
if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
features.set_anchors_zero_fee_htlc_tx_optional();
}
@@ -12972,6 +13053,8 @@ where
peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
}

let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();

(serializable_peer_count).write(writer)?;
for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
// Peers which we have no channels to should be dropped once disconnected. As we
@@ -12981,6 +13064,8 @@ where
if !peer_state.ok_to_remove(false) {
peer_pubkey.write(writer)?;
peer_state.latest_features.write(writer)?;
peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));

if !peer_state.monitor_update_blocked_actions.is_empty() {
monitor_update_blocked_actions_per_peer
.get_or_insert_with(Vec::new)
@@ -13102,6 +13187,7 @@ where
(14, decode_update_add_htlcs_opt, option),
(15, self.inbound_payment_id_secret, required),
(17, in_flight_monitor_updates, required),
(19, peer_storage_dir, optional_vec),
});

Ok(())
@@ -13334,6 +13420,7 @@ where
monitor_update_blocked_actions: BTreeMap::new(),
actions_blocking_raa_monitor_updates: BTreeMap::new(),
closed_channel_monitor_update_ids: BTreeMap::new(),
peer_storage: Vec::new(),
is_connected: false,
}
};
@@ -13629,6 +13716,7 @@ where
let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
let mut inbound_payment_id_secret = None;
let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
(2, pending_intercepted_htlcs, option),
@@ -13645,8 +13733,10 @@ where
(14, decode_update_add_htlcs, option),
(15, inbound_payment_id_secret, option),
(17, in_flight_monitor_updates, required),
(19, peer_storage_dir, optional_vec),
});
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
if fake_scid_rand_bytes.is_none() {
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
}
@@ -13678,6 +13768,12 @@ where
}
let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());

for (peer_pubkey, peer_storage) in peer_storage_dir {
if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
peer_state.get_mut().unwrap().peer_storage = peer_storage;
}
}

// Handle transitioning from the legacy TLV to the new one on upgrades.
if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
// We should never serialize an empty map.
@@ -14752,6 +14848,70 @@ mod tests {
}
}

#[test]
fn test_peer_storage() {
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

create_announced_chan_between_nodes(&nodes, 0, 1);

// Since we do not send peer storage, we manually simulate receiving a dummy
// `PeerStorage` from the channel partner.
nodes[0].node.handle_peer_storage(nodes[1].node.get_our_node_id(), msgs::PeerStorage{data: vec![0; 100]});

nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());

nodes[0].node.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init {
features: nodes[1].node.init_features(), networks: None, remote_network_address: None
}, true).unwrap();
nodes[1].node.peer_connected(nodes[0].node.get_our_node_id(), &msgs::Init {
features: nodes[0].node.init_features(), networks: None, remote_network_address: None
}, false).unwrap();

let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(node_0_events.len(), 2);

for msg in node_0_events{
if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
nodes[1].node.handle_channel_reestablish(nodes[0].node.get_our_node_id(), msg);
assert_eq!(*node_id, nodes[1].node.get_our_node_id());
} else if let MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } = msg {
nodes[1].node.handle_peer_storage_retrieval(nodes[0].node.get_our_node_id(), msg.clone());
assert_eq!(*node_id, nodes[1].node.get_our_node_id());
} else {
panic!("Unexpected event")
}
}

let msg_events_after_peer_storage_retrieval = nodes[1].node.get_and_clear_pending_msg_events();

// Check if we receive a warning message.
let peer_storage_warning: Vec<&MessageSendEvent> = msg_events_after_peer_storage_retrieval
.iter()
.filter(|event| match event {
MessageSendEvent::HandleError { .. } => true,
_ => false,
})
.collect();

assert_eq!(peer_storage_warning.len(), 1);

match peer_storage_warning[0] {
MessageSendEvent::HandleError { node_id, action } => {
assert_eq!(*node_id, nodes[0].node.get_our_node_id());
match action {
ErrorAction::SendWarningMessage { msg, .. } =>
assert_eq!(msg.data, "Invalid peer_storage_retrieval message received.".to_owned()),
_ => panic!("Unexpected error action"),
}
}
_ => panic!("Unexpected event"),
}
}

#[test]
fn test_keysend_dup_payment_hash() {
// (1): Test that a keysend payment with a duplicate payment hash to an existing pending
6 changes: 6 additions & 0 deletions lightning/src/ln/functional_test_utils.rs
@@ -880,6 +880,12 @@ macro_rules! get_htlc_update_msgs {
/// such messages are intended to all peers.
pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &mut Vec<MessageSendEvent>) -> MessageSendEvent {
let ev_index = msg_events.iter().position(|e| { match e {
MessageSendEvent::SendPeerStorage { node_id, .. } => {
node_id == msg_node_id
},
MessageSendEvent::SendPeerStorageRetrieval { node_id, .. } => {
node_id == msg_node_id
},
MessageSendEvent::SendAcceptChannel { node_id, .. } => {
node_id == msg_node_id
},