@@ -32,6 +32,7 @@ use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::Sign;
+ use util::atomic_counter::AtomicCounter;
use util::logger::Logger;
use util::errors::APIError;
use util::events;
@@ -41,10 +42,19 @@ use ln::channelmanager::ChannelDetails;
use prelude::*;
use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
+ use core::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone, Copy, Hash, PartialEq, Eq)]
+ /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
+ /// entirely opaque.
enum UpdateOrigin {
+ 	/// An update that was generated by the `ChannelManager` (via our `chain::Watch`
+ 	/// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field
+ 	/// and [`ChannelMonitor::get_latest_update_id`].
	OffChain(u64),
+ 	/// An update that was generated during blockchain processing. The ID here is specific to the
+ 	/// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs.
+ 	ChainSync(u64),
}

/// An opaque identifier describing a specific [`Persist`] method call.
@@ -103,6 +113,12 @@ pub trait Persist<ChannelSigner: Sign> {
	/// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more
	/// details.
	///
+ 	/// During blockchain synchronization operations, this may be called with no
+ 	/// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
+ 	/// Note that after the full [`ChannelMonitor`] is persisted any previous
+ 	/// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be
+ 	/// applied to the persisted [`ChannelMonitor`] as they were already applied.
+ 	///
	/// If an implementer chooses to persist the updates only, they need to make
	/// sure that all the updates are applied to the `ChannelMonitors` *before*
	/// the set of channel monitors is given to the `ChannelManager`
@@ -123,7 +139,7 @@ pub trait Persist<ChannelSigner: Sign> {
	/// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
	///
	/// [`Writeable::write`]: crate::util::ser::Writeable::write
- 	fn update_persisted_channel(&self, channel_id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+ 	fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
}

struct MonitorHolder<ChannelSigner: Sign> {
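With the signature change above, `update_persisted_channel` now receives `&Option<ChannelMonitorUpdate>`: a `None` during chain sync means the full `ChannelMonitor` must be written and any previously stored per-update records dropped, since they are already reflected in the monitor. A minimal sketch of one way an implementer might handle this, using the types imported at the top of this file; `MyStore`, `store_full_monitor` and `purge_stored_updates` are hypothetical helpers, not LDK APIs, and the `persist_new_channel` signature is taken from the trait as it appears in this file:

```rust
struct MyStore;

impl MyStore {
	// Hypothetical helper: serialize the full monitor (e.g. via Writeable::write)
	// and durably store it, keyed by the channel's funding outpoint.
	fn store_full_monitor<ChannelSigner: Sign>(
		&self, _channel_id: OutPoint, _monitor: &ChannelMonitor<ChannelSigner>,
	) -> Result<(), ChannelMonitorUpdateErr> { Ok(()) }
	// Hypothetical helper: drop any per-update records stored for this channel.
	fn purge_stored_updates(&self, _channel_id: OutPoint) {}
}

impl<ChannelSigner: Sign> Persist<ChannelSigner> for MyStore {
	fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>,
		_update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>
	{
		self.store_full_monitor(channel_id, data)
	}

	fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>,
		data: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>
	{
		match update {
			// No individual update (e.g. during chain sync): persist the full monitor and
			// discard previously stored updates, which have already been applied to `data`.
			None => {
				self.store_full_monitor(channel_id, data)?;
				self.purge_stored_updates(channel_id);
				Ok(())
			},
			// With an update present, an implementer may store only the update or, as
			// here, simply rewrite the full monitor.
			Some(_update) => self.store_full_monitor(channel_id, data),
		}
	}
}
```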
@@ -134,14 +150,35 @@ struct MonitorHolder<ChannelSigner: Sign> {
	/// update_persisted_channel, the user returns a TemporaryFailure, and then calls
	/// channel_monitor_updated immediately, racing our insertion of the pending update into the
	/// contained Vec.
+ 	///
+ 	/// Beyond the synchronization of updates themselves, we cannot handle user events until after
+ 	/// any chain updates have been stored on disk. Thus, we scan this list when returning updates
+ 	/// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
+ 	/// being persisted fully to disk after a chain update.
+ 	///
+ 	/// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
+ 	/// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
+ 	/// the pending payment entry, and then reloading before the monitor is persisted, resulting in
+ 	/// the ChannelManager re-adding the same payment entry, before the same block is replayed,
+ 	/// resulting in a duplicate PaymentSent event.
	pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
+ 	/// When the user returns a PermanentFailure error from an update_persisted_channel call during
+ 	/// block processing, we inform the ChannelManager that the channel should be closed
+ 	/// asynchronously. In order to ensure no further changes happen before the ChannelManager has
+ 	/// processed the closure event, we set this to true and return PermanentFailure for any other
+ 	/// chain::Watch events.
+ 	channel_perm_failed: AtomicBool,
}

impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
	fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
		pending_monitor_updates_lock.iter().any(|update_id|
			if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
	}
+ 	fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
+ 		pending_monitor_updates_lock.iter().any(|update_id|
+ 			if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
+ 	}
}

/// A read-only reference to a current ChannelMonitor.
@@ -177,11 +214,17 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
	P::Target: Persist<ChannelSigner>,
{
	monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
+ 	/// When we generate a [`MonitorUpdateId`] for a chain-event monitor persistence, we need a
+ 	/// unique ID, which we calculate by simply getting the next value from this counter. Note that
+ 	/// the IDs are never persisted so it's ok that they reset on restart.
+ 	sync_persistence_id: AtomicCounter,
	chain_source: Option<C>,
	broadcaster: T,
	logger: L,
	fee_estimator: F,
	persister: P,
+ 	/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
+ 	/// from the user and not from a [`ChannelMonitor`].
	pending_monitor_events: Mutex<Vec<MonitorEvent>>,
}

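The new `sync_persistence_id` field relies on the `util::atomic_counter::AtomicCounter` type added by this change, whose implementation is not part of this excerpt. Based only on the usage shown in this diff (`new()` and `get_increment()`), it behaves roughly like the sketch below; the real implementation may differ, for example to support targets without native 64-bit atomics:

```rust
use core::sync::atomic::{AtomicU64, Ordering};

/// Rough stand-in for util::atomic_counter::AtomicCounter: a monotonically
/// increasing counter handing out process-unique u64 IDs.
pub struct AtomicCounter(AtomicU64);

impl AtomicCounter {
	pub fn new() -> Self { AtomicCounter(AtomicU64::new(0)) }
	/// Returns the current value and atomically bumps the counter.
	pub fn get_increment(&self) -> u64 {
		self.0.fetch_add(1, Ordering::AcqRel)
	}
}
```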
@@ -206,26 +249,50 @@ where C::Target: chain::Filter,
		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
	{
		let mut dependent_txdata = Vec::new();
- 		let monitor_states = self.monitors.read().unwrap();
- 		for monitor_state in monitor_states.values() {
- 			let mut txn_outputs = process(&monitor_state.monitor, txdata);
+ 		{
+ 			let monitor_states = self.monitors.write().unwrap();
+ 			for (funding_outpoint, monitor_state) in monitor_states.iter() {
+ 				let monitor = &monitor_state.monitor;
+ 				let mut txn_outputs;
+ 				{
+ 					txn_outputs = process(monitor, txdata);
+ 					let update_id = MonitorUpdateId {
+ 						contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+ 					};
+ 					let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+ 
+ 					log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+ 					match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+ 						Ok(()) =>
+ 							log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+ 						Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+ 							monitor_state.channel_perm_failed.store(true, Ordering::Release);
+ 							self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
+ 						},
+ 						Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+ 							log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+ 							pending_monitor_updates.push(update_id);
+ 						},
+ 					}
+ 				}

- 			// Register any new outputs with the chain source for filtering, storing any dependent
- 			// transactions from within the block that previously had not been included in txdata.
- 			if let Some(ref chain_source) = self.chain_source {
- 				let block_hash = header.block_hash();
- 				for (txid, mut outputs) in txn_outputs.drain(..) {
- 					for (idx, output) in outputs.drain(..) {
- 						// Register any new outputs with the chain source for filtering and recurse
- 						// if it indicates that there are dependent transactions within the block
- 						// that had not been previously included in txdata.
- 						let output = WatchedOutput {
- 							block_hash: Some(block_hash),
- 							outpoint: OutPoint { txid, index: idx as u16 },
- 							script_pubkey: output.script_pubkey,
- 						};
- 						if let Some(tx) = chain_source.register_output(output) {
- 							dependent_txdata.push(tx);
+ 				// Register any new outputs with the chain source for filtering, storing any dependent
+ 				// transactions from within the block that previously had not been included in txdata.
+ 				if let Some(ref chain_source) = self.chain_source {
+ 					let block_hash = header.block_hash();
+ 					for (txid, mut outputs) in txn_outputs.drain(..) {
+ 						for (idx, output) in outputs.drain(..) {
+ 							// Register any new outputs with the chain source for filtering and recurse
+ 							// if it indicates that there are dependent transactions within the block
+ 							// that had not been previously included in txdata.
+ 							let output = WatchedOutput {
+ 								block_hash: Some(block_hash),
+ 								outpoint: OutPoint { txid, index: idx as u16 },
+ 								script_pubkey: output.script_pubkey,
+ 							};
+ 							if let Some(tx) = chain_source.register_output(output) {
+ 								dependent_txdata.push(tx);
+ 							}
						}
					}
				}
@@ -251,6 +318,7 @@ where C::Target: chain::Filter,
	pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
		Self {
			monitors: RwLock::new(HashMap::new()),
+ 			sync_persistence_id: AtomicCounter::new(),
			chain_source,
			broadcaster,
			logger,
@@ -337,7 +405,7 @@ where C::Target: chain::Filter,
		pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);

		match completed_update_id {
- 			MonitorUpdateId { .. } => {
+ 			MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => {
				// Note that we only check for `UpdateOrigin::OffChain` failures here - if
				// we're being told that an `UpdateOrigin::OffChain` monitor update completed,
				// we only care about ensuring we don't tell the `ChannelManager` to restore
@@ -348,16 +416,22 @@ where C::Target: chain::Filter,
				// `MonitorEvent`s from the monitor back to the `ChannelManager` until they
				// complete.
				let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates);
- 				if monitor_is_pending_updates {
- 					// If there are still monitor updates pending, we cannot yet construct an
+ 				if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
+ 					// If there are still monitor updates pending (or an old monitor update
+ 					// finished after a later one perm-failed), we cannot yet construct an
					// UpdateCompleted event.
					return Ok(());
				}
				self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
					funding_txo,
					monitor_update_id: monitor_data.monitor.get_latest_update_id(),
				});
- 			}
+ 			},
+ 			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
+ 				// We've already done everything we need to. The next time
+ 				// release_pending_monitor_events is called, any events for this ChannelMonitor
+ 				// will be returned if there are no more SyncPersistId events left.
+ 			},
		}
		Ok(())
	}
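The match above distinguishes completed `OffChain` updates, which may release an `UpdateCompleted` event once no other off-chain updates are pending, from completed `ChainSync` updates, which merely unblock `release_pending_monitor_events`. Assuming the surrounding method is the `ChainMonitor::channel_monitor_updated` completion hook, an asynchronous `Persist` implementation that earlier returned `TemporaryFailure` would report completion along these lines (a sketch only, reusing the generic bounds from this file):

```rust
fn on_async_persist_done<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>(
	chain_monitor: &ChainMonitor<ChannelSigner, C, T, F, L, P>,
	funding_txo: OutPoint,
	completed_update_id: MonitorUpdateId,
) where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
{
	// Once the data handed to update_persisted_channel has actually been made
	// durable, hand the same MonitorUpdateId back to the ChainMonitor. For
	// OffChain IDs this can surface an UpdateCompleted MonitorEvent; for
	// ChainSync IDs it simply lets monitor events for this channel flow again
	// from release_pending_monitor_events.
	let _ = chain_monitor.channel_monitor_updated(funding_txo, completed_update_id);
}
```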
@@ -502,7 +576,11 @@ where C::Target: chain::Filter,
				monitor.load_outputs_to_watch(chain_source);
			}
		}
- 		entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates) });
+ 		entry.insert(MonitorHolder {
+ 			monitor,
+ 			pending_monitor_updates: Mutex::new(pending_monitor_updates),
+ 			channel_perm_failed: AtomicBool::new(false),
+ 		});
		persist_res
	}

@@ -534,15 +612,19 @@ where C::Target: chain::Filter,
				// still be changed. So, persist the updated monitor despite the error.
				let update_id = MonitorUpdateId::from_monitor_update(&update);
				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
- 				let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor, update_id);
+ 				let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
				if let Err(e) = persist_res {
					if e == ChannelMonitorUpdateErr::TemporaryFailure {
						pending_monitor_updates.push(update_id);
+ 					} else {
+ 						monitor_state.channel_perm_failed.store(true, Ordering::Release);
					}
					log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
				}
				if update_res.is_err() {
					Err(ChannelMonitorUpdateErr::PermanentFailure)
+ 				} else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
+ 					Err(ChannelMonitorUpdateErr::PermanentFailure)
				} else {
					persist_res
				}
@@ -553,7 +635,23 @@ where C::Target: chain::Filter,
	fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
		for monitor_state in self.monitors.read().unwrap().values() {
- 			pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+ 			let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
+ 			if is_pending_monitor_update {
+ 				log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
+ 			} else {
+ 				if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
+ 					// If an `UpdateOrigin::ChainSync` persistence failed with `PermanentFailure`,
+ 					// we don't really know if the latest `ChannelMonitor` state is on disk or not.
+ 					// We're supposed to hold monitor updates until the latest state is on disk to
+ 					// avoid duplicate events, but the user told us persistence is screw-y and may
+ 					// not complete. We can't hold events forever because we may learn some payment
+ 					// preimage, so instead we just log and hope the user complied with the
+ 					// `PermanentFailure` requirements of having at least the local-disk copy
+ 					// updated.
+ 					log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
+ 				}
+ 				pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+ 			}
		}
		pending_monitor_events
	}