@@ -479,6 +479,17 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
479
479
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
480
480
pub ( crate ) const EXPIRE_PREV_CONFIG_TICKS : usize = 5 ;
481
481
482
+ struct PendingChannelMonitorUpdate {
483
+ update : ChannelMonitorUpdate ,
484
+ /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] fly until after an
485
+ /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] has
486
+ /// flown and we're waiting to hear back, otherwise the update is waiting on some external
487
+ /// event and the [`ChannelManager`] will update us when we're ready.
488
+ ///
489
+ /// [`ChannelManager`]: super::channelmanager::ChannelManager
490
+ flown : bool ,
491
+ }
492
+
482
493
// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
483
494
// has been completed, and then turn into a Channel to get compiler-time enforcement of things like
484
495
// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
@@ -744,7 +755,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
744
755
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
745
756
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
746
757
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
747
- pending_monitor_updates : Vec < ChannelMonitorUpdate > ,
758
+ pending_monitor_updates : Vec < PendingChannelMonitorUpdate > ,
748
759
}
749
760
750
761
#[ cfg( any( test, fuzzing) ) ]
@@ -1977,28 +1988,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
1977
1988
}
1978
1989
1979
1990
pub fn get_update_fulfill_htlc_and_commit < L : Deref > ( & mut self , htlc_id : u64 , payment_preimage : PaymentPreimage , logger : & L ) -> UpdateFulfillCommitFetch where L :: Target : Logger {
1991
+ let fly_cs_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
1980
1992
match self . get_update_fulfill_htlc ( htlc_id, payment_preimage, logger) {
1981
- UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg : Some ( _) } => {
1982
- let mut additional_update = self . build_commitment_no_status_check ( logger) ;
1983
- // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
1984
- // strictly increasing by one, so decrement it here.
1985
- self . latest_monitor_update_id = monitor_update. update_id ;
1986
- monitor_update. updates . append ( & mut additional_update. updates ) ;
1987
- self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1988
- self . pending_monitor_updates . push ( monitor_update) ;
1993
+ UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg } => {
1994
+ // Even if we aren't supposed to let new monitor updates with commitment state
1995
+ // updates fly, we still need to push the preimage ChannelMonitorUpdateStep no
1996
+ // matter what. Sadly, to push a new monitor update which flies before others
1997
+ // already queued, we have to insert it into the pending queue and update the
1998
+ // update_ids of all the following monitors.
1999
+ let flown_monitor_pos = if fly_cs_monitor && msg. is_some ( ) {
2000
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
2001
+ // strictly increasing by one, so decrement it here.
2002
+ let mut additional_update = self . build_commitment_no_status_check ( logger) ;
2003
+ self . latest_monitor_update_id = monitor_update. update_id ;
2004
+ monitor_update. updates . append ( & mut additional_update. updates ) ;
2005
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
2006
+ update : monitor_update, flown : true ,
2007
+ } ) ;
2008
+ self . pending_monitor_updates . len ( ) - 1
2009
+ } else {
2010
+ let insert_pos = self . pending_monitor_updates . iter ( ) . position ( |upd| !upd. flown )
2011
+ . unwrap_or ( self . pending_monitor_updates . len ( ) ) ;
2012
+ let new_mon_id = self . pending_monitor_updates . get ( insert_pos)
2013
+ . map ( |upd| upd. update . update_id ) . unwrap_or ( monitor_update. update_id ) ;
2014
+ monitor_update. update_id = new_mon_id;
2015
+ self . pending_monitor_updates . insert ( insert_pos, PendingChannelMonitorUpdate {
2016
+ update : monitor_update, flown : true ,
2017
+ } ) ;
2018
+ for held_update in self . pending_monitor_updates . iter_mut ( ) . skip ( insert_pos + 1 ) {
2019
+ held_update. update . update_id += 1 ;
2020
+ }
2021
+ if msg. is_some ( ) {
2022
+ debug_assert ! ( false , "If there is a pending unflown monitor we should have AwaitingMonitorUpdate set" ) ;
2023
+ let update = self . build_commitment_no_status_check ( logger) ;
2024
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
2025
+ update, flown : false ,
2026
+ } ) ;
2027
+ }
2028
+ insert_pos
2029
+ } ;
2030
+ self . monitor_updating_paused ( false , msg. is_some ( ) , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1989
2031
UpdateFulfillCommitFetch :: NewClaim {
1990
- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
2032
+ monitor_update : & self . pending_monitor_updates . get ( flown_monitor_pos)
2033
+ . expect ( "We just pushed the monitor update" ) . update ,
1991
2034
htlc_value_msat,
1992
2035
}
1993
2036
} ,
1994
- UpdateFulfillFetch :: NewClaim { monitor_update, htlc_value_msat, msg : None } => {
1995
- self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1996
- self . pending_monitor_updates . push ( monitor_update) ;
1997
- UpdateFulfillCommitFetch :: NewClaim {
1998
- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
1999
- htlc_value_msat,
2000
- }
2001
- }
2002
2037
UpdateFulfillFetch :: DuplicateClaim { } => UpdateFulfillCommitFetch :: DuplicateClaim { } ,
2003
2038
}
2004
2039
}
@@ -3066,7 +3101,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3066
3101
Ok ( ( ) )
3067
3102
}
3068
3103
3069
- pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < & ChannelMonitorUpdate , ChannelError >
3104
+ pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < Option < & ChannelMonitorUpdate > , ChannelError >
3070
3105
where L :: Target : Logger
3071
3106
{
3072
3107
if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3242,8 +3277,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3242
3277
}
3243
3278
log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply." ,
3244
3279
log_bytes!( self . channel_id) ) ;
3245
- self . pending_monitor_updates . push ( monitor_update) ;
3246
- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3280
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3281
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3282
+ update : monitor_update, flown : fly_monitor
3283
+ } ) ;
3284
+ return Ok ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) ;
3247
3285
}
3248
3286
3249
3287
let need_commitment_signed = if need_commitment && ( self . channel_state & ( ChannelState :: AwaitingRemoteRevoke as u32 ) ) == 0 {
@@ -3260,9 +3298,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3260
3298
3261
3299
log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack." ,
3262
3300
log_bytes!( self . channel_id( ) ) , if need_commitment_signed { " our own commitment_signed and" } else { "" } ) ;
3263
- self . pending_monitor_updates . push ( monitor_update) ;
3301
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3302
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3303
+ update : monitor_update, flown : fly_monitor,
3304
+ } ) ;
3264
3305
self . monitor_updating_paused ( true , need_commitment_signed, false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3265
- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3306
+ return Ok ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd . update ) } else { None } ) ;
3266
3307
}
3267
3308
3268
3309
/// Public version of the below, checking relevant preconditions first.
@@ -3377,8 +3418,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3377
3418
update_add_htlcs. len( ) , update_fulfill_htlcs. len( ) , update_fail_htlcs. len( ) ) ;
3378
3419
3379
3420
self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3380
- self . pending_monitor_updates . push ( monitor_update) ;
3381
- ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) , htlcs_to_fail)
3421
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3422
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3423
+ update : monitor_update, flown : fly_monitor,
3424
+ } ) ;
3425
+ ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ,
3426
+ htlcs_to_fail)
3382
3427
} else {
3383
3428
( None , Vec :: new ( ) )
3384
3429
}
@@ -3389,7 +3434,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3389
3434
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
3390
3435
/// generating an appropriate error *after* the channel state has been updated based on the
3391
3436
/// revoke_and_ack message.
3392
- pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , & ChannelMonitorUpdate ) , ChannelError >
3437
+ pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , Option < & ChannelMonitorUpdate > ) , ChannelError >
3393
3438
where L :: Target : Logger ,
3394
3439
{
3395
3440
if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3586,21 +3631,29 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3586
3631
self . monitor_pending_failures . append ( & mut revoked_htlcs) ;
3587
3632
self . monitor_pending_finalized_fulfills . append ( & mut finalized_claimed_htlcs) ;
3588
3633
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply." , log_bytes!( self . channel_id( ) ) ) ;
3589
- self . pending_monitor_updates . push ( monitor_update) ;
3590
- return Ok ( ( Vec :: new ( ) , self . pending_monitor_updates . last ( ) . unwrap ( ) ) ) ;
3634
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3635
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3636
+ update : monitor_update, flown : fly_monitor,
3637
+ } ) ;
3638
+ return Ok ( ( Vec :: new ( ) ,
3639
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) ) ;
3591
3640
}
3592
3641
3593
3642
match self . free_holding_cell_htlcs ( logger) {
3594
3643
( Some ( _) , htlcs_to_fail) => {
3595
- let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) ;
3644
+ let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) . update ;
3596
3645
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
3597
3646
// strictly increasing by one, so decrement it here.
3598
3647
self . latest_monitor_update_id = monitor_update. update_id ;
3599
3648
monitor_update. updates . append ( & mut additional_update. updates ) ;
3600
3649
3601
3650
self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3602
- self . pending_monitor_updates . push ( monitor_update) ;
3603
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3651
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3652
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3653
+ update : monitor_update, flown : fly_monitor,
3654
+ } ) ;
3655
+ Ok ( ( htlcs_to_fail,
3656
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3604
3657
} ,
3605
3658
( None , htlcs_to_fail) => {
3606
3659
if require_commitment {
@@ -3614,13 +3667,21 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3614
3667
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed." ,
3615
3668
log_bytes!( self . channel_id( ) ) , update_fail_htlcs. len( ) + update_fail_malformed_htlcs. len( ) ) ;
3616
3669
self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3617
- self . pending_monitor_updates . push ( monitor_update) ;
3618
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3670
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3671
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3672
+ update : monitor_update, flown : fly_monitor,
3673
+ } ) ;
3674
+ Ok ( ( htlcs_to_fail,
3675
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3619
3676
} else {
3620
3677
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} with no reply necessary." , log_bytes!( self . channel_id( ) ) ) ;
3621
3678
self . monitor_updating_paused ( false , false , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3622
- self . pending_monitor_updates . push ( monitor_update) ;
3623
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3679
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3680
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3681
+ update : monitor_update, flown : fly_monitor,
3682
+ } ) ;
3683
+ Ok ( ( htlcs_to_fail,
3684
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3624
3685
}
3625
3686
}
3626
3687
}
@@ -3809,7 +3870,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3809
3870
{
3810
3871
assert_eq ! ( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 , ChannelState :: MonitorUpdateInProgress as u32 ) ;
3811
3872
self . channel_state &= !( ChannelState :: MonitorUpdateInProgress as u32 ) ;
3812
- self . pending_monitor_updates . clear ( ) ;
3873
+ let mut found_unflown = false ;
3874
+ self . pending_monitor_updates . retain ( |upd| {
3875
+ if found_unflown { debug_assert ! ( !upd. flown, "No mons may fly after one is paused" ) ; }
3876
+ if !upd. flown { found_unflown = true ; }
3877
+ !upd. flown
3878
+ } ) ;
3813
3879
3814
3880
// If we're past (or at) the FundingSent stage on an outbound channel, try to
3815
3881
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4352,8 +4418,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
4352
4418
} ] ,
4353
4419
} ;
4354
4420
self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
4355
- self . pending_monitor_updates . push ( monitor_update) ;
4356
- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
4421
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
4422
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
4423
+ update : monitor_update, flown : fly_monitor,
4424
+ } ) ;
4425
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None }
4357
4426
} else { None } ;
4358
4427
let shutdown = if send_shutdown {
4359
4428
Some ( msgs:: Shutdown {
@@ -4925,8 +4994,25 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
4925
4994
( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 ) != 0
4926
4995
}
4927
4996
4928
- pub fn get_next_monitor_update ( & self ) -> Option < & ChannelMonitorUpdate > {
4929
- self . pending_monitor_updates . first ( )
4997
+ /// Returns the next unflown monitor update, if one exists, and a bool which indicates a
4998
+ /// further unflown monitor update exists after the next.
4999
+ pub fn fly_next_unflown_monitor_update ( & mut self ) -> Option < ( & ChannelMonitorUpdate , bool ) > {
5000
+ for i in 0 ..self . pending_monitor_updates . len ( ) {
5001
+ if !self . pending_monitor_updates [ i] . flown {
5002
+ self . pending_monitor_updates [ i] . flown = true ;
5003
+ return Some ( ( & self . pending_monitor_updates [ i] . update ,
5004
+ self . pending_monitor_updates . len ( ) > i + 1 ) ) ;
5005
+ }
5006
+ }
5007
+ None
5008
+ }
5009
+
5010
+ pub fn no_monitor_updates_pending ( & self ) -> bool {
5011
+ self . pending_monitor_updates . is_empty ( )
5012
+ }
5013
+
5014
+ pub fn complete_one_mon_update ( & mut self , update_id : u64 ) {
5015
+ self . pending_monitor_updates . retain ( |upd| upd. update . update_id != update_id) ;
4930
5016
}
4931
5017
4932
5018
/// Returns true if funding_created was sent/received.
@@ -5974,8 +6060,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
5974
6060
Some ( _) => {
5975
6061
let monitor_update = self . build_commitment_no_status_check ( logger) ;
5976
6062
self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
5977
- self . pending_monitor_updates . push ( monitor_update) ;
5978
- Ok ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
6063
+
6064
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
6065
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
6066
+ update : monitor_update, flown : fly_monitor,
6067
+ } ) ;
6068
+ Ok ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } )
5979
6069
} ,
5980
6070
None => Ok ( None )
5981
6071
}
@@ -6064,8 +6154,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
6064
6154
} ] ,
6065
6155
} ;
6066
6156
self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
6067
- self . pending_monitor_updates . push ( monitor_update) ;
6068
- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
6157
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
6158
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
6159
+ update : monitor_update, flown : fly_monitor,
6160
+ } ) ;
6161
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None }
6069
6162
} else { None } ;
6070
6163
let shutdown = msgs:: Shutdown {
6071
6164
channel_id : self . channel_id ,
0 commit comments