@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
29
29
use crate :: chain;
30
30
use crate :: chain:: { ChannelMonitorUpdateStatus , Filter , WatchedOutput } ;
31
31
use crate :: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
32
- use crate :: chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate , Balance , MonitorEvent , TransactionOutputs , WithChannelMonitor } ;
32
+ use crate :: chain:: channelmonitor:: { ChannelMonitor , StubChannel , StubChannelMonitor , ChannelMonitorUpdate , Balance , MonitorEvent , TransactionOutputs , WithChannelMonitor } ;
33
33
use crate :: chain:: transaction:: { OutPoint , TransactionData } ;
34
34
use crate :: ln:: types:: ChannelId ;
35
35
use crate :: sign:: ecdsa:: EcdsaChannelSigner ;
@@ -163,6 +163,9 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
163
163
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
164
164
/// hedging against data loss in case of unexpected failure.
165
165
fn archive_persisted_channel ( & self , channel_funding_outpoint : OutPoint ) ;
166
+ /// Persist a new channel's data in response to a [`chain::Watch::watch_dummy`] call. This is
167
+ /// called by [`ChannelManager`] for new stub channels received from peer storage backup,
168
+ fn persist_new_stub_channel ( & self , funding_txo : OutPoint , stub_monitor : & StubChannelMonitor < ChannelSigner > ) -> Result < ( ) , bitcoin:: io:: Error > ;
166
169
}
167
170
168
171
struct MonitorHolder < ChannelSigner : EcdsaChannelSigner > {
@@ -199,6 +202,19 @@ pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
199
202
funding_txo : OutPoint ,
200
203
}
201
204
205
+ /// A read-only reference to a current StubChannelMonitors similar to [LockedChannelMonitor]
206
+ pub struct LockedStubChannelMonitor < ' a , ChannelSigner : EcdsaChannelSigner > {
207
+ lock : RwLockReadGuard < ' a , HashMap < OutPoint , StubChannelMonitor < ChannelSigner > > > ,
208
+ funding_txo : OutPoint ,
209
+ }
210
+
211
+ impl < ChannelSigner : EcdsaChannelSigner > Deref for LockedStubChannelMonitor < ' _ , ChannelSigner > {
212
+ type Target = StubChannelMonitor < ChannelSigner > ;
213
+ fn deref ( & self ) -> & StubChannelMonitor < ChannelSigner > {
214
+ & self . lock . get ( & self . funding_txo ) . expect ( "Checked at construction" )
215
+ }
216
+ }
217
+
202
218
impl < ChannelSigner : EcdsaChannelSigner > Deref for LockedChannelMonitor < ' _ , ChannelSigner > {
203
219
type Target = ChannelMonitor < ChannelSigner > ;
204
220
fn deref ( & self ) -> & ChannelMonitor < ChannelSigner > {
@@ -230,6 +246,8 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
230
246
P :: Target : Persist < ChannelSigner > ,
231
247
{
232
248
monitors : RwLock < HashMap < OutPoint , MonitorHolder < ChannelSigner > > > ,
249
+ stub_monitors : RwLock < HashMap < OutPoint , StubChannelMonitor < ChannelSigner > > > ,
250
+
233
251
chain_source : Option < C > ,
234
252
broadcaster : T ,
235
253
logger : L ,
@@ -264,9 +282,10 @@ where C::Target: chain::Filter,
264
282
/// updated `txdata`.
265
283
///
266
284
/// Calls which represent a new blockchain tip height should set `best_height`.
267
- fn process_chain_data < FN > ( & self , header : & Header , best_height : Option < u32 > , txdata : & TransactionData , process : FN )
285
+ fn process_chain_data < FN , SN > ( & self , header : & Header , best_height : Option < u32 > , txdata : & TransactionData , process : FN , stub_process : SN )
268
286
where
269
- FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
287
+ FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > ,
288
+ SN : Fn ( & StubChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
270
289
{
271
290
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
272
291
let funding_outpoints = hash_set_from_iter ( self . monitors . read ( ) . unwrap ( ) . keys ( ) . cloned ( ) ) ;
@@ -296,6 +315,14 @@ where C::Target: chain::Filter,
296
315
}
297
316
}
298
317
318
+ let stub_monitors = self . stub_monitors . write ( ) . unwrap ( ) ;
319
+ for ( funding_outpoint, stub_monitor) in stub_monitors. iter ( ) {
320
+ if self . update_stub_with_chain_data ( header, best_height, txdata, & stub_process, funding_outpoint, stub_monitor) . is_err ( ) {
321
+ log_error ! ( self . logger, "{}" , err_str) ;
322
+ panic ! ( "{}" , err_str) ;
323
+ } ;
324
+ }
325
+
299
326
if let Some ( height) = best_height {
300
327
// If the best block height is being updated, update highest_chain_height under the
301
328
// monitors write lock.
@@ -307,6 +334,34 @@ where C::Target: chain::Filter,
307
334
}
308
335
}
309
336
337
+ fn update_stub_with_chain_data < SN > ( & self , header : & Header , _best_height : Option < u32 > , txdata : & TransactionData , stub_process : SN ,
338
+ _funding_outpoint : & OutPoint , stub_monitor : & StubChannelMonitor < ChannelSigner > ) -> Result < ( ) , ( ) >
339
+ where SN : Fn ( & StubChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
340
+ let logger = WithChannelMonitor :: from_stub ( & self . logger , stub_monitor) ;
341
+ let mut txn_outputs;
342
+ {
343
+ txn_outputs = stub_process ( stub_monitor, txdata) ;
344
+ }
345
+
346
+ if let Some ( ref chain_source) = self . chain_source {
347
+ let block_hash = header. block_hash ( ) ;
348
+ for ( txid, mut outputs) in txn_outputs. drain ( ..) {
349
+ for ( idx, output) in outputs. drain ( ..) {
350
+ // Register any new outputs with the chain source for filtering
351
+ let output = WatchedOutput {
352
+ block_hash : Some ( block_hash) ,
353
+ outpoint : OutPoint { txid, index : idx as u16 } ,
354
+ script_pubkey : output. script_pubkey ,
355
+ } ;
356
+ log_trace ! ( logger, "Adding monitoring for spends of outpoint from stub {} to the filter" , output. outpoint) ;
357
+ chain_source. register_output ( output) ;
358
+ }
359
+ }
360
+ }
361
+
362
+ Ok ( ( ) )
363
+ }
364
+
310
365
fn update_monitor_with_chain_data < FN > (
311
366
& self , header : & Header , best_height : Option < u32 > , txdata : & TransactionData , process : FN , funding_outpoint : & OutPoint ,
312
367
monitor_state : & MonitorHolder < ChannelSigner > , channel_count : usize ,
@@ -381,6 +436,7 @@ where C::Target: chain::Filter,
381
436
pub fn new ( chain_source : Option < C > , broadcaster : T , logger : L , feeest : F , persister : P ) -> Self {
382
437
Self {
383
438
monitors : RwLock :: new ( new_hash_map ( ) ) ,
439
+ stub_monitors : RwLock :: new ( new_hash_map ( ) ) ,
384
440
chain_source,
385
441
broadcaster,
386
442
logger,
@@ -430,6 +486,20 @@ where C::Target: chain::Filter,
430
486
}
431
487
}
432
488
489
+ /// Gets the [`LockedStubChannelMonitor`] for a given funding outpoint, returning an `Err` if no
490
+ /// such [`StubChannelMonitor`] is currently being monitored for.
491
+ ///
492
+ /// Note that the result holds a mutex over our monitor set, and should not be held
493
+ /// indefinitely.
494
+ pub fn get_stub_monitor ( & self , funding_txo : OutPoint ) -> Result < LockedStubChannelMonitor < ' _ , ChannelSigner > , ( ) > {
495
+ let lock = self . stub_monitors . read ( ) . unwrap ( ) ;
496
+ if lock. get ( & funding_txo) . is_some ( ) {
497
+ Ok ( LockedStubChannelMonitor { lock, funding_txo } )
498
+ } else {
499
+ Err ( ( ) )
500
+ }
501
+ }
502
+
433
503
/// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored.
434
504
///
435
505
/// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
@@ -441,6 +511,17 @@ where C::Target: chain::Filter,
441
511
} ) . collect ( )
442
512
}
443
513
514
+ /// Lists the funding outpoint and channel ID of each [`StubChannelMonitor`] being monitored.
515
+ ///
516
+ /// Note that [`StubChannelMonitor`]s are not removed when a channel is closed as they are always
517
+ /// monitoring for on-chain state resolutions.
518
+ pub fn list_stub_monitors ( & self ) -> Vec < ( OutPoint , ChannelId ) > {
519
+ self . stub_monitors . read ( ) . unwrap ( ) . iter ( ) . map ( |( outpoint, stub_monitor) | {
520
+ let channel_id = stub_monitor. channel_id ( ) ;
521
+ ( * outpoint, channel_id)
522
+ } ) . collect ( )
523
+ }
524
+
444
525
#[ cfg( not( c_bindings) ) ]
445
526
/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
446
527
/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
@@ -672,6 +753,9 @@ where
672
753
self . process_chain_data ( header, Some ( height) , & txdata, |monitor, txdata| {
673
754
monitor. block_connected (
674
755
header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & self . logger )
756
+ } , |stub_monitor, txdata| {
757
+ stub_monitor. block_connected (
758
+ header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & self . logger )
675
759
} ) ;
676
760
// Assume we may have some new events and wake the event processor
677
761
self . event_notifier . notify ( ) ;
@@ -701,6 +785,9 @@ where
701
785
self . process_chain_data ( header, None , txdata, |monitor, txdata| {
702
786
monitor. transactions_confirmed (
703
787
header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & self . logger )
788
+ } , |stub_monitor, txdata| {
789
+ stub_monitor. transactions_confirmed (
790
+ header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & self . logger )
704
791
} ) ;
705
792
// Assume we may have some new events and wake the event processor
706
793
self . event_notifier . notify ( ) ;
@@ -723,6 +810,10 @@ where
723
810
monitor. best_block_updated (
724
811
header, height, & * self . broadcaster , & * self . fee_estimator , & self . logger
725
812
)
813
+ } , |stub_monitor, txdata| {
814
+ debug_assert ! ( txdata. is_empty( ) ) ;
815
+ stub_monitor. best_block_updated (
816
+ header, height, & * self . broadcaster , & * self . fee_estimator , & self . logger )
726
817
} ) ;
727
818
// Assume we may have some new events and wake the event processor
728
819
self . event_notifier . notify ( ) ;
@@ -749,6 +840,89 @@ where C::Target: chain::Filter,
749
840
L :: Target : Logger ,
750
841
P :: Target : Persist < ChannelSigner > ,
751
842
{
843
+ fn watch_dummy ( & self , funding_outpoint : OutPoint , stub_monitor : StubChannelMonitor < ChannelSigner > ) -> Result < ( ) , ( ) > {
844
+ let logger = WithChannelMonitor :: from_stub ( & self . logger , & stub_monitor) ;
845
+ let mut stub_monitors = self . stub_monitors . write ( ) . unwrap ( ) ;
846
+ let entry = match stub_monitors. entry ( funding_outpoint) {
847
+ hash_map:: Entry :: Occupied ( _) => {
848
+ log_error ! ( logger, "Failed to add new channel data: channel monitor for given outpoint is already present" ) ;
849
+ return Err ( ( ) ) ;
850
+ } ,
851
+ hash_map:: Entry :: Vacant ( e) => e,
852
+ } ;
853
+ log_trace ! ( logger, "Got new StubChannelMonitor for channel {}" , stub_monitor. channel_id( ) ) ;
854
+
855
+ if let Some ( ref chain_source) = self . chain_source {
856
+ stub_monitor. load_outputs_to_watch ( chain_source , & self . logger ) ;
857
+ }
858
+ entry. insert ( stub_monitor) ;
859
+
860
+ Ok ( ( ) )
861
+ }
862
+
863
+ fn stale_or_missing_channel_monitor ( & self , stub_chan : & StubChannel ) -> bool {
864
+ let monitors = self . monitors . read ( ) . unwrap ( ) ;
865
+
866
+ if let Some ( mon) = monitors. get ( & stub_chan. funding_outpoint ) {
867
+ return stub_chan. get_min_seen_secret ( ) < mon. monitor . get_min_seen_secret ( ) ;
868
+ }
869
+ return true ;
870
+ }
871
+
872
+ fn panic_and_persist_stub_channel ( & self , funding_outpoint : OutPoint , stub_monitor : StubChannelMonitor < ChannelSigner > ) -> Result < ( ) , ( ) > {
873
+ let logger = WithChannelMonitor :: from_stub ( & self . logger , & stub_monitor) ;
874
+ let mut monitors = self . monitors . write ( ) . unwrap ( ) ;
875
+ {
876
+ let mut stub_monitors = self . stub_monitors . write ( ) . unwrap ( ) ;
877
+ match stub_monitors. entry ( funding_outpoint) {
878
+ hash_map:: Entry :: Occupied ( mut stub) => {
879
+ let stored_stub = stub. get_mut ( ) ;
880
+ if stub_monitor. get_min_seen_secret ( ) < stored_stub. get_min_seen_secret ( ) {
881
+ // This would be useful if we received a more recent StubMonitor than stored.
882
+ log_error ! ( logger, "Updating StubChannelMonitor with the latest data." ) ;
883
+ stored_stub. update_from_new_stub_monitor ( & stub_monitor) ;
884
+ }
885
+ log_debug ! ( logger, "StubChannelMonitor already exists." ) ;
886
+ return Ok ( ( ) ) ;
887
+ } ,
888
+ hash_map:: Entry :: Vacant ( _) => { } ,
889
+ } ;
890
+ }
891
+
892
+ match monitors. entry ( funding_outpoint) {
893
+ hash_map:: Entry :: Occupied ( p) => {
894
+ if p. get ( ) . monitor . get_min_seen_secret ( ) > stub_monitor. get_min_seen_secret ( ) {
895
+ let persist_res = self . persister . persist_new_stub_channel ( funding_outpoint, & stub_monitor) ;
896
+ log_debug ! ( logger, "Persisting new StubChannel for ChannelID: {}" , stub_monitor. channel_id( ) ) ;
897
+ if persist_res. is_err ( ) {
898
+ log_error ! ( logger, "Failed to add new dummy channel data" ) ;
899
+ return Err ( ( ) ) ;
900
+ }
901
+ #[ cfg( not( test) ) ]
902
+ panic ! ( "We've lost state for channel {}. Persisting the StubChannelMonitor" , stub_monitor. channel_id( ) ) ;
903
+
904
+ #[ cfg( test) ]
905
+ return self . watch_dummy ( funding_outpoint, stub_monitor) ;
906
+ }
907
+ log_debug ! ( logger, "Skip StubChannelMonitor for {}, an up-to-date ChannelMonitor already exists." , stub_monitor. channel_id( ) ) ;
908
+ return Ok ( ( ) ) ;
909
+ } ,
910
+ hash_map:: Entry :: Vacant ( _) => { } ,
911
+ } ;
912
+
913
+ log_debug ! ( logger, "Persisting new StubChannel for ChannelID: {}" , stub_monitor. channel_id( ) ) ;
914
+ let persist_res = self . persister . persist_new_stub_channel ( funding_outpoint, & stub_monitor) ;
915
+
916
+ if persist_res. is_err ( ) {
917
+ log_error ! ( logger, "Failed to add new dummy channel data" ) ;
918
+ return Err ( ( ) ) ;
919
+ }
920
+ #[ cfg( not( test) ) ]
921
+ panic ! ( "Found a missing channel {}" , stub_monitor. channel_id( ) ) ;
922
+ #[ cfg( test) ]
923
+ return self . watch_dummy ( funding_outpoint, stub_monitor) ;
924
+ }
925
+
752
926
fn watch_channel ( & self , funding_outpoint : OutPoint , monitor : ChannelMonitor < ChannelSigner > ) -> Result < ChannelMonitorUpdateStatus , ( ) > {
753
927
let logger = WithChannelMonitor :: from ( & self . logger , & monitor, None ) ;
754
928
let mut monitors = self . monitors . write ( ) . unwrap ( ) ;
0 commit comments