@@ -5678,6 +5678,64 @@ where
5678
5678
self . pending_outbound_payments . clear_pending_payments ( )
5679
5679
}
5680
5680
5681
+ /// (Partially) handles an [`EventCompletionAction`]. If `true` is returned the same `action`
5682
+ /// should be processed again.
5683
+ fn handle_post_event_action ( & self , action : & EventCompletionAction ) -> bool {
5684
+ match action {
5685
+ EventCompletionAction :: ReleaseRAAChannelMonitorUpdate {
5686
+ channel_funding_outpoint, counterparty_node_id
5687
+ } => {
5688
+ let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5689
+ if let Some ( peer_state_mtx) = per_peer_state. get ( & counterparty_node_id) {
5690
+ let mut peer_state_lck = peer_state_mtx. lock ( ) . unwrap ( ) ;
5691
+ let peer_state = & mut * peer_state_lck;
5692
+ if self . pending_events . lock ( ) . unwrap ( ) . iter ( )
5693
+ . any ( |( _ev, action_opt) | action_opt. as_ref ( ) == Some ( & action) )
5694
+ {
5695
+ // Check that, while holding the peer lock, we don't have another event
5696
+ // blocking any monitor updates for this channel. If we do, let those
5697
+ // events be the ones that ultimately release the monitor update(s).
5698
+ log_trace ! ( self . logger, "Delaying monitor unlock for channel {} as another event is pending" ,
5699
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5700
+ return false ;
5701
+ }
5702
+ if let hash_map:: Entry :: Occupied ( mut chan) = peer_state. channel_by_id . entry ( channel_funding_outpoint. to_channel_id ( ) ) {
5703
+ debug_assert_eq ! ( chan. get( ) . get_funding_txo( ) . unwrap( ) , * channel_funding_outpoint) ;
5704
+ if let Some ( ( monitor_update, further_update_exists) ) = chan. get_mut ( ) . fly_next_unflown_monitor_update ( ) {
5705
+ log_debug ! ( self . logger, "Unlocking monitor updating for channel {} and updating monitor" ,
5706
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5707
+ let update_res = self . chain_monitor . update_channel ( * channel_funding_outpoint, monitor_update) ;
5708
+ let update_id = monitor_update. update_id ;
5709
+ let _ = handle_error ! ( self ,
5710
+ handle_new_monitor_update!( self , update_res, update_id,
5711
+ peer_state_lck, peer_state, per_peer_state, chan) ,
5712
+ * counterparty_node_id) ;
5713
+ if further_update_exists {
5714
+ return true ;
5715
+ }
5716
+ } else {
5717
+ log_trace ! ( self . logger, "Unlocked monitor updating for channel {} without monitors to update" ,
5718
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5719
+ }
5720
+ }
5721
+ } else {
5722
+ log_debug ! ( self . logger,
5723
+ "Got a release post-RAA monitor update for peer {} but the channel is gone" ,
5724
+ log_pubkey!( * counterparty_node_id) ) ;
5725
+ }
5726
+ } ,
5727
+ }
5728
+ false
5729
+ }
5730
+
5731
+ fn handle_post_event_actions ( & self , actions : Vec < EventCompletionAction > ) {
5732
+ for action in actions {
5733
+ loop {
5734
+ if !self . handle_post_event_action ( & action) { break ; }
5735
+ }
5736
+ }
5737
+ }
5738
+
5681
5739
/// Processes any events asynchronously in the order they were generated since the last call
5682
5740
/// using the given event handler.
5683
5741
///
@@ -5698,22 +5756,32 @@ where
5698
5756
}
5699
5757
5700
5758
let _single_processor = self . pending_events_processor . lock ( ) . unwrap ( ) ;
5701
- let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5702
5759
loop {
5703
- if let Some ( ( event, _action) ) = next_event {
5704
- result = NotifyOption :: DoPersist ;
5705
- let ev_clone;
5706
- #[ cfg( debug_assertions) ] {
5707
- ev_clone = event. clone ( ) ;
5760
+ let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5761
+ let mut post_event_actions = Vec :: new ( ) ;
5762
+ loop {
5763
+ if let Some ( ( event, action_opt) ) = next_event {
5764
+ result = NotifyOption :: DoPersist ;
5765
+ let ev_clone: Event ;
5766
+ #[ cfg( debug_assertions) ] {
5767
+ ev_clone = event. clone ( ) ;
5768
+ }
5769
+ handler ( event) . await ;
5770
+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5771
+ debug_assert_eq ! ( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5772
+ debug_assert_eq ! ( action_opt, pending_events. front( ) . unwrap( ) . 1 ) ;
5773
+ if let Some ( action) = action_opt {
5774
+ post_event_actions. push ( action) ;
5775
+ }
5776
+ pending_events. pop_front ( ) ;
5777
+ next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5778
+ } else {
5779
+ break ;
5708
5780
}
5709
- handler ( event) . await ;
5710
- let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5711
- debug_assert_eq ! ( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5712
- pending_events. pop_front ( ) ;
5713
- next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5714
- } else {
5715
- break ;
5716
5781
}
5782
+ if post_event_actions. is_empty ( ) { break ; }
5783
+ self . handle_post_event_actions ( post_event_actions) ;
5784
+ // If we had some actions, go around again as we may have more events now
5717
5785
}
5718
5786
5719
5787
if result == NotifyOption :: DoPersist {
@@ -5810,22 +5878,32 @@ where
5810
5878
}
5811
5879
5812
5880
let _single_processor = self . pending_events_processor . lock ( ) . unwrap ( ) ;
5813
- let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5814
5881
loop {
5815
- if let Some ( ( event, _action) ) = next_event {
5816
- result = NotifyOption :: DoPersist ;
5817
- let ev_clone;
5818
- #[ cfg( debug_assertions) ] {
5819
- ev_clone = event. clone ( ) ;
5882
+ let mut next_event = self . pending_events . lock ( ) . unwrap ( ) . front ( ) . map ( |ev| ( * ev) . clone ( ) ) ;
5883
+ let mut post_event_actions = Vec :: new ( ) ;
5884
+ loop {
5885
+ if let Some ( ( event, action_opt) ) = next_event {
5886
+ result = NotifyOption :: DoPersist ;
5887
+ let ev_clone: Event ;
5888
+ #[ cfg( debug_assertions) ] {
5889
+ ev_clone = event. clone ( ) ;
5890
+ }
5891
+ handler. handle_event ( event) ;
5892
+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5893
+ debug_assert_eq ! ( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5894
+ debug_assert_eq ! ( action_opt, pending_events. front( ) . unwrap( ) . 1 ) ;
5895
+ if let Some ( action) = action_opt {
5896
+ post_event_actions. push ( action) ;
5897
+ }
5898
+ pending_events. pop_front ( ) ;
5899
+ next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5900
+ } else {
5901
+ break ;
5820
5902
}
5821
- handler. handle_event ( event) ;
5822
- let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
5823
- debug_assert_eq ! ( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
5824
- pending_events. pop_front ( ) ;
5825
- next_event = pending_events. front ( ) . map ( |ev| ev. clone ( ) ) ;
5826
- } else {
5827
- break ;
5828
5903
}
5904
+ if post_event_actions. is_empty ( ) { break ; }
5905
+ self . handle_post_event_actions ( post_event_actions) ;
5906
+ // If we had some actions, go around again as we may have more events now
5829
5907
}
5830
5908
5831
5909
result
0 commit comments