@@ -166,22 +166,27 @@ enum OnionMessageBuffer {
166
166
/// Messages for a node connected as a peer.
167
167
ConnectedPeer ( VecDeque < OnionMessage > ) ,
168
168
169
- /// Messages for a node that is not yet connected.
170
- PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > ) ,
169
+ /// Messages for a node that is not yet connected, which are dropped after a certain number of
170
+ /// timer ticks defined in [`OnionMessenger::timer_tick_occurred`] and tracked here.
171
+ PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > , usize ) ,
171
172
}
172
173
173
174
impl OnionMessageBuffer {
175
+ fn pending_connection ( addresses : Vec < SocketAddress > ) -> Self {
176
+ Self :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) , 0 )
177
+ }
178
+
174
179
fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
175
180
match self {
176
181
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
177
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
182
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
178
183
}
179
184
}
180
185
181
186
fn enqueue_message ( & mut self , message : OnionMessage ) {
182
187
let pending_messages = match self {
183
188
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
184
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
189
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
185
190
} ;
186
191
187
192
pending_messages. push_back ( message) ;
@@ -190,7 +195,7 @@ impl OnionMessageBuffer {
190
195
fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
191
196
let pending_messages = match self {
192
197
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
193
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => {
198
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => {
194
199
debug_assert ! ( false ) ;
195
200
pending_messages
196
201
} ,
@@ -203,14 +208,14 @@ impl OnionMessageBuffer {
203
208
fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
204
209
let pending_messages = match self {
205
210
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
206
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
211
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
207
212
} ;
208
213
209
214
core:: mem:: take ( pending_messages)
210
215
}
211
216
212
217
fn mark_connected ( & mut self ) {
213
- if let OnionMessageBuffer :: PendingConnection ( pending_messages, _) = self {
218
+ if let OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) = self {
214
219
let mut new_pending_messages = VecDeque :: new ( ) ;
215
220
core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
216
221
* self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
@@ -710,9 +715,8 @@ where
710
715
hash_map:: Entry :: Vacant ( e) => match addresses {
711
716
None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
712
717
Some ( addresses) => {
713
- e. insert (
714
- OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) )
715
- ) . enqueue_message ( onion_message) ;
718
+ e. insert ( OnionMessageBuffer :: pending_connection ( addresses) )
719
+ . enqueue_message ( onion_message) ;
716
720
Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
717
721
} ,
718
722
} ,
@@ -795,7 +799,7 @@ where
795
799
{
796
800
fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
797
801
for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
798
- if let OnionMessageBuffer :: PendingConnection ( _, addresses) = recipient {
802
+ if let OnionMessageBuffer :: PendingConnection ( _, addresses, _ ) = recipient {
799
803
if let Some ( addresses) = addresses. take ( ) {
800
804
handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
801
805
}
@@ -896,6 +900,27 @@ where
896
900
}
897
901
}
898
902
903
+ fn timer_tick_occurred ( & self ) {
904
+ const MAX_TIMER_TICKS : usize = 2 ;
905
+ let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
906
+
907
+ // Drop any pending recipients since the last call to avoid retaining buffered messages for
908
+ // too long.
909
+ message_buffers. retain ( |_, recipient| match recipient {
910
+ OnionMessageBuffer :: PendingConnection ( _, None , ticks) => * ticks < MAX_TIMER_TICKS ,
911
+ OnionMessageBuffer :: PendingConnection ( _, Some ( _) , _) => true ,
912
+ _ => true ,
913
+ } ) ;
914
+
915
+ // Increment a timer tick for pending recipients so that their buffered messages are dropped
916
+ // at MAX_TIMER_TICKS.
917
+ for recipient in message_buffers. values_mut ( ) {
918
+ if let OnionMessageBuffer :: PendingConnection ( _, None , ticks) = recipient {
919
+ * ticks += 1 ;
920
+ }
921
+ }
922
+ }
923
+
899
924
fn provided_node_features ( & self ) -> NodeFeatures {
900
925
let mut features = NodeFeatures :: empty ( ) ;
901
926
features. set_onion_messages_optional ( ) ;
0 commit comments