@@ -120,6 +120,8 @@ pub enum SendError {
120
120
/// The provided [`Destination`] was an invalid [`BlindedRoute`], due to having fewer than two
121
121
/// blinded hops.
122
122
TooFewBlindedHops ,
123
+ /// Our next-hop peer was offline or does not support onion message forwarding.
124
+ InvalidFirstHop ,
123
125
}
124
126
125
127
impl < Signer : Sign , K : Deref , L : Deref > OnionMessenger < Signer , K , L >
@@ -163,25 +165,29 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
163
165
. map_err ( |e| SendError :: Secp256k1 ( e) ) ?;
164
166
165
167
let prng_seed = self . keys_manager . get_secure_random_bytes ( ) ;
166
- let onion_packet = construct_onion_message_packet (
168
+ let onion_routing_packet = construct_onion_message_packet (
167
169
packet_payloads, packet_keys, prng_seed) . map_err ( |( ) | SendError :: TooBigPacket ) ?;
168
170
169
171
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
170
- let pending_msgs = pending_per_peer_msgs. entry ( introduction_node_id) . or_insert_with ( VecDeque :: new ) ;
171
- pending_msgs . push_back (
172
- msgs :: OnionMessage {
173
- blinding_point,
174
- onion_routing_packet : onion_packet ,
172
+ match pending_per_peer_msgs. entry ( introduction_node_id) {
173
+ hash_map :: Entry :: Vacant ( _ ) => Err ( SendError :: InvalidFirstHop ) ,
174
+ hash_map :: Entry :: Occupied ( mut e ) => {
175
+ e . get_mut ( ) . push_back ( msgs :: OnionMessage { blinding_point, onion_routing_packet } ) ;
176
+ Ok ( ( ) )
175
177
}
176
- ) ;
177
- Ok ( ( ) )
178
+ }
178
179
}
179
180
180
181
#[ cfg( test) ]
181
182
pub ( super ) fn release_pending_msgs ( & self ) -> HashMap < PublicKey , VecDeque < msgs:: OnionMessage > > {
182
183
let mut pending_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
183
184
let mut msgs = HashMap :: new ( ) ;
184
- core:: mem:: swap ( & mut * pending_msgs, & mut msgs) ;
185
+ // We don't want to disconnect the peers by removing them entirely from the original map, so we
186
+ // swap the pending message buffers individually.
187
+ for ( peer_node_id, pending_messages) in & mut * pending_msgs {
188
+ msgs. insert ( * peer_node_id, VecDeque :: new ( ) ) ;
189
+ core:: mem:: swap ( pending_messages, msgs. get_mut ( & peer_node_id) . unwrap ( ) ) ;
190
+ }
185
191
msgs
186
192
}
187
193
}
@@ -250,32 +256,43 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
250
256
} ;
251
257
252
258
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
253
- let pending_msgs = pending_per_peer_msgs. entry ( next_node_id) . or_insert_with ( VecDeque :: new) ;
254
- pending_msgs. push_back (
255
- msgs:: OnionMessage {
256
- blinding_point : match next_blinding_override {
257
- Some ( blinding_point) => blinding_point,
258
- None => {
259
- let blinding_factor = {
260
- let mut sha = Sha256 :: engine ( ) ;
261
- sha. input ( & msg. blinding_point . serialize ( ) [ ..] ) ;
262
- sha. input ( control_tlvs_ss. as_ref ( ) ) ;
263
- Sha256 :: from_engine ( sha) . into_inner ( )
264
- } ;
265
- let next_blinding_point = msg. blinding_point ;
266
- match next_blinding_point. mul_tweak ( & self . secp_ctx , & Scalar :: from_be_bytes ( blinding_factor) . unwrap ( ) ) {
267
- Ok ( bp) => bp,
268
- Err ( e) => {
269
- log_trace ! ( self . logger, "Failed to compute next blinding point: {}" , e) ;
270
- return
271
- }
272
- }
273
- } ,
274
- } ,
275
- onion_routing_packet : outgoing_packet,
259
+
260
+ #[ cfg( fuzzing) ]
261
+ pending_per_peer_msgs. entry ( next_node_id) . or_insert_with ( || VecDeque :: new ( ) ) ;
262
+
263
+ match pending_per_peer_msgs. entry ( next_node_id) {
264
+ hash_map:: Entry :: Vacant ( _) => {
265
+ log_trace ! ( self . logger, "Dropping forwarded onion message to disconnected peer {:?}" , next_node_id) ;
266
+ return
276
267
} ,
277
- ) ;
278
- log_trace ! ( self . logger, "Forwarding an onion message to peer {}" , next_node_id) ;
268
+ hash_map:: Entry :: Occupied ( mut e) => {
269
+ e. get_mut ( ) . push_back (
270
+ msgs:: OnionMessage {
271
+ blinding_point : match next_blinding_override {
272
+ Some ( blinding_point) => blinding_point,
273
+ None => {
274
+ let blinding_factor = {
275
+ let mut sha = Sha256 :: engine ( ) ;
276
+ sha. input ( & msg. blinding_point . serialize ( ) [ ..] ) ;
277
+ sha. input ( control_tlvs_ss. as_ref ( ) ) ;
278
+ Sha256 :: from_engine ( sha) . into_inner ( )
279
+ } ;
280
+ let next_blinding_point = msg. blinding_point ;
281
+ match next_blinding_point. mul_tweak ( & self . secp_ctx , & Scalar :: from_be_bytes ( blinding_factor) . unwrap ( ) ) {
282
+ Ok ( bp) => bp,
283
+ Err ( e) => {
284
+ log_trace ! ( self . logger, "Failed to compute next blinding point: {}" , e) ;
285
+ return
286
+ }
287
+ }
288
+ } ,
289
+ } ,
290
+ onion_routing_packet : outgoing_packet,
291
+ } ,
292
+ ) ;
293
+ log_trace ! ( self . logger, "Forwarding an onion message to peer {}" , next_node_id) ;
294
+ }
295
+ } ;
279
296
} ,
280
297
Err ( e) => {
281
298
log_trace ! ( self . logger, "Errored decoding onion message packet: {:?}" , e) ;
@@ -285,6 +302,18 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
285
302
} ,
286
303
} ;
287
304
}
305
+
306
+ fn peer_connected ( & self , their_node_id : & PublicKey , init : & msgs:: Init ) {
307
+ if init. features . supports_onion_messages ( ) {
308
+ let mut peers = self . pending_messages . lock ( ) . unwrap ( ) ;
309
+ peers. insert ( their_node_id. clone ( ) , VecDeque :: new ( ) ) ;
310
+ }
311
+ }
312
+
313
+ fn peer_disconnected ( & self , their_node_id : & PublicKey , _no_connection_possible : bool ) {
314
+ let mut pending_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
315
+ pending_msgs. remove ( their_node_id) ;
316
+ }
288
317
}
289
318
290
319
impl < Signer : Sign , K : Deref , L : Deref > OnionMessageProvider for OnionMessenger < Signer , K , L >
0 commit comments