@@ -2122,7 +2122,7 @@ where
2122
2122
entropy_source: ES,
2123
2123
node_signer: NS,
2124
2124
signer_provider: SP,
2125
-
2125
+ peer_storage: Mutex<HashMap<PublicKey, Vec<u8>>>,
2126
2126
logger: L,
2127
2127
}
2128
2128
@@ -2901,7 +2901,7 @@ where
2901
2901
entropy_source,
2902
2902
node_signer,
2903
2903
signer_provider,
2904
-
2904
+ peer_storage: Mutex::new(new_hash_map()),
2905
2905
logger,
2906
2906
}
2907
2907
}
@@ -7172,6 +7172,25 @@ where
7172
7172
}
7173
7173
}
7174
7174
7175
+ fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
7176
+ let per_peer_state = self.per_peer_state.write().unwrap();
7177
+ let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
7178
+ Some(peer_state_mutex) => peer_state_mutex,
7179
+ None => return,
7180
+ };
7181
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
7182
+ let peer_state = &mut *peer_state_lock;
7183
+ let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None);
7184
+
7185
+ // Check if we have any channels with the peer (Currently we only provide the servie to peers we have a channel with).
7186
+ if peer_state.total_channel_count() == 0 {
7187
+ log_debug!(logger, "We do not have any channel with {}", log_pubkey!(counterparty_node_id));
7188
+ return;
7189
+ }
7190
+ log_trace!(logger, "Received Peer Storage from {}", log_pubkey!(counterparty_node_id));
7191
+ self.peer_storage.lock().unwrap().insert(*counterparty_node_id, msg.data.clone());
7192
+ }
7193
+
7175
7194
fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
7176
7195
let best_block = *self.best_block.read().unwrap();
7177
7196
let per_peer_state = self.per_peer_state.read().unwrap();
@@ -9672,6 +9691,8 @@ where
9672
9691
}
9673
9692
9674
9693
fn handle_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
9694
+ let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
9695
+ self.internal_peer_storage(counterparty_node_id, msg);
9675
9696
}
9676
9697
9677
9698
fn handle_your_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::YourPeerStorageMessage) {
@@ -10025,6 +10046,17 @@ where
10025
10046
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
10026
10047
let peer_state = &mut *peer_state_lock;
10027
10048
let pending_msg_events = &mut peer_state.pending_msg_events;
10049
+ let peer_storage = self.peer_storage.lock().unwrap().get(counterparty_node_id).unwrap_or(&Vec::<u8>::new()).clone();
10050
+
10051
+ if peer_storage.len() > 0 {
10052
+ pending_msg_events.push(events::MessageSendEvent::SendYourPeerStorageMessage {
10053
+ node_id: counterparty_node_id.clone(),
10054
+ msg: msgs::YourPeerStorageMessage {
10055
+ data: peer_storage
10056
+ },
10057
+ });
10058
+ }
10059
+
10028
10060
10029
10061
for (_, phase) in peer_state.channel_by_id.iter_mut() {
10030
10062
match phase {
@@ -11034,6 +11066,13 @@ where
11034
11066
pending_payment.write(writer)?;
11035
11067
}
11036
11068
11069
+ let peer_storage = self.peer_storage.lock().unwrap();
11070
+ (peer_storage.len() as u64).write(writer)?;
11071
+ for (node_id, peer_data) in peer_storage.iter() {
11072
+ node_id.write(writer)?;
11073
+ peer_data.write(writer)?;
11074
+ }
11075
+
11037
11076
// For backwards compat, write the session privs and their total length.
11038
11077
let mut num_pending_outbounds_compat: u64 = 0;
11039
11078
for (_, outbound) in pending_outbound_payments.iter() {
@@ -11539,6 +11578,14 @@ where
11539
11578
}
11540
11579
}
11541
11580
11581
+ let peer_storage_count: u64 = Readable::read(reader)?;
11582
+ let mut peer_storage: HashMap<PublicKey, Vec<u8>> = hash_map_with_capacity(cmp::min(peer_storage_count as usize, MAX_ALLOC_SIZE/(3*32)));
11583
+ for _ in 0..peer_storage_count {
11584
+ if peer_storage.insert(Readable::read(reader)?, Readable::read(reader)?).is_some() {
11585
+ return Err(DecodeError::InvalidValue);
11586
+ }
11587
+ }
11588
+
11542
11589
let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
11543
11590
let mut pending_outbound_payments_compat: HashMap<PaymentId, PendingOutboundPayment> =
11544
11591
hash_map_with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
@@ -12168,6 +12215,7 @@ where
12168
12215
12169
12216
last_days_feerates: Mutex::new(VecDeque::new()),
12170
12217
12218
+ peer_storage: Mutex::new(peer_storage),
12171
12219
logger: args.logger,
12172
12220
default_configuration: args.default_config,
12173
12221
};
0 commit comments