@@ -14,6 +14,7 @@ use lightning::chain::keysinterface::{Sign, KeysInterface};
14
14
use lightning:: ln:: channelmanager:: ChannelManager ;
15
15
use lightning:: ln:: msgs:: { ChannelMessageHandler , RoutingMessageHandler } ;
16
16
use lightning:: ln:: peer_handler:: { PeerManager , SocketDescriptor } ;
17
+ use lightning:: util:: events:: { EventHandler , EventsProvider } ;
17
18
use lightning:: util:: logger:: Logger ;
18
19
use std:: sync:: Arc ;
19
20
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -109,11 +110,12 @@ impl BackgroundProcessor {
109
110
Descriptor : ' static + SocketDescriptor + Send + Sync ,
110
111
CMH : ' static + Deref + Send + Sync ,
111
112
RMH : ' static + Deref + Send + Sync ,
113
+ EH : ' static + EventHandler + Send + Sync ,
112
114
CMP : ' static + Send + ChannelManagerPersister < Signer , M , T , K , F , L > ,
113
115
CM : ' static + Deref < Target = ChannelManager < Signer , M , T , K , F , L > > + Send + Sync ,
114
116
PM : ' static + Deref < Target = PeerManager < Descriptor , CMH , RMH , L > > + Send + Sync ,
115
117
>
116
- ( handler : CMP , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
118
+ ( persister : CMP , event_handler : EH , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
117
119
where
118
120
M :: Target : ' static + chain:: Watch < Signer > ,
119
121
T :: Target : ' static + BroadcasterInterface ,
@@ -129,10 +131,11 @@ impl BackgroundProcessor {
129
131
let mut current_time = Instant :: now ( ) ;
130
132
loop {
131
133
peer_manager. process_events ( ) ;
134
+ channel_manager. process_pending_events ( & event_handler) ;
132
135
let updates_available =
133
136
channel_manager. await_persistable_update_timeout ( Duration :: from_millis ( 100 ) ) ;
134
137
if updates_available {
135
- handler . persist_manager ( & * channel_manager) ?;
138
+ persister . persist_manager ( & * channel_manager) ?;
136
139
}
137
140
// Exit the loop if the background processor was requested to stop.
138
141
if stop_thread. load ( Ordering :: Acquire ) == true {
@@ -162,10 +165,8 @@ mod tests {
162
165
use bitcoin:: blockdata:: constants:: genesis_block;
163
166
use bitcoin:: blockdata:: transaction:: { Transaction , TxOut } ;
164
167
use bitcoin:: network:: constants:: Network ;
165
- use lightning:: chain;
166
- use lightning:: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
167
168
use lightning:: chain:: chainmonitor;
168
- use lightning:: chain:: keysinterface:: { Sign , InMemorySigner , KeysInterface , KeysManager } ;
169
+ use lightning:: chain:: keysinterface:: { InMemorySigner , KeysInterface , KeysManager } ;
169
170
use lightning:: chain:: transaction:: OutPoint ;
170
171
use lightning:: get_event_msg;
171
172
use lightning:: ln:: channelmanager:: { BestBlock , ChainParameters , ChannelManager , SimpleArcChannelManager } ;
@@ -174,7 +175,6 @@ mod tests {
174
175
use lightning:: ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
175
176
use lightning:: util:: config:: UserConfig ;
176
177
use lightning:: util:: events:: { Event , MessageSendEventsProvider , MessageSendEvent } ;
177
- use lightning:: util:: logger:: Logger ;
178
178
use lightning:: util:: ser:: Writeable ;
179
179
use lightning:: util:: test_utils;
180
180
use lightning_persister:: FilesystemPersister ;
@@ -279,14 +279,15 @@ mod tests {
279
279
// re-persistence and is successfully re-persisted.
280
280
let nodes = create_nodes ( 2 , "test_background_processor" . to_string ( ) ) ;
281
281
282
- // Initiate the background processors to watch each node.
283
- let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
284
- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
285
- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
286
-
287
282
// Go through the channel creation process until each node should have something persisted.
288
283
let tx = open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
289
284
285
+ // Initiate the background processors to watch each node.
286
+ let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
287
+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
288
+ let event_handler = |_| { } ;
289
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
290
+
290
291
macro_rules! check_persisted_data {
291
292
( $node: expr, $filepath: expr, $expected_bytes: expr) => {
292
293
match $node. write( & mut $expected_bytes) {
@@ -336,8 +337,9 @@ mod tests {
336
337
// `FRESHNESS_TIMER`.
337
338
let nodes = create_nodes ( 1 , "test_timer_tick_called" . to_string ( ) ) ;
338
339
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
339
- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
340
- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
340
+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
341
+ let event_handler = |_| { } ;
342
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
341
343
loop {
342
344
let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
343
345
let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred" . to_string ( ) ;
@@ -352,21 +354,12 @@ mod tests {
352
354
#[ test]
353
355
fn test_persist_error ( ) {
354
356
// Test that if we encounter an error during manager persistence, the thread panics.
355
- fn persist_manager < Signer , M , T , K , F , L > ( _data : & ChannelManager < Signer , Arc < M > , Arc < T > , Arc < K > , Arc < F > , Arc < L > > ) -> Result < ( ) , std:: io:: Error >
356
- where Signer : ' static + Sign ,
357
- M : ' static + chain:: Watch < Signer > ,
358
- T : ' static + BroadcasterInterface ,
359
- K : ' static + KeysInterface < Signer =Signer > ,
360
- F : ' static + FeeEstimator ,
361
- L : ' static + Logger ,
362
- {
363
- Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "test" ) )
364
- }
365
-
366
357
let nodes = create_nodes ( 2 , "test_persist_error" . to_string ( ) ) ;
367
- let bg_processor = BackgroundProcessor :: start ( persist_manager, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
368
358
open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
369
359
360
+ let persister = |_: & _ | Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "test" ) ) ;
361
+ let event_handler = |_| { } ;
362
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
370
363
let _ = bg_processor. thread_handle . join ( ) . unwrap ( ) . expect_err ( "Errored persisting manager: test" ) ;
371
364
}
372
365
}
0 commit comments