@@ -1007,6 +1007,20 @@ where
1007
1007
}
1008
1008
msgs
1009
1009
}
1010
+
1011
+ fn enqueue_event ( & self , event : Event ) {
1012
+ const MAX_EVENTS_BUFFER_SIZE : usize = ( 1 << 10 ) * 256 ;
1013
+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
1014
+ let total_buffered_bytes: usize = pending_events
1015
+ . iter ( )
1016
+ . map ( |ev| ev. serialized_length ( ) )
1017
+ . sum ( ) ;
1018
+ if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
1019
+ log_trace ! ( self . logger, "Dropping event {:?}: buffer full" , event) ;
1020
+ return
1021
+ }
1022
+ pending_events. push ( event) ;
1023
+ }
1010
1024
}
1011
1025
1012
1026
fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , OnionMessageRecipient > ) -> bool {
@@ -1134,7 +1148,7 @@ where
1134
1148
log_trace ! ( logger, "Forwarding an onion message to peer {}" , next_node_id) ;
1135
1149
} ,
1136
1150
_ if self . intercept_messages_for_offline_peers => {
1137
- self . pending_events . lock ( ) . unwrap ( ) . push (
1151
+ self . enqueue_event (
1138
1152
Event :: OnionMessageIntercepted {
1139
1153
peer_node_id : next_node_id, message : onion_message
1140
1154
}
@@ -1162,7 +1176,7 @@ where
1162
1176
. or_insert_with ( || OnionMessageRecipient :: ConnectedPeer ( VecDeque :: new ( ) ) )
1163
1177
. mark_connected ( ) ;
1164
1178
if self . intercept_messages_for_offline_peers {
1165
- self . pending_events . lock ( ) . unwrap ( ) . push (
1179
+ self . enqueue_event (
1166
1180
Event :: OnionMessagePeerConnected { peer_node_id : * their_node_id }
1167
1181
) ;
1168
1182
}
0 commit comments