@@ -854,14 +854,24 @@ where
854
854
event_handler ( event) . await
855
855
} )
856
856
} ;
857
+ // We should extract these out of config because the macro expects individual arguments
858
+ let persister = config. persister ;
859
+ let chain_monitor = config. chain_monitor ;
860
+ let channel_manager = config. channel_manager ;
861
+ let onion_messenger = config. onion_messenger ;
862
+ let peer_manager = config. peer_manager ;
863
+ let gossip_sync = config. gossip_sync ;
864
+ let logger = config. logger ;
865
+ let scorer = config. scorer ;
866
+
857
867
define_run_body ! (
858
- config . persister,
859
- config . chain_monitor,
860
- config . chain_monitor. process_pending_events_async( async_event_handler) . await ,
861
- config . channel_manager,
862
- config . channel_manager. get_cm( ) . process_pending_events_async( async_event_handler) . await ,
863
- config . onion_messenger,
864
- if let Some ( om) = & config . onion_messenger {
868
+ persister,
869
+ chain_monitor,
870
+ chain_monitor. process_pending_events_async( async_event_handler) . await ,
871
+ channel_manager,
872
+ channel_manager. get_cm( ) . process_pending_events_async( async_event_handler) . await ,
873
+ onion_messenger,
874
+ if let Some ( om) = & onion_messenger {
865
875
om. get_om( ) . process_pending_events_async( async_event_handler) . await
866
876
} ,
867
877
peer_manager,
@@ -875,7 +885,7 @@ where
875
885
scorer,
876
886
should_break,
877
887
{
878
- let om_fut = if let Some ( om) = config . onion_messenger. as_ref( ) {
888
+ let om_fut = if let Some ( om) = onion_messenger. as_ref( ) {
879
889
let fut = om. get_om( ) . get_update_future( ) ;
880
890
OptionalSelector { optional_future: Some ( fut) }
881
891
} else {
@@ -888,8 +898,8 @@ where
888
898
OptionalSelector { optional_future: None }
889
899
} ;
890
900
let fut = Selector {
891
- a: config . channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
892
- b: config . chain_monitor. get_update_future( ) ,
901
+ a: channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
902
+ b: chain_monitor. get_update_future( ) ,
893
903
c: om_fut,
894
904
d: lm_fut,
895
905
e: sleeper( if mobile_interruptable_platform {
@@ -1108,18 +1118,18 @@ impl BackgroundProcessor {
1108
1118
/// # Example
1109
1119
/// ```
1110
1120
/// # use lightning_background_processor::*;
1111
- /// let config = BackgroundProcessorConfigBuilder::new(
1121
+ /// let mut builder = BackgroundProcessorConfigBuilder::new(
1112
1122
/// persister,
1113
1123
/// event_handler,
1114
1124
/// chain_monitor,
1115
1125
/// channel_manager,
1116
1126
/// gossip_sync,
1117
1127
/// peer_manager,
1118
1128
/// logger
1119
- /// )
1120
- /// .with_onion_messenger(messenger)
1121
- /// .with_scorer(scorer)
1122
- /// .build();
1129
+ /// );
1130
+ /// builder .with_onion_messenger(messenger);
1131
+ /// .with_scorer(scorer);
1132
+ /// let config = builder .build();
1123
1133
/// let bg_processor = BackgroundProcessor::from_config(config);
1124
1134
/// ```
1125
1135
pub fn from_config <
@@ -1258,18 +1268,18 @@ impl BackgroundProcessor {
1258
1268
/// # Example
1259
1269
/// ```
1260
1270
/// # use lightning_background_processor::*;
1261
- /// let config = BackgroundProcessorConfigBuilder::new(
1271
+ /// let mut builder = BackgroundProcessorConfigBuilder::new(
1262
1272
/// persister,
1263
1273
/// event_handler,
1264
1274
/// chain_monitor,
1265
1275
/// channel_manager,
1266
1276
/// gossip_sync,
1267
1277
/// peer_manager,
1268
1278
/// logger
1269
- /// )
1270
- /// .with_onion_messenger(messenger) // Optional
1271
- /// .with_scorer(scorer) // Optional
1272
- /// .build();
1279
+ /// );
1280
+ /// builder .with_onion_messenger(messenger); // Optional
1281
+ /// .with_scorer(scorer); // Optional
1282
+ /// let config = builder .build();
1273
1283
///
1274
1284
/// // Use with BackgroundProcessor
1275
1285
/// let processor = BackgroundProcessor::from_config(config);
@@ -1281,7 +1291,7 @@ impl BackgroundProcessor {
1281
1291
/// process_events_async(config, sleeper, mobile_interruptable_platform, fetch_time).await?;"
1282
1292
) ]
1283
1293
/// ```
1284
- #[ cfg( feature = "std" ) ]
1294
+ #[ cfg( any ( feature = "std" , feature = "futures" ) ) ]
1285
1295
pub struct BackgroundProcessorConfig <
1286
1296
' a ,
1287
1297
UL : ' static + Deref + Send + Sync ,
@@ -1291,7 +1301,8 @@ pub struct BackgroundProcessorConfig<
1291
1301
G : ' static + Deref < Target = NetworkGraph < L > > + Send + Sync ,
1292
1302
L : ' static + Deref + Send + Sync ,
1293
1303
P : ' static + Deref + Send + Sync ,
1294
- EH : ' static + EventHandler + Send ,
1304
+ #[ cfg( feature = "std" ) ] EH : ' static + EventHandler + Send ,
1305
+ #[ cfg( feature = "futures" ) ] EH : ' static + Fn ( Event ) -> core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
1295
1306
PS : ' static + Deref + Send ,
1296
1307
M : ' static
1297
1308
+ Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P > >
@@ -1418,7 +1429,7 @@ where
1418
1429
PM :: Target : APeerManager + Send + Sync ,
1419
1430
{
1420
1431
/// Creates a new builder instance.
1421
- pub ( crate ) fn new (
1432
+ pub fn new (
1422
1433
persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
1423
1434
gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L ,
1424
1435
) -> Self {
@@ -1477,7 +1488,9 @@ impl Drop for BackgroundProcessor {
1477
1488
1478
1489
#[ cfg( all( feature = "std" , test) ) ]
1479
1490
mod tests {
1480
- use super :: { BackgroundProcessor , GossipSync , FRESHNESS_TIMER } ;
1491
+ use super :: {
1492
+ BackgroundProcessor , BackgroundProcessorConfigBuilder , GossipSync , FRESHNESS_TIMER ,
1493
+ } ;
1481
1494
use bitcoin:: constants:: { genesis_block, ChainHash } ;
1482
1495
use bitcoin:: hashes:: Hash ;
1483
1496
use bitcoin:: locktime:: absolute:: LockTime ;
@@ -2429,7 +2442,7 @@ mod tests {
2429
2442
Persister :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ,
2430
2443
) ;
2431
2444
2432
- let config = BackgroundProcessorConfigBuilder :: new (
2445
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
2433
2446
persister,
2434
2447
|_: _ | async { Ok ( ( ) ) } ,
2435
2448
nodes[ 0 ] . chain_monitor . clone ( ) ,
@@ -2439,10 +2452,11 @@ mod tests {
2439
2452
Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2440
2453
Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
2441
2454
nodes[ 0 ] . logger . clone ( ) ,
2442
- )
2443
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2444
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
2445
- . build ( ) ;
2455
+ ) ;
2456
+ builder
2457
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2458
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
2459
+ let config = builder. build ( ) ;
2446
2460
2447
2461
let bp_future = super :: process_events_async (
2448
2462
config,
@@ -2940,7 +2954,7 @@ mod tests {
2940
2954
let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2941
2955
let persister = Arc :: new ( Persister :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
2942
2956
2943
- let config = BackgroundProcessorConfigBuilder :: new (
2957
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
2944
2958
persister,
2945
2959
|_: _ | async { Ok ( ( ) ) } ,
2946
2960
nodes[ 0 ] . chain_monitor . clone ( ) ,
@@ -2950,10 +2964,11 @@ mod tests {
2950
2964
Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2951
2965
Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
2952
2966
nodes[ 0 ] . logger . clone ( ) ,
2953
- )
2954
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2955
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
2956
- . build ( ) ;
2967
+ ) ;
2968
+ builder
2969
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2970
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
2971
+ let config = builder. build ( ) ;
2957
2972
2958
2973
let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2959
2974
let bp_future = super :: process_events_async (
@@ -3161,7 +3176,7 @@ mod tests {
3161
3176
3162
3177
let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
3163
3178
3164
- let config = BackgroundProcessorConfigBuilder :: new (
3179
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
3165
3180
persister,
3166
3181
event_handler,
3167
3182
nodes[ 0 ] . chain_monitor . clone ( ) ,
@@ -3171,10 +3186,11 @@ mod tests {
3171
3186
Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
3172
3187
Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
3173
3188
nodes[ 0 ] . logger . clone ( ) ,
3174
- )
3175
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3176
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
3177
- . build ( ) ;
3189
+ ) ;
3190
+ builder
3191
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3192
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
3193
+ let config = builder. build ( ) ;
3178
3194
3179
3195
let bp_future = super :: process_events_async (
3180
3196
config,
@@ -3209,11 +3225,11 @@ mod tests {
3209
3225
}
3210
3226
3211
3227
#[ test]
3212
- fn test_background_processor_builder ( ) {
3228
+ fn test_background_processor_config_builder ( ) {
3213
3229
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
3214
3230
// updates. Also test that when new updates are available, the manager signals that it needs
3215
3231
// re-persistence and is successfully re-persisted.
3216
- let ( persist_dir, nodes) = create_nodes ( 2 , "test_background_processor_builder " ) ;
3232
+ let ( persist_dir, nodes) = create_nodes ( 2 , "test_background_processor_config_builder " ) ;
3217
3233
3218
3234
// Go through the channel creation process so that each node has something to persist. Since
3219
3235
// open_channel consumes events, it must complete before starting BackgroundProcessor to
@@ -3224,18 +3240,19 @@ mod tests {
3224
3240
let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
3225
3241
let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
3226
3242
let event_handler = |_: _ | Ok ( ( ) ) ;
3227
- let config = BackgroundProcessorConfigBuilder :: new (
3243
+ let mut builder = BackgroundProcessorConfigBuilder :: new (
3228
3244
persister,
3229
3245
event_handler,
3230
3246
nodes[ 0 ] . chain_monitor . clone ( ) ,
3231
3247
nodes[ 0 ] . node . clone ( ) ,
3232
3248
nodes[ 0 ] . p2p_gossip_sync ( ) ,
3233
3249
nodes[ 0 ] . peer_manager . clone ( ) ,
3234
3250
nodes[ 0 ] . logger . clone ( ) ,
3235
- )
3236
- . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3237
- . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
3238
- . build ( ) ;
3251
+ ) ;
3252
+ builder
3253
+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3254
+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) ) ;
3255
+ let config = builder. build ( ) ;
3239
3256
3240
3257
let bg_processor = BackgroundProcessor :: from_config ( config) ;
3241
3258
@@ -3287,7 +3304,7 @@ mod tests {
3287
3304
. unwrap ( ) ;
3288
3305
3289
3306
// Check that the force-close updates are persisted
3290
- check_persisted_data ! ( nodes[ 0 ] . node, manager_path . clone( ) ) ;
3307
+ check_persisted_data ! ( nodes[ 0 ] . node, filepath . clone( ) ) ;
3291
3308
loop {
3292
3309
if !nodes[ 0 ] . node . get_event_or_persist_condvar_value ( ) {
3293
3310
break ;
@@ -3297,7 +3314,7 @@ mod tests {
3297
3314
// Check network graph is persisted
3298
3315
let filepath =
3299
3316
get_full_filepath ( format ! ( "{}_persister_0" , & persist_dir) , "network_graph" . to_string ( ) ) ;
3300
- check_persisted_data ! ( nodes[ 0 ] . network_graph, filepath) ;
3317
+ check_persisted_data ! ( nodes[ 0 ] . network_graph, filepath. clone ( ) ) ;
3301
3318
3302
3319
// Check scorer is persisted
3303
3320
let filepath =
0 commit comments