@@ -27,30 +27,32 @@ use bitcoin::network::constants::Network;
 use bitcoin_hashes::Hash as TraitImport;
 use bitcoin_hashes::hash160::Hash as Hash160;
 use bitcoin_hashes::sha256::Hash as Sha256;
+use bitcoin_hashes::sha256d::Hash as Sha256d;

 use lightning::chain::chaininterface;
 use lightning::chain::transaction::OutPoint;
 use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, ChainListener, FeeEstimator, ChainWatchInterfaceUtil};
 use lightning::chain::keysinterface::{ChannelKeys, KeysInterface};
 use lightning::ln::channelmonitor;
-use lightning::ln::channelmonitor::{ChannelMonitorUpdateErr, HTLCUpdate};
-use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage};
+use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, HTLCUpdate};
+use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, ChannelManagerReadArgs};
 use lightning::ln::router::{Route, RouteHop};
 use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, ErrorAction, HandleError, UpdateAddHTLC, LocalFeatures};
 use lightning::util::events;
 use lightning::util::logger::Logger;
 use lightning::util::config::UserConfig;
 use lightning::util::events::{EventsProvider, MessageSendEventsProvider};
-use lightning::util::ser::{Readable, Writeable};
+use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};

 mod utils;
 use utils::test_logger;

 use secp256k1::key::{PublicKey, SecretKey};
 use secp256k1::Secp256k1;

+use std::mem;
 use std::cmp::Ordering;
-use std::collections::HashSet;
+use std::collections::{HashSet, HashMap};
 use std::sync::{Arc, Mutex};
 use std::sync::atomic;
 use std::io::Cursor;
@@ -67,22 +69,51 @@ impl BroadcasterInterface for TestBroadcaster {
     fn broadcast_transaction(&self, _tx: &Transaction) { }
 }

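+// A minimal in-memory lightning::util::ser::Writer which collects serialized bytes
+// into a Vec<u8>, letting the fuzzer snapshot ChannelManager and ChannelMonitor state.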
+pub struct VecWriter(pub Vec<u8>);
+impl Writer for VecWriter {
+    fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> {
+        self.0.extend_from_slice(buf);
+        Ok(())
+    }
+    fn size_hint(&mut self, size: usize) {
+        self.0.reserve_exact(size);
+    }
+}
+
 pub struct TestChannelMonitor {
     pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
     pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
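+    // Fuzzer bookkeeping: the last monitor state which serialized successfully, whether
+    // each channel's most recent update succeeded, and a flag telling the main loop that
+    // the ChannelManager should be re-serialized.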
+    pub latest_good_update: Mutex<HashMap<OutPoint, Vec<u8>>>,
+    pub latest_update_good: Mutex<HashMap<OutPoint, bool>>,
+    pub latest_updates_good_at_last_ser: Mutex<HashMap<OutPoint, bool>>,
+    pub should_update_manager: atomic::AtomicBool,
 }
 impl TestChannelMonitor {
     pub fn new(chain_monitor: Arc<chaininterface::ChainWatchInterface>, broadcaster: Arc<chaininterface::BroadcasterInterface>, logger: Arc<Logger>, feeest: Arc<chaininterface::FeeEstimator>) -> Self {
         Self {
             simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
             update_ret: Mutex::new(Ok(())),
+            latest_good_update: Mutex::new(HashMap::new()),
+            latest_update_good: Mutex::new(HashMap::new()),
+            latest_updates_good_at_last_ser: Mutex::new(HashMap::new()),
+            should_update_manager: atomic::AtomicBool::new(false),
         }
     }
 }
 impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
     fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
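+        // When the (simulated) update succeeds, persist the monitor as it would be
+        // written to disk and remember it as the latest good copy; on a simulated
+        // failure, just record that this channel's latest update was bad.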
+        let ret = self.update_ret.lock().unwrap().clone();
+        if let Ok(()) = ret {
+            let mut ser = VecWriter(Vec::new());
+            monitor.write_for_disk(&mut ser).unwrap();
+            self.latest_good_update.lock().unwrap().insert(funding_txo, ser.0);
+            self.latest_update_good.lock().unwrap().insert(funding_txo, true);
+            self.should_update_manager.store(true, atomic::Ordering::Relaxed);
+        } else {
+            self.latest_update_good.lock().unwrap().insert(funding_txo, false);
+        }
         assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok());
-        self.update_ret.lock().unwrap().clone()
+        ret
     }

     fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
@@ -155,6 +186,55 @@ pub fn do_test(data: &[u8]) {
         } }
     }

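+    // Rebuilds a node from its last serialized ChannelManager and the latest good copy
+    // of each of its ChannelMonitors, simulating a restart-from-disk. Both deserialize
+    // to a (block-hash, value) tuple, hence the `.1` accesses below.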
+    macro_rules! reload_node {
+        ($ser: expr, $node_id: expr, $old_monitors: expr) => { {
+            let logger: Arc<Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string()));
+            let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
+            let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));
+
+            let keys_manager = Arc::new(KeyProvider { node_id: $node_id, session_id: atomic::AtomicU8::new(0), channel_id: atomic::AtomicU8::new(0) });
+            let mut config = UserConfig::new();
+            config.channel_options.fee_proportional_millionths = 0;
+            config.channel_options.announced_channel = true;
+            config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
+
+            let mut monitors = HashMap::new();
+            let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap();
+            for (outpoint, monitor_ser) in old_monitors.drain() {
+                monitors.insert(outpoint, <(Sha256d, ChannelMonitor)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1);
+                monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
+            }
+            let mut monitor_refs = HashMap::new();
+            for (outpoint, monitor) in monitors.iter() {
+                monitor_refs.insert(*outpoint, monitor);
+            }
+
+            let read_args = ChannelManagerReadArgs {
+                keys_manager,
+                fee_estimator: fee_est.clone(),
+                monitor: monitor.clone(),
+                chain_monitor: watch,
+                tx_broadcaster: broadcast.clone(),
+                logger,
+                default_config: config,
+                channel_monitors: &monitor_refs,
+            };
+
+            let res = (<(Sha256d, ChannelManager)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
+            for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
+                if !was_good {
+                    // If the last time we updated a monitor we didn't successfully update (and we
+                    // have since updated our serialized copy of the ChannelManager) we may
+                    // force-close the channel on our counterparty because we know we're missing
+                    // something. Thus, we just return here since we can't continue to test.
+                    return;
+                }
+            }
+            res
+        } }
+    }
+
     let mut channel_txn = Vec::new();
     macro_rules! make_channel {
         ($source: expr, $dest: expr, $chan_id: expr) => { {
@@ -264,11 +344,11 @@ pub fn do_test(data: &[u8]) {

     // 3 nodes is enough to hit all the possible cases, notably unknown-source-unknown-dest
     // forwarding.
-    let (node_a, monitor_a) = make_node!(0);
-    let (node_b, monitor_b) = make_node!(1);
-    let (node_c, monitor_c) = make_node!(2);
+    let (mut node_a, mut monitor_a) = make_node!(0);
+    let (mut node_b, mut monitor_b) = make_node!(1);
+    let (mut node_c, mut monitor_c) = make_node!(2);

-    let nodes = [node_a, node_b, node_c];
+    let mut nodes = [node_a, node_b, node_c];

     make_channel!(nodes[0], nodes[1], 0);
     make_channel!(nodes[1], nodes[2], 1);
@@ -286,8 +366,15 @@ pub fn do_test(data: &[u8]) {

     let mut chan_a_disconnected = false;
     let mut chan_b_disconnected = false;
-    let mut chan_a_reconnecting = false;
-    let mut chan_b_reconnecting = false;
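+    // Message events from node B to a just-disconnected peer are dropped, while its
+    // events to the still-connected peer are queued here and replayed later.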
+    let mut ba_events = Vec::new();
+    let mut bc_events = Vec::new();
+
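+    // Keep a serialized copy of each node's ChannelManager from the start, refreshed by
+    // the main loop whenever a monitor update succeeds, for reload_node! to restart from.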
+    let mut node_a_ser = VecWriter(Vec::new());
+    nodes[0].write(&mut node_a_ser).unwrap();
+    let mut node_b_ser = VecWriter(Vec::new());
+    nodes[1].write(&mut node_b_ser).unwrap();
+    let mut node_c_ser = VecWriter(Vec::new());
+    nodes[2].write(&mut node_c_ser).unwrap();

     macro_rules! test_err {
         ($res: expr) => {
@@ -363,13 +450,18 @@ pub fn do_test(data: &[u8]) {

     macro_rules! process_msg_events {
         ($node: expr, $corrupt_forward: expr) => { {
-            for event in nodes[$node].get_and_clear_pending_msg_events() {
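+            // Node B first replays any message events queued while a peer was
+            // disconnected, then drains its freshly-generated events.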
+            let events = if $node == 1 {
+                let mut new_events = Vec::new();
+                mem::swap(&mut new_events, &mut ba_events);
+                new_events.extend_from_slice(&bc_events[..]);
+                bc_events.clear();
+                new_events
+            } else { Vec::new() };
+            for event in events.iter().chain(nodes[$node].get_and_clear_pending_msg_events().iter()) {
                 match event {
                     events::MessageSendEvent::UpdateHTLCs { ref node_id, updates: CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, ref update_fulfill_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
-                        for (idx, dest) in nodes.iter().enumerate() {
-                            if dest.get_our_node_id() == *node_id &&
-                                    (($node != 0 && idx != 0) || !chan_a_disconnected) &&
-                                    (($node != 2 && idx != 2) || !chan_b_disconnected) {
+                        for dest in nodes.iter() {
+                            if dest.get_our_node_id() == *node_id {
                                 assert!(update_fee.is_none());
                                 for update_add in update_add_htlcs {
                                     if !$corrupt_forward {
@@ -399,25 +491,16 @@ pub fn do_test(data: &[u8]) {
                         }
                     },
                     events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
-                        for (idx, dest) in nodes.iter().enumerate() {
-                            if dest.get_our_node_id() == *node_id &&
-                                    (($node != 0 && idx != 0) || !chan_a_disconnected) &&
-                                    (($node != 2 && idx != 2) || !chan_b_disconnected) {
+                        for dest in nodes.iter() {
+                            if dest.get_our_node_id() == *node_id {
                                 test_err!(dest.handle_revoke_and_ack(&nodes[$node].get_our_node_id(), msg));
                             }
                         }
                     },
                     events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
-                        for (idx, dest) in nodes.iter().enumerate() {
+                        for dest in nodes.iter() {
                             if dest.get_our_node_id() == *node_id {
                                 test_err!(dest.handle_channel_reestablish(&nodes[$node].get_our_node_id(), msg));
-                                if $node == 0 || idx == 0 {
-                                    chan_a_reconnecting = false;
-                                    chan_a_disconnected = false;
-                                } else {
-                                    chan_b_reconnecting = false;
-                                    chan_b_disconnected = false;
-                                }
                             }
                         }
                     },
@@ -434,6 +517,56 @@ pub fn do_test(data: &[u8]) {
         } }
     }

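+    // When node B's counterparty 0 or 2 disconnects, flush the counterparty's own
+    // pending message events, then triage node B's: drop events addressed to the
+    // disconnected peer and queue the rest for replay once processing resumes.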
+    macro_rules! drain_msg_events_on_disconnect {
+        ($counterparty_id: expr) => { {
+            if $counterparty_id == 0 {
+                for event in nodes[0].get_and_clear_pending_msg_events() {
+                    match event {
+                        events::MessageSendEvent::UpdateHTLCs { .. } => {},
+                        events::MessageSendEvent::SendRevokeAndACK { .. } => {},
+                        events::MessageSendEvent::SendChannelReestablish { .. } => {},
+                        events::MessageSendEvent::SendFundingLocked { .. } => {},
+                        events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => {},
+                        _ => panic!("Unhandled message event"),
+                    }
+                }
+                ba_events.clear();
+            } else {
+                for event in nodes[2].get_and_clear_pending_msg_events() {
+                    match event {
+                        events::MessageSendEvent::UpdateHTLCs { .. } => {},
+                        events::MessageSendEvent::SendRevokeAndACK { .. } => {},
+                        events::MessageSendEvent::SendChannelReestablish { .. } => {},
+                        events::MessageSendEvent::SendFundingLocked { .. } => {},
+                        events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => {},
+                        _ => panic!("Unhandled message event"),
+                    }
+                }
+                bc_events.clear();
+            }
+            let mut events = nodes[1].get_and_clear_pending_msg_events();
+            let drop_node_id = if $counterparty_id == 0 { nodes[0].get_our_node_id() } else { nodes[2].get_our_node_id() };
+            let msg_sink = if $counterparty_id == 0 { &mut bc_events } else { &mut ba_events };
+            for event in events.drain(..) {
+                let push = match event {
+                    events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => *node_id != drop_node_id,
+                    events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => *node_id != drop_node_id,
+                    events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => *node_id != drop_node_id,
+                    events::MessageSendEvent::SendFundingLocked { .. } => false,
+                    events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => false,
+                    _ => panic!("Unhandled message event"),
+                };
+                if push { msg_sink.push(event); }
+            }
+        } }
+    }

     macro_rules! process_events {
         ($node: expr, $fail: expr) => { {
             // In case we get 256 payments we may have a hash collision, resulting in the
@@ -500,27 +633,29 @@ pub fn do_test(data: &[u8]) {
                     nodes[0].peer_disconnected(&nodes[1].get_our_node_id(), false);
                     nodes[1].peer_disconnected(&nodes[0].get_our_node_id(), false);
                     chan_a_disconnected = true;
+                    drain_msg_events_on_disconnect!(0);
                 }
             },
             0x10 => {
                 if !chan_b_disconnected {
                     nodes[1].peer_disconnected(&nodes[2].get_our_node_id(), false);
                     nodes[2].peer_disconnected(&nodes[1].get_our_node_id(), false);
                     chan_b_disconnected = true;
+                    drain_msg_events_on_disconnect!(2);
                 }
             },
             0x11 => {
-                if chan_a_disconnected && !chan_a_reconnecting {
+                if chan_a_disconnected {
                     nodes[0].peer_connected(&nodes[1].get_our_node_id());
                     nodes[1].peer_connected(&nodes[0].get_our_node_id());
-                    chan_a_reconnecting = true;
+                    chan_a_disconnected = false;
                 }
             },
             0x12 => {
-                if chan_b_disconnected && !chan_b_reconnecting {
+                if chan_b_disconnected {
                     nodes[1].peer_connected(&nodes[2].get_our_node_id());
                     nodes[2].peer_connected(&nodes[1].get_our_node_id());
-                    chan_b_reconnecting = true;
+                    chan_b_disconnected = false;
                 }
             },
             0x13 => process_msg_events!(0, true),
@@ -535,8 +670,67 @@ pub fn do_test(data: &[u8]) {
             0x1c => process_msg_events!(2, false),
             0x1d => process_events!(2, true),
             0x1e => process_events!(2, false),
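+            // 0x1f/0x20/0x21: restart node A/B/C from its last serialized state. The
+            // relevant peers are disconnected first, as a restart would drop connections.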
+            0x1f => {
+                if !chan_a_disconnected {
+                    nodes[1].peer_disconnected(&nodes[0].get_our_node_id(), false);
+                    chan_a_disconnected = true;
+                    drain_msg_events_on_disconnect!(0);
+                }
+                let (new_node_a, new_monitor_a) = reload_node!(node_a_ser, 0, monitor_a);
+                node_a = Arc::new(new_node_a);
+                nodes[0] = node_a.clone();
+                monitor_a = new_monitor_a;
+            },
+            0x20 => {
+                if !chan_a_disconnected {
+                    nodes[0].peer_disconnected(&nodes[1].get_our_node_id(), false);
+                    chan_a_disconnected = true;
+                    nodes[0].get_and_clear_pending_msg_events();
+                    ba_events.clear();
+                }
+                if !chan_b_disconnected {
+                    nodes[2].peer_disconnected(&nodes[1].get_our_node_id(), false);
+                    chan_b_disconnected = true;
+                    nodes[2].get_and_clear_pending_msg_events();
+                    bc_events.clear();
+                }
+                let (new_node_b, new_monitor_b) = reload_node!(node_b_ser, 1, monitor_b);
+                node_b = Arc::new(new_node_b);
+                nodes[1] = node_b.clone();
+                monitor_b = new_monitor_b;
+            },
+            0x21 => {
+                if !chan_b_disconnected {
+                    nodes[1].peer_disconnected(&nodes[2].get_our_node_id(), false);
+                    chan_b_disconnected = true;
+                    drain_msg_events_on_disconnect!(2);
+                }
+                let (new_node_c, new_monitor_c) = reload_node!(node_c_ser, 2, monitor_c);
+                node_c = Arc::new(new_node_c);
+                nodes[2] = node_c.clone();
+                monitor_c = new_monitor_c;
+            },
             _ => test_return!(),
         }
+
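+        // If any monitor update succeeded this iteration, refresh that node's serialized
+        // ChannelManager and snapshot which channels' latest monitor updates were good at
+        // this serialization point (consumed by reload_node! above).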
+        if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
+            node_a_ser.0.clear();
+            nodes[0].write(&mut node_a_ser).unwrap();
+            monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
+            *monitor_a.latest_updates_good_at_last_ser.lock().unwrap() = monitor_a.latest_update_good.lock().unwrap().clone();
+        }
+        if monitor_b.should_update_manager.load(atomic::Ordering::Relaxed) {
+            node_b_ser.0.clear();
+            nodes[1].write(&mut node_b_ser).unwrap();
+            monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
+            *monitor_b.latest_updates_good_at_last_ser.lock().unwrap() = monitor_b.latest_update_good.lock().unwrap().clone();
+        }
+        if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
+            node_c_ser.0.clear();
+            nodes[2].write(&mut node_c_ser).unwrap();
+            monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
+            *monitor_c.latest_updates_good_at_last_ser.lock().unwrap() = monitor_c.latest_update_good.lock().unwrap().clone();
+        }
     }
 }