@@ -479,6 +479,16 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
479
479
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
480
480
pub ( crate ) const EXPIRE_PREV_CONFIG_TICKS : usize = 5 ;
481
481
482
+ struct PendingChannelMonitorUpdate {
483
+ update : ChannelMonitorUpdate ,
484
+ /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
485
+ /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
486
+ /// blocked on some externl event and the [`ChannelManager`] will update us when we're ready.
487
+ ///
488
+ /// [`ChannelManager`]: super::channelmanager::ChannelManager
489
+ blocked : bool ,
490
+ }
491
+
482
492
// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
483
493
// has been completed, and then turn into a Channel to get compiler-time enforcement of things like
484
494
// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
@@ -744,7 +754,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
744
754
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
745
755
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
746
756
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
747
- pending_monitor_updates : Vec < ChannelMonitorUpdate > ,
757
+ pending_monitor_updates : Vec < PendingChannelMonitorUpdate > ,
748
758
}
749
759
750
760
#[ cfg( any( test, fuzzing) ) ]
@@ -1977,28 +1987,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
1977
1987
}
1978
1988
1979
1989
pub fn get_update_fulfill_htlc_and_commit < L : Deref > ( & mut self , htlc_id : u64 , payment_preimage : PaymentPreimage , logger : & L ) -> UpdateFulfillCommitFetch where L :: Target : Logger {
1990
+ let release_cs_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
1980
1991
match self . get_update_fulfill_htlc ( htlc_id, payment_preimage, logger) {
1981
- UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg : Some ( _) } => {
1982
- let mut additional_update = self . build_commitment_no_status_check ( logger) ;
1983
- // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
1984
- // strictly increasing by one, so decrement it here.
1985
- self . latest_monitor_update_id = monitor_update. update_id ;
1986
- monitor_update. updates . append ( & mut additional_update. updates ) ;
1987
- self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1988
- self . pending_monitor_updates . push ( monitor_update) ;
1992
+ UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg } => {
1993
+ // Even if we aren't supposed to let new monitor updates with commitment state
1994
+ // updates run, we still need to push the preimage ChannelMonitorUpdateStep no
1995
+ // matter what. Sadly, to push a new monitor update which flies before others
1996
+ // already queued, we have to insert it into the pending queue and update the
1997
+ // update_ids of all the following monitors.
1998
+ let unblocked_monitor_pos = if release_cs_monitor && msg. is_some ( ) {
1999
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
2000
+ // strictly increasing by one, so decrement it here.
2001
+ let mut additional_update = self . build_commitment_no_status_check ( logger) ;
2002
+ self . latest_monitor_update_id = monitor_update. update_id ;
2003
+ monitor_update. updates . append ( & mut additional_update. updates ) ;
2004
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
2005
+ update : monitor_update, blocked : false ,
2006
+ } ) ;
2007
+ self . pending_monitor_updates . len ( ) - 1
2008
+ } else {
2009
+ let insert_pos = self . pending_monitor_updates . iter ( ) . position ( |upd| upd. blocked )
2010
+ . unwrap_or ( self . pending_monitor_updates . len ( ) ) ;
2011
+ let new_mon_id = self . pending_monitor_updates . get ( insert_pos)
2012
+ . map ( |upd| upd. update . update_id ) . unwrap_or ( monitor_update. update_id ) ;
2013
+ monitor_update. update_id = new_mon_id;
2014
+ self . pending_monitor_updates . insert ( insert_pos, PendingChannelMonitorUpdate {
2015
+ update : monitor_update, blocked : false ,
2016
+ } ) ;
2017
+ for held_update in self . pending_monitor_updates . iter_mut ( ) . skip ( insert_pos + 1 ) {
2018
+ held_update. update . update_id += 1 ;
2019
+ }
2020
+ if msg. is_some ( ) {
2021
+ debug_assert ! ( false , "If there is a pending blocked monitor we should have MonitorUpdateInProgress set" ) ;
2022
+ let update = self . build_commitment_no_status_check ( logger) ;
2023
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
2024
+ update, blocked : true ,
2025
+ } ) ;
2026
+ }
2027
+ insert_pos
2028
+ } ;
2029
+ self . monitor_updating_paused ( false , msg. is_some ( ) , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1989
2030
UpdateFulfillCommitFetch :: NewClaim {
1990
- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
2031
+ monitor_update : & self . pending_monitor_updates . get ( unblocked_monitor_pos)
2032
+ . expect ( "We just pushed the monitor update" ) . update ,
1991
2033
htlc_value_msat,
1992
2034
}
1993
2035
} ,
1994
- UpdateFulfillFetch :: NewClaim { monitor_update, htlc_value_msat, msg : None } => {
1995
- self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1996
- self . pending_monitor_updates . push ( monitor_update) ;
1997
- UpdateFulfillCommitFetch :: NewClaim {
1998
- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
1999
- htlc_value_msat,
2000
- }
2001
- }
2002
2036
UpdateFulfillFetch :: DuplicateClaim { } => UpdateFulfillCommitFetch :: DuplicateClaim { } ,
2003
2037
}
2004
2038
}
@@ -3066,7 +3100,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3066
3100
Ok ( ( ) )
3067
3101
}
3068
3102
3069
- pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < & ChannelMonitorUpdate , ChannelError >
3103
+ pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < Option < & ChannelMonitorUpdate > , ChannelError >
3070
3104
where L :: Target : Logger
3071
3105
{
3072
3106
if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3266,8 +3300,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3266
3300
}
3267
3301
log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply." ,
3268
3302
log_bytes!( self . channel_id) ) ;
3269
- self . pending_monitor_updates . push ( monitor_update) ;
3270
- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3303
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
3304
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3305
+ update : monitor_update, blocked : !release_monitor
3306
+ } ) ;
3307
+ return Ok ( if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) ;
3271
3308
}
3272
3309
3273
3310
let need_commitment_signed = if need_commitment && ( self . channel_state & ( ChannelState :: AwaitingRemoteRevoke as u32 ) ) == 0 {
@@ -3284,9 +3321,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3284
3321
3285
3322
log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack." ,
3286
3323
log_bytes!( self . channel_id( ) ) , if need_commitment_signed { " our own commitment_signed and" } else { "" } ) ;
3287
- self . pending_monitor_updates . push ( monitor_update) ;
3324
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
3325
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3326
+ update : monitor_update, blocked : !release_monitor,
3327
+ } ) ;
3288
3328
self . monitor_updating_paused ( true , need_commitment_signed, false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3289
- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3329
+ return Ok ( if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd . update ) } else { None } ) ;
3290
3330
}
3291
3331
3292
3332
/// Public version of the below, checking relevant preconditions first.
@@ -3401,8 +3441,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3401
3441
update_add_htlcs. len( ) , update_fulfill_htlcs. len( ) , update_fail_htlcs. len( ) ) ;
3402
3442
3403
3443
self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3404
- self . pending_monitor_updates . push ( monitor_update) ;
3405
- ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) , htlcs_to_fail)
3444
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
3445
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3446
+ update : monitor_update, blocked : !release_monitor,
3447
+ } ) ;
3448
+ ( if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ,
3449
+ htlcs_to_fail)
3406
3450
} else {
3407
3451
( None , Vec :: new ( ) )
3408
3452
}
@@ -3413,7 +3457,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3413
3457
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
3414
3458
/// generating an appropriate error *after* the channel state has been updated based on the
3415
3459
/// revoke_and_ack message.
3416
- pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , & ChannelMonitorUpdate ) , ChannelError >
3460
+ pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , Option < & ChannelMonitorUpdate > ) , ChannelError >
3417
3461
where L :: Target : Logger ,
3418
3462
{
3419
3463
if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3610,21 +3654,29 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3610
3654
self . monitor_pending_failures . append ( & mut revoked_htlcs) ;
3611
3655
self . monitor_pending_finalized_fulfills . append ( & mut finalized_claimed_htlcs) ;
3612
3656
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply." , log_bytes!( self . channel_id( ) ) ) ;
3613
- self . pending_monitor_updates . push ( monitor_update) ;
3614
- return Ok ( ( Vec :: new ( ) , self . pending_monitor_updates . last ( ) . unwrap ( ) ) ) ;
3657
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
3658
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3659
+ update : monitor_update, blocked : !release_monitor,
3660
+ } ) ;
3661
+ return Ok ( ( Vec :: new ( ) ,
3662
+ if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) ) ;
3615
3663
}
3616
3664
3617
3665
match self . free_holding_cell_htlcs ( logger) {
3618
3666
( Some ( _) , htlcs_to_fail) => {
3619
- let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) ;
3667
+ let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) . update ;
3620
3668
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
3621
3669
// strictly increasing by one, so decrement it here.
3622
3670
self . latest_monitor_update_id = monitor_update. update_id ;
3623
3671
monitor_update. updates . append ( & mut additional_update. updates ) ;
3624
3672
3625
3673
self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3626
- self . pending_monitor_updates . push ( monitor_update) ;
3627
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3674
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
3675
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3676
+ update : monitor_update, blocked : !release_monitor,
3677
+ } ) ;
3678
+ Ok ( ( htlcs_to_fail,
3679
+ if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3628
3680
} ,
3629
3681
( None , htlcs_to_fail) => {
3630
3682
if require_commitment {
@@ -3638,13 +3690,21 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3638
3690
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed." ,
3639
3691
log_bytes!( self . channel_id( ) ) , update_fail_htlcs. len( ) + update_fail_malformed_htlcs. len( ) ) ;
3640
3692
self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3641
- self . pending_monitor_updates . push ( monitor_update) ;
3642
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3693
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
3694
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3695
+ update : monitor_update, blocked : !release_monitor,
3696
+ } ) ;
3697
+ Ok ( ( htlcs_to_fail,
3698
+ if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3643
3699
} else {
3644
3700
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} with no reply necessary." , log_bytes!( self . channel_id( ) ) ) ;
3645
3701
self . monitor_updating_paused ( false , false , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3646
- self . pending_monitor_updates . push ( monitor_update) ;
3647
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3702
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
3703
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3704
+ update : monitor_update, blocked : !release_monitor,
3705
+ } ) ;
3706
+ Ok ( ( htlcs_to_fail,
3707
+ if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3648
3708
}
3649
3709
}
3650
3710
}
@@ -3833,7 +3893,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3833
3893
{
3834
3894
assert_eq ! ( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 , ChannelState :: MonitorUpdateInProgress as u32 ) ;
3835
3895
self . channel_state &= !( ChannelState :: MonitorUpdateInProgress as u32 ) ;
3836
- self . pending_monitor_updates . clear ( ) ;
3896
+ let mut found_blocked = false ;
3897
+ self . pending_monitor_updates . retain ( |upd| {
3898
+ if found_blocked { debug_assert ! ( upd. blocked, "No mons may be unblocked after a blocked one" ) ; }
3899
+ if upd. blocked { found_blocked = true ; }
3900
+ upd. blocked
3901
+ } ) ;
3837
3902
3838
3903
// If we're past (or at) the FundingSent stage on an outbound channel, try to
3839
3904
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4376,8 +4441,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
4376
4441
} ] ,
4377
4442
} ;
4378
4443
self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
4379
- self . pending_monitor_updates . push ( monitor_update) ;
4380
- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
4444
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
4445
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
4446
+ update : monitor_update, blocked : !release_monitor,
4447
+ } ) ;
4448
+ if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None }
4381
4449
} else { None } ;
4382
4450
let shutdown = if send_shutdown {
4383
4451
Some ( msgs:: Shutdown {
@@ -4949,8 +5017,25 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
4949
5017
( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 ) != 0
4950
5018
}
4951
5019
4952
- pub fn get_next_monitor_update ( & self ) -> Option < & ChannelMonitorUpdate > {
4953
- self . pending_monitor_updates . first ( )
5020
+ /// Returns the next blocked monitor update, if one exists, and a bool which indicates a
5021
+ /// further blocked monitor update exists after the next.
5022
+ pub fn unblock_next_blocked_monitor_update ( & mut self ) -> Option < ( & ChannelMonitorUpdate , bool ) > {
5023
+ for i in 0 ..self . pending_monitor_updates . len ( ) {
5024
+ if self . pending_monitor_updates [ i] . blocked {
5025
+ self . pending_monitor_updates [ i] . blocked = false ;
5026
+ return Some ( ( & self . pending_monitor_updates [ i] . update ,
5027
+ self . pending_monitor_updates . len ( ) > i + 1 ) ) ;
5028
+ }
5029
+ }
5030
+ None
5031
+ }
5032
+
5033
+ pub fn no_monitor_updates_pending ( & self ) -> bool {
5034
+ self . pending_monitor_updates . is_empty ( )
5035
+ }
5036
+
5037
+ pub fn complete_one_mon_update ( & mut self , update_id : u64 ) {
5038
+ self . pending_monitor_updates . retain ( |upd| upd. update . update_id != update_id) ;
4954
5039
}
4955
5040
4956
5041
/// Returns true if funding_created was sent/received.
@@ -5998,8 +6083,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
5998
6083
Some ( _) => {
5999
6084
let monitor_update = self . build_commitment_no_status_check ( logger) ;
6000
6085
self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
6001
- self . pending_monitor_updates . push ( monitor_update) ;
6002
- Ok ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
6086
+
6087
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
6088
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
6089
+ update : monitor_update, blocked : !release_monitor,
6090
+ } ) ;
6091
+ Ok ( if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } )
6003
6092
} ,
6004
6093
None => Ok ( None )
6005
6094
}
@@ -6088,8 +6177,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
6088
6177
} ] ,
6089
6178
} ;
6090
6179
self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
6091
- self . pending_monitor_updates . push ( monitor_update) ;
6092
- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
6180
+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
6181
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
6182
+ update : monitor_update, blocked : !release_monitor,
6183
+ } ) ;
6184
+ if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None }
6093
6185
} else { None } ;
6094
6186
let shutdown = msgs:: Shutdown {
6095
6187
channel_id : self . channel_id ,
0 commit comments