@@ -18,6 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
18
18
use crate :: blinded_path:: BlindedPath ;
19
19
use crate :: blinded_path:: message:: { advance_path_by_one, ForwardTlvs , ReceiveTlvs } ;
20
20
use crate :: blinded_path:: utils;
21
+ use crate :: events:: { Event , EventHandler , EventsProvider } ;
21
22
use crate :: sign:: { EntropySource , KeysManager , NodeSigner , Recipient } ;
22
23
#[ cfg( not( c_bindings) ) ]
23
24
use crate :: ln:: channelmanager:: { SimpleArcChannelManager , SimpleRefChannelManager } ;
@@ -166,21 +167,21 @@ enum OnionMessageBuffer {
166
167
ConnectedPeer ( VecDeque < OnionMessage > ) ,
167
168
168
169
/// Messages for a node that is not yet connected.
169
- PendingConnection ( VecDeque < OnionMessage > ) ,
170
+ PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > ) ,
170
171
}
171
172
172
173
impl OnionMessageBuffer {
173
174
fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
174
175
match self {
175
176
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
176
- OnionMessageBuffer :: PendingConnection ( pending_messages) => pending_messages,
177
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => pending_messages,
177
178
}
178
179
}
179
180
180
181
fn enqueue_message ( & mut self , message : OnionMessage ) {
181
182
let pending_messages = match self {
182
183
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
183
- OnionMessageBuffer :: PendingConnection ( pending_messages) => pending_messages,
184
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => pending_messages,
184
185
} ;
185
186
186
187
pending_messages. push_back ( message) ;
@@ -189,7 +190,7 @@ impl OnionMessageBuffer {
189
190
fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
190
191
let pending_messages = match self {
191
192
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
192
- OnionMessageBuffer :: PendingConnection ( pending_messages) => {
193
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => {
193
194
debug_assert ! ( false ) ;
194
195
pending_messages
195
196
} ,
@@ -202,14 +203,14 @@ impl OnionMessageBuffer {
202
203
fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
203
204
let pending_messages = match self {
204
205
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
205
- OnionMessageBuffer :: PendingConnection ( pending_messages) => pending_messages,
206
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => pending_messages,
206
207
} ;
207
208
208
209
core:: mem:: take ( pending_messages)
209
210
}
210
211
211
212
fn mark_connected ( & mut self ) {
212
- if let OnionMessageBuffer :: PendingConnection ( pending_messages) = self {
213
+ if let OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) = self {
213
214
let mut new_pending_messages = VecDeque :: new ( ) ;
214
215
core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
215
216
* self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
@@ -381,6 +382,8 @@ pub enum SendError {
381
382
/// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded
382
383
/// hops.
383
384
TooFewBlindedHops ,
385
+ /// The first hop is not a peer and doesn't have a known [`SocketAddress`].
386
+ InvalidFirstHop ( PublicKey ) ,
384
387
/// A path from the sender to the destination could not be found by the [`MessageRouter`].
385
388
PathNotFound ,
386
389
/// Onion message contents must have a TLV type >= 64.
@@ -453,12 +456,12 @@ pub enum PeeledOnion<T: OnionMessageContents> {
453
456
pub fn create_onion_message < ES : Deref , NS : Deref , T : OnionMessageContents > (
454
457
entropy_source : & ES , node_signer : & NS , secp_ctx : & Secp256k1 < secp256k1:: All > ,
455
458
path : OnionMessagePath , contents : T , reply_path : Option < BlindedPath > ,
456
- ) -> Result < ( PublicKey , OnionMessage ) , SendError >
459
+ ) -> Result < ( PublicKey , OnionMessage , Option < Vec < SocketAddress > > ) , SendError >
457
460
where
458
461
ES :: Target : EntropySource ,
459
462
NS :: Target : NodeSigner ,
460
463
{
461
- let OnionMessagePath { intermediate_nodes, mut destination, .. } = path;
464
+ let OnionMessagePath { intermediate_nodes, mut destination, addresses } = path;
462
465
if let Destination :: BlindedPath ( BlindedPath { ref blinded_hops, .. } ) = destination {
463
466
if blinded_hops. is_empty ( ) {
464
467
return Err ( SendError :: TooFewBlindedHops ) ;
@@ -499,10 +502,8 @@ where
499
502
let onion_routing_packet = construct_onion_message_packet (
500
503
packet_payloads, packet_keys, prng_seed) . map_err ( |( ) | SendError :: TooBigPacket ) ?;
501
504
502
- Ok ( ( first_node_id, OnionMessage {
503
- blinding_point,
504
- onion_routing_packet
505
- } ) )
505
+ let message = OnionMessage { blinding_point, onion_routing_packet } ;
506
+ Ok ( ( first_node_id, message, addresses) )
506
507
}
507
508
508
509
/// Decode one layer of an incoming [`OnionMessage`].
@@ -696,7 +697,7 @@ where
696
697
) -> Result < SendSuccess , SendError > {
697
698
log_trace ! ( self . logger, "Constructing onion message {}: {:?}" , log_suffix, contents) ;
698
699
699
- let ( first_node_id, onion_message) = create_onion_message (
700
+ let ( first_node_id, onion_message, addresses ) = create_onion_message (
700
701
& self . entropy_source , & self . node_signer , & self . secp_ctx , path, contents, reply_path
701
702
) ?;
702
703
@@ -706,10 +707,14 @@ where
706
707
}
707
708
708
709
match message_buffers. entry ( first_node_id) {
709
- hash_map:: Entry :: Vacant ( e) => {
710
- e. insert ( OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) ) )
711
- . enqueue_message ( onion_message) ;
712
- Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
710
+ hash_map:: Entry :: Vacant ( e) => match addresses {
711
+ None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
712
+ Some ( addresses) => {
713
+ e. insert (
714
+ OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) )
715
+ ) . enqueue_message ( onion_message) ;
716
+ Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
717
+ } ,
713
718
} ,
714
719
hash_map:: Entry :: Occupied ( mut e) => {
715
720
e. get_mut ( ) . enqueue_message ( onion_message) ;
@@ -778,6 +783,27 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, On
778
783
false
779
784
}
780
785
786
+ impl < ES : Deref , NS : Deref , L : Deref , MR : Deref , OMH : Deref , CMH : Deref > EventsProvider
787
+ for OnionMessenger < ES , NS , L , MR , OMH , CMH >
788
+ where
789
+ ES :: Target : EntropySource ,
790
+ NS :: Target : NodeSigner ,
791
+ L :: Target : Logger ,
792
+ MR :: Target : MessageRouter ,
793
+ OMH :: Target : OffersMessageHandler ,
794
+ CMH :: Target : CustomOnionMessageHandler ,
795
+ {
796
+ fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
797
+ for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
798
+ if let OnionMessageBuffer :: PendingConnection ( _, addresses) = recipient {
799
+ if let Some ( addresses) = addresses. take ( ) {
800
+ handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
801
+ }
802
+ }
803
+ }
804
+ }
805
+ }
806
+
781
807
impl < ES : Deref , NS : Deref , L : Deref , MR : Deref , OMH : Deref , CMH : Deref > OnionMessageHandler
782
808
for OnionMessenger < ES , NS , L , MR , OMH , CMH >
783
809
where
0 commit comments