@@ -1600,7 +1600,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1600
1600
}
1601
1601
1602
1602
for msg in msgs_to_forward. drain ( ..) {
1603
- self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) ) ;
1603
+ self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) , false ) ;
1604
1604
}
1605
1605
1606
1606
Ok ( pause_read)
@@ -1946,7 +1946,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1946
1946
Ok ( should_forward)
1947
1947
}
1948
1948
1949
- fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1949
+ /// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and
1950
+ /// excluding `except_node`.
1951
+ ///
1952
+ /// If the message queue for a peer is somewhat full, the message will not be forwarded to them
1953
+ /// unless `allow_large_buffer` is set, in which case the message will be treated as critical
1954
+ /// and delivered no matter the available buffer space.
1955
+ fn forward_broadcast_msg (
1956
+ & self , peers : & HashMap < Descriptor , Mutex < Peer > > ,
1957
+ msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > ,
1958
+ except_node : Option < & PublicKey > , allow_large_buffer : bool ,
1959
+ ) {
1950
1960
match msg {
1951
1961
wire:: Message :: ChannelAnnouncement ( ref msg) => {
1952
1962
log_gossip ! ( self . logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}" , except_node, msg) ;
@@ -1961,7 +1971,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1961
1971
debug_assert ! ( peer. their_node_id. is_some( ) ) ;
1962
1972
debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
1963
1973
let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1964
- if peer. buffer_full_drop_gossip_broadcast ( ) {
1974
+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
1965
1975
log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1966
1976
continue ;
1967
1977
}
@@ -1989,7 +1999,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1989
1999
debug_assert ! ( peer. their_node_id. is_some( ) ) ;
1990
2000
debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
1991
2001
let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1992
- if peer. buffer_full_drop_gossip_broadcast ( ) {
2002
+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
1993
2003
log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1994
2004
continue ;
1995
2005
}
@@ -2017,7 +2027,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2017
2027
debug_assert ! ( peer. their_node_id. is_some( ) ) ;
2018
2028
debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
2019
2029
let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
2020
- if peer. buffer_full_drop_gossip_broadcast ( ) {
2030
+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
2021
2031
log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
2022
2032
continue ;
2023
2033
}
@@ -2099,6 +2109,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2099
2109
}
2100
2110
}
2101
2111
}
2112
+
2113
+ // Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should
2114
+ // robustly gossip broadcast events even if a peer's message buffer is full.
2102
2115
let mut handle_event = |event, from_chan_handler| {
2103
2116
match event {
2104
2117
MessageSendEvent :: SendAcceptChannel { ref node_id, ref msg } => {
@@ -2293,31 +2306,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2293
2306
MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg } => {
2294
2307
log_debug ! ( self . logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
2295
2308
match self . message_handler . route_handler . handle_channel_announcement ( None , & msg) {
2296
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2297
- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelAnnouncement ( msg) , None ) ,
2309
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2310
+ let forward = wire:: Message :: ChannelAnnouncement ( msg) ;
2311
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2312
+ } ,
2298
2313
_ => { } ,
2299
2314
}
2300
2315
if let Some ( msg) = update_msg {
2301
2316
match self . message_handler . route_handler . handle_channel_update ( None , & msg) {
2302
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2303
- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2317
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2318
+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2319
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2320
+ } ,
2304
2321
_ => { } ,
2305
2322
}
2306
2323
}
2307
2324
} ,
2308
2325
MessageSendEvent :: BroadcastChannelUpdate { msg } => {
2309
2326
log_debug ! ( self . logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}" , msg. contents) ;
2310
2327
match self . message_handler . route_handler . handle_channel_update ( None , & msg) {
2311
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2312
- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2328
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2329
+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2330
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2331
+ } ,
2313
2332
_ => { } ,
2314
2333
}
2315
2334
} ,
2316
2335
MessageSendEvent :: BroadcastNodeAnnouncement { msg } => {
2317
2336
log_debug ! ( self . logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}" , msg. contents. node_id) ;
2318
2337
match self . message_handler . route_handler . handle_node_announcement ( None , & msg) {
2319
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2320
- self . forward_broadcast_msg ( peers, & wire:: Message :: NodeAnnouncement ( msg) , None ) ,
2338
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2339
+ let forward = wire:: Message :: NodeAnnouncement ( msg) ;
2340
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2341
+ } ,
2321
2342
_ => { } ,
2322
2343
}
2323
2344
} ,
@@ -2689,7 +2710,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2689
2710
2690
2711
log_debug ! ( self . logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler." ) ;
2691
2712
let _ = self . message_handler . route_handler . handle_node_announcement ( None , & msg) ;
2692
- self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None ) ;
2713
+ self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None , true ) ;
2693
2714
}
2694
2715
}
2695
2716
0 commit comments