@@ -1168,9 +1168,23 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
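+	/// The latest peer-storage blob this peer has asked us to hold, echoed back to the peer in a
+	/// `YourPeerStorage` message when it reconnects.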
+	peer_storage: Vec<u8>,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
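+	/// Constructs an empty `PeerState` for a newly connected peer with the given [`InitFeatures`].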
+	pub fn new(features: &InitFeatures) -> Self {
+		Self {
+			channel_by_id: new_hash_map(),
+			inbound_channel_request_by_id: new_hash_map(),
+			latest_features: features.clone(),
+			pending_msg_events: Vec::new(),
+			in_flight_monitor_updates: BTreeMap::new(),
+			monitor_update_blocked_actions: BTreeMap::new(),
+			actions_blocking_raa_monitor_updates: BTreeMap::new(),
+			is_connected: true,
+			peer_storage: Vec::new(),
+		}
+	}
 	/// Indicates that a peer meets the criteria where we're ok to remove it from our storage.
 	/// If true is passed for `require_disconnected`, the function will return false if we haven't
 	/// disconnected from the node already, ie. `PeerState::is_connected` is set to `true`.
@@ -2431,7 +2445,7 @@ where
 	entropy_source: ES,
 	node_signer: NS,
 	signer_provider: SP,
-
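+	/// The state we ask our peers to store for us, letting us recover channel data from them
+	/// after a loss (see [`OurPeerStorage`]).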
+	our_peer_storage: FairRwLock<OurPeerStorage>,
 	logger: L,
 }
 
@@ -3249,7 +3263,7 @@ where
 			entropy_source,
 			node_signer,
 			signer_provider,
-
+			our_peer_storage: FairRwLock::new(OurPeerStorage::new()),
 			logger,
 		}
 	}
@@ -7000,6 +7014,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
+				peer_storage: Vec::new(),
 			}));
 		let mut peer_state = peer_state_mutex.lock().unwrap();
 
@@ -7860,6 +7875,42 @@ where
 		}
 	}
 
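+	/// Handles an incoming `peer_storage` message: we only accept blobs from peers with whom we
+	/// have a funded channel, and only up to 1 KiB, keeping the latest blob so it can be returned
+	/// when the peer reconnects.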
+	fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
+			Some(peer_state_mutex) => peer_state_mutex,
+			None => return,
+		};
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None);
+
+		// Check that we have at least one funded channel with this peer (currently we only
+		// provide the storage service to peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| matches!(phase, ChannelPhase::Funded(_))) {
+			log_debug!(logger, "We do not have any channel with {}", log_pubkey!(counterparty_node_id));
+			return;
+		}
+
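+		// Enforce a 1 KiB cap on inbound blobs; the check is compiled out in test builds.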
+		#[cfg(not(test))]
+		if msg.data.len() > 1024 {
+			log_debug!(logger, "We do not allow more than 1 KiB of data for each peer in peer storage. Sending warning to peer {}", log_pubkey!(counterparty_node_id));
+			peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+				node_id: counterparty_node_id.clone(),
+				action: msgs::ErrorAction::SendWarningMessage {
+					msg: msgs::WarningMessage {
+						channel_id: ChannelId([0; 32]),
+						data: "Supports only data up to 1 KiB in peer storage.".to_owned()
+					},
+					log_level: Level::Trace,
+				}
+			});
+			return;
+		}
+
+		log_trace!(logger, "Received Peer Storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data.clone();
+	}
+
 	fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
 		let best_block = *self.best_block.read().unwrap();
 		let per_peer_state = self.per_peer_state.read().unwrap();
@@ -10471,6 +10522,8 @@ where
 	}
 
 	fn handle_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::PeerStorageMessage) {
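+		// Storing the blob generates no message events and doesn't itself require a repersist;
+		// the data is written out with the next regular `ChannelManager` persistence.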
+		let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
+		self.internal_peer_storage(&counterparty_node_id, msg);
 	}
 
 	fn handle_your_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::YourPeerStorageMessage) {
@@ -10796,6 +10849,7 @@ where
 					monitor_update_blocked_actions: BTreeMap::new(),
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					is_connected: true,
+					peer_storage: Vec::new(),
 				}));
 			},
 			hash_map::Entry::Occupied(e) => {
@@ -10825,6 +10879,16 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;
 
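+			// If we're holding a peer-storage blob for this peer, return it on reconnect so the
+			// peer can check for lost channel state.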
+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendYourPeerStorageMessage {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::YourPeerStorageMessage {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
 			for (_, phase) in peer_state.channel_by_id.iter_mut() {
 				match phase {
 					ChannelPhase::Funded(chan) => {
@@ -11910,6 +11974,12 @@ where
 			if !peer_state.ok_to_remove(false) {
 				peer_pubkey.write(writer)?;
 				peer_state.latest_features.write(writer)?;
+
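+				// Peer storage is serialized as a u64 byte-count followed by the raw bytes, and is
+				// read back in the same format when the `ChannelManager` is deserialized.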
+				(peer_state.peer_storage.len() as u64).write(writer)?;
+				for p in peer_state.peer_storage.iter() {
+					p.write(writer)?;
+				}
+
 				if !peer_state.monitor_update_blocked_actions.is_empty() {
 					monitor_update_blocked_actions_per_peer
 						.get_or_insert_with(Vec::new)
@@ -12423,6 +12493,7 @@ where
 			monitor_update_blocked_actions: BTreeMap::new(),
 			actions_blocking_raa_monitor_updates: BTreeMap::new(),
 			is_connected: false,
+			peer_storage: Vec::new(),
 		}
 	};
 
@@ -12433,6 +12504,15 @@ where
 			let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(new_hash_map());
 			let mut peer_state = peer_state_from_chans(peer_chans);
 			peer_state.latest_features = Readable::read(reader)?;
+
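+			// Cap the pre-allocation so a corrupt count can't force a huge up-front allocation;
+			// the loop below still reads exactly `peer_storage_count` bytes.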
+			let peer_storage_count: u64 = Readable::read(reader)?;
+			let mut peer_storage: Vec<u8> = Vec::with_capacity(cmp::min(peer_storage_count as usize, MAX_ALLOC_SIZE / mem::size_of::<u8>()));
+			for _ in 0..peer_storage_count {
+				let byte: u8 = Readable::read(reader)?;
+				peer_storage.push(byte);
+			}
+			peer_state.peer_storage = peer_storage;
+
 			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
 		}
 
@@ -13087,6 +13167,7 @@ where
 
 			last_days_feerates: Mutex::new(VecDeque::new()),
 
+			our_peer_storage: FairRwLock::new(our_peer_storage),
 			logger: args.logger,
 			default_configuration: args.default_config,
 		};