@@ -44,6 +44,7 @@ use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
+use bitcoin::hashes::Hash;
use bitcoin::secp256k1::PublicKey;

/// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -263,7 +264,7 @@ where C::Target: chain::Filter,
		for funding_outpoint in funding_outpoints.iter() {
			let monitor_lock = self.monitors.read().unwrap();
			if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
-				if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
					// Take the monitors lock for writing so that we poison it and any future
					// operations going forward fail immediately.
					core::mem::drop(monitor_lock);
@@ -278,7 +279,7 @@ where C::Target: chain::Filter,
		let monitor_states = self.monitors.write().unwrap();
		for (funding_outpoint, monitor_state) in monitor_states.iter() {
			if !funding_outpoints.contains(funding_outpoint) {
-				if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
					log_error!(self.logger, "{}", err_str);
					panic!("{}", err_str);
				}
@@ -297,14 +298,23 @@ where C::Target: chain::Filter,
	}

	fn update_monitor_with_chain_data<FN>(
-		&self, header: &Header, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
-		monitor_state: &MonitorHolder<ChannelSigner>
+		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
+		monitor_state: &MonitorHolder<ChannelSigner>,
	) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
		let monitor = &monitor_state.monitor;
		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
-		let mut txn_outputs;
-		{
-			txn_outputs = process(monitor, txdata);
+
+		let mut txn_outputs = process(monitor, txdata);
+
+		let get_partition_key = |funding_outpoint: &OutPoint| {
+			let funding_txid_hash = funding_outpoint.txid.to_raw_hash();
+			let funding_txid_hash_bytes = funding_txid_hash.as_byte_array();
+			let funding_txid_u32 = u32::from_be_bytes([funding_txid_hash_bytes[0], funding_txid_hash_bytes[1], funding_txid_hash_bytes[2], funding_txid_hash_bytes[3]]);
+			funding_txid_u32.wrapping_add(best_height.unwrap_or_default())
+		};
+		const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 50;
+		let has_pending_claims = monitor_state.monitor.has_pending_claims();
+		if has_pending_claims || get_partition_key(funding_outpoint) % CHAINSYNC_MONITOR_PARTITION_FACTOR == 0 {
			log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
			match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
				ChannelMonitorUpdateStatus::Completed =>
@@ -313,10 +323,10 @@ where C::Target: chain::Filter,
				),
				ChannelMonitorUpdateStatus::InProgress => {
					log_trace!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
-				},
+				}
				ChannelMonitorUpdateStatus::UnrecoverableError => {
					return Err(());
-				},
+				}
			}
		}

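The heart of this hunk is the partition key: each monitor gets a stable pseudo-random offset from the first four bytes of its funding txid, and adding the best block height rotates which monitors fall due on any given block. Below is a minimal standalone sketch of that idea, assuming a raw `[u8; 32]` txid in place of LDK's `Txid` type; the helper names `partition_key` and `should_persist` are hypothetical, not LDK API:

```rust
const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 50;

/// Stable per-channel offset plus the current height: the key advances by
/// one on every new block.
fn partition_key(funding_txid: [u8; 32], best_height: Option<u32>) -> u32 {
	let txid_u32 = u32::from_be_bytes([funding_txid[0], funding_txid[1], funding_txid[2], funding_txid[3]]);
	txid_u32.wrapping_add(best_height.unwrap_or_default())
}

/// Monitors with pending claims persist on every block; quiescent monitors
/// persist only when their rotating key crosses the partition boundary.
fn should_persist(funding_txid: [u8; 32], best_height: Option<u32>, has_pending_claims: bool) -> bool {
	has_pending_claims
		|| partition_key(funding_txid, best_height) % CHAINSYNC_MONITOR_PARTITION_FACTOR == 0
}

fn main() {
	let txid = [0xab; 32];
	// Because the key advances by one per block, any window of 50 consecutive
	// heights persists a quiescent monitor exactly once -- the property the
	// test added below depends on.
	let persists = (0..CHAINSYNC_MONITOR_PARTITION_FACTOR)
		.filter(|h| should_persist(txid, Some(*h), false))
		.count();
	assert_eq!(persists, 1);
}
```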
@@ -870,14 +880,17 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,

#[cfg(test)]
mod tests {
-	use crate::check_added_monitors;
+	use crate::{check_added_monitors, check_closed_event};
	use crate::{expect_payment_path_successful, get_event_msg};
	use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};
	use crate::chain::{ChannelMonitorUpdateStatus, Watch};
-	use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider};
+	use crate::chain::channelmonitor::ANTI_REORG_DELAY;
+	use crate::events::{ClosureReason, Event, MessageSendEvent, MessageSendEventsProvider};
	use crate::ln::functional_test_utils::*;
	use crate::ln::msgs::ChannelMessageHandler;

+	const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 50;
+
	#[test]
	fn test_async_ooo_offchain_updates() {
		// Test that if we have multiple offchain updates being persisted and they complete
@@ -983,6 +996,79 @@ mod tests {
		check_added_monitors!(nodes[0], 1);
	}

+	#[test]
+	fn test_chainsync_triggers_distributed_monitor_persistence() {
+		let chanmon_cfgs = create_chanmon_cfgs(3);
+		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+		// Use FullBlockViaListen to avoid duplicate calls to process_chain_data and skips_blocks() in
+		// case of other connect_styles.
+		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
+		*nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
+		*nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
+
+		let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+		let channel_2 = create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;
+
+		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+		chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+
+		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
+		connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
+		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
+
+		// Connecting [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] * 2 blocks should trigger only 2 writes
+		// per monitor/channel.
+		assert_eq!(2 * 2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+		assert_eq!(2, chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+		assert_eq!(2, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+
+		// Test that monitors with pending_claims are persisted on every block.
+		// Now, close channel_2, i.e. the channel between node[0] and node[2], to create a
+		// pending_claim in node[0].
+		nodes[0].node.force_close_broadcasting_latest_txn(&channel_2, &nodes[2].node.get_our_node_id(), "Channel force-closed".to_string()).unwrap();
+		check_closed_event!(&nodes[0], 1, ClosureReason::HolderForceClosed, false,
+			[nodes[2].node.get_our_node_id()], 1000000);
+		check_closed_broadcast(&nodes[0], 1, true);
+		let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+		assert_eq!(close_tx.len(), 1);
+
+		mine_transaction(&nodes[2], &close_tx[0]);
+		check_added_monitors(&nodes[2], 1);
+		check_closed_broadcast(&nodes[2], 1, true);
+		check_closed_event!(&nodes[2], 1, ClosureReason::CommitmentTxConfirmed, false,
+			[nodes[0].node.get_our_node_id()], 1000000);
+
+		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+
+		// For channel_2, there should be a monitor write for every block connection.
+		// We connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks since we don't know when
+		// the channel_1 monitor persistence will occur; with [`CHAINSYNC_MONITOR_PARTITION_FACTOR`]
+		// blocks it will be persisted exactly once.
+		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
+		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);
+
+		// CHAINSYNC_MONITOR_PARTITION_FACTOR writes for channel_2 due to its pending_claim, 1 for
+		// channel_1.
+		assert_eq!((CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+		// For node[2], there is no pending_claim.
+		assert_eq!(1, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+
+		// Confirm the claim for node[0] with ANTI_REORG_DELAY and reset the monitor write counter.
+		mine_transaction(&nodes[0], &close_tx[0]);
+		connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
+		check_added_monitors(&nodes[0], 1);
+		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+
+		// Again connect 1 full cycle of CHAINSYNC_MONITOR_PARTITION_FACTOR blocks; it should
+		// result in only 1 write per monitor/channel.
+		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
+		assert_eq!(2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+	}
+

	#[test]
	#[cfg(feature = "std")]
	fn update_during_chainsync_poisons_channel() {
@@ -991,12 +1077,15 @@ mod tests {
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
		create_announced_chan_between_nodes(&nodes, 0, 1);
+		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

		chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);

		assert!(std::panic::catch_unwind(|| {
			// Returning an UnrecoverableError should always panic immediately
-			connect_blocks(&nodes[0], 1);
+			// Connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks so that we trigger some persistence
+			// after accounting for block-height based partitioning/distribution.
+			connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		}).is_err());
		assert!(std::panic::catch_unwind(|| {
			// ...and also poison our locks causing later use to panic as well
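The second `catch_unwind` works because of standard `std::sync` lock poisoning, which the failure path above relies on ("Take the monitors lock for writing so that we poison it"): a panic while a guard is held marks the lock poisoned, and every later acquisition returns `Err`. A minimal sketch of that mechanism, independent of LDK:

```rust
use std::panic;
use std::sync::Mutex;

fn main() {
	let monitors = Mutex::new(Vec::<u32>::new());

	// Panicking while a guard is held marks the lock as poisoned...
	let _ = panic::catch_unwind(|| {
		let _guard = monitors.lock().unwrap();
		panic!("UnrecoverableError during chain sync");
	});

	// ...so every subsequent lock() attempt observes the poison and errors,
	// which is why later use in the test panics on unwrap().
	assert!(monitors.lock().is_err());
}
```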