Skip to content

Commit ae2de9b

Browse files
committed
introduce BackgroundProcessorConfig
This allows for consistent configuration of sync and async variant of the BackgroundProcessor
1 parent 06721f1 commit ae2de9b

File tree

1 file changed

+247
-39
lines changed
  • lightning-background-processor/src

1 file changed

+247
-39
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 247 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -826,11 +826,11 @@ where
826826
{
827827
let mut should_break = false;
828828
let async_event_handler = |event| {
829-
let network_graph = gossip_sync.network_graph();
830-
let event_handler = &event_handler;
831-
let scorer = &scorer;
832-
let logger = &logger;
833-
let persister = &persister;
829+
let network_graph = config.gossip_sync.network_graph();
830+
let event_handler = &config.event_handler;
831+
let scorer = &config.scorer;
832+
let logger = &config.logger;
833+
let persister = &config.persister;
834834
let fetch_time = &fetch_time;
835835
// We should be able to drop the Box once our MSRV is 1.68
836836
Box::pin(async move {
@@ -855,13 +855,13 @@ where
855855
})
856856
};
857857
define_run_body!(
858-
persister,
859-
chain_monitor,
860-
chain_monitor.process_pending_events_async(async_event_handler).await,
861-
channel_manager,
862-
channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
863-
onion_messenger,
864-
if let Some(om) = &onion_messenger {
858+
config.persister,
859+
config.chain_monitor,
860+
config.chain_monitor.process_pending_events_async(async_event_handler).await,
861+
config.channel_manager,
862+
config.channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
863+
config.onion_messenger,
864+
if let Some(om) = &config.onion_messenger {
865865
om.get_om().process_pending_events_async(async_event_handler).await
866866
},
867867
peer_manager,
@@ -875,7 +875,7 @@ where
875875
scorer,
876876
should_break,
877877
{
878-
let om_fut = if let Some(om) = onion_messenger.as_ref() {
878+
let om_fut = if let Some(om) = config.onion_messenger.as_ref() {
879879
let fut = om.get_om().get_update_future();
880880
OptionalSelector { optional_future: Some(fut) }
881881
} else {
@@ -888,8 +888,8 @@ where
888888
OptionalSelector { optional_future: None }
889889
};
890890
let fut = Selector {
891-
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
892-
b: chain_monitor.get_update_future(),
891+
a: config.channel_manager.get_cm().get_event_or_persistence_needed_future(),
892+
b: config.chain_monitor.get_update_future(),
893893
c: om_fut,
894894
d: lm_fut,
895895
e: sleeper(if mobile_interruptable_platform {
@@ -1099,6 +1099,98 @@ impl BackgroundProcessor {
10991099
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
11001100
}
11011101

1102+
/// Creates a new [`BackgroundProcessor`] from a [`BackgroundProcessorConfig`].
1103+
/// This provides a more structured approach to configuration. The processor will start processing events immediately upon creation.
1104+
///
1105+
/// This method is functionally equivalent to [`BackgroundProcessor::start`], but takes a configuration
1106+
/// object instead of individual parameters.
1107+
///
1108+
/// # Example
1109+
/// ```
1110+
/// # use lightning_background_processor::*;
1111+
/// let config = BackgroundProcessorBuilder::new(
1112+
/// persister,
1113+
/// event_handler,
1114+
/// chain_monitor,
1115+
/// channel_manager,
1116+
/// gossip_sync,
1117+
/// peer_manager,
1118+
/// logger
1119+
/// )
1120+
/// .with_onion_messenger(messenger)
1121+
/// .with_scorer(scorer)
1122+
/// .build();
1123+
/// let bg_processor = BackgroundProcessor::from_config(config);
1124+
/// ```
1125+
pub fn from_config<
1126+
'a,
1127+
UL: 'static + Deref + Send + Sync,
1128+
CF: 'static + Deref + Send + Sync,
1129+
T: 'static + Deref + Send + Sync,
1130+
F: 'static + Deref + Send + Sync,
1131+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1132+
L: 'static + Deref + Send + Sync,
1133+
P: 'static + Deref + Send + Sync,
1134+
EH: 'static + EventHandler + Send,
1135+
PS: 'static + Deref + Send,
1136+
M: 'static
1137+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
1138+
+ Send
1139+
+ Sync,
1140+
CM: 'static + Deref + Send + Sync,
1141+
OM: 'static + Deref + Send + Sync,
1142+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1143+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1144+
PM: 'static + Deref + Send + Sync,
1145+
S: 'static + Deref<Target = SC> + Send + Sync,
1146+
SC: for<'b> WriteableScore<'b>,
1147+
>(
1148+
config: BackgroundProcessorConfig<
1149+
'a,
1150+
UL,
1151+
CF,
1152+
T,
1153+
F,
1154+
G,
1155+
L,
1156+
P,
1157+
EH,
1158+
PS,
1159+
M,
1160+
CM,
1161+
OM,
1162+
PGS,
1163+
RGS,
1164+
PM,
1165+
S,
1166+
SC,
1167+
>,
1168+
) -> Self
1169+
where
1170+
UL::Target: 'static + UtxoLookup,
1171+
CF::Target: 'static + chain::Filter,
1172+
T::Target: 'static + BroadcasterInterface,
1173+
F::Target: 'static + FeeEstimator,
1174+
L::Target: 'static + Logger,
1175+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1176+
PS::Target: 'static + Persister<'a, CM, L, S>,
1177+
CM::Target: AChannelManager + Send + Sync,
1178+
OM::Target: AOnionMessenger + Send + Sync,
1179+
PM::Target: APeerManager + Send + Sync,
1180+
{
1181+
Self::start(
1182+
config.persister,
1183+
config.event_handler,
1184+
config.chain_monitor,
1185+
config.channel_manager,
1186+
config.onion_messenger,
1187+
config.gossip_sync,
1188+
config.peer_manager,
1189+
config.logger,
1190+
config.scorer,
1191+
)
1192+
}
1193+
11021194
/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
11031195
/// [`ChannelManager`].
11041196
///
@@ -1138,7 +1230,102 @@ impl BackgroundProcessor {
11381230
None => Ok(()),
11391231
}
11401232
}
1233+
}
11411234

1235+
/// Configuration for synchronous [`BackgroundProcessor`]
1236+
#[cfg_attr(feature = "futures", doc = "and asynchronous [`process_events_async`]")]
1237+
/// event processing.
1238+
///
1239+
/// This configuration holds all components needed for background processing,
1240+
/// including required components (like the channel manager and peer manager) and optional
1241+
/// components (like the onion messenger and scorer).
1242+
///
1243+
/// The configuration can be constructed using [`BackgroundProcessorBuilder`], which provides
1244+
/// a convenient builder pattern for setting up both required and optional components.
1245+
///
1246+
/// This same configuration can be used for
1247+
#[cfg_attr(
1248+
not(feature = "futures"),
1249+
doc = "creating a [`BackgroundProcessor`] via [`BackgroundProcessor::from_config`]."
1250+
)]
1251+
#[cfg_attr(
1252+
feature = "futures",
1253+
doc = "both:
1254+
/// * Creating a [`BackgroundProcessor`] via [`BackgroundProcessor::from_config`]
1255+
/// * Running the async variant of the background processor via [`process_events_async`]"
1256+
)]
1257+
///
1258+
/// # Example
1259+
/// ```
1260+
/// # use lightning_background_processor::*;
1261+
/// let config = BackgroundProcessorBuilder::new(
1262+
/// persister,
1263+
/// event_handler,
1264+
/// chain_monitor,
1265+
/// channel_manager,
1266+
/// gossip_sync,
1267+
/// peer_manager,
1268+
/// logger
1269+
/// )
1270+
/// .with_onion_messenger(messenger) // Optional
1271+
/// .with_scorer(scorer) // Optional
1272+
/// .build();
1273+
///
1274+
/// // Use with BackgroundProcessor
1275+
/// let processor = BackgroundProcessor::from_config(config);
1276+
///
1277+
#[cfg_attr(
1278+
feature = "futures",
1279+
doc = "
1280+
/// // Or use with async processing
1281+
/// process_events_async(config, sleeper, mobile_interruptable_platform, fetch_time).await?;"
1282+
)]
1283+
/// ```
1284+
#[cfg(feature = "std")]
1285+
pub struct BackgroundProcessorConfig<
1286+
'a,
1287+
UL: 'static + Deref + Send + Sync,
1288+
CF: 'static + Deref + Send + Sync,
1289+
T: 'static + Deref + Send + Sync,
1290+
F: 'static + Deref + Send + Sync,
1291+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1292+
L: 'static + Deref + Send + Sync,
1293+
P: 'static + Deref + Send + Sync,
1294+
EH: 'static + EventHandler + Send,
1295+
PS: 'static + Deref + Send,
1296+
M: 'static
1297+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
1298+
+ Send
1299+
+ Sync,
1300+
CM: 'static + Deref + Send + Sync,
1301+
OM: 'static + Deref + Send + Sync,
1302+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1303+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1304+
PM: 'static + Deref + Send + Sync,
1305+
S: 'static + Deref<Target = SC> + Send + Sync,
1306+
SC: for<'b> WriteableScore<'b>,
1307+
> where
1308+
UL::Target: 'static + UtxoLookup,
1309+
CF::Target: 'static + chain::Filter,
1310+
T::Target: 'static + BroadcasterInterface,
1311+
F::Target: 'static + FeeEstimator,
1312+
L::Target: 'static + Logger,
1313+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1314+
PS::Target: 'static + Persister<'a, CM, L, S>,
1315+
CM::Target: AChannelManager + Send + Sync,
1316+
OM::Target: AOnionMessenger + Send + Sync,
1317+
PM::Target: APeerManager + Send + Sync,
1318+
{
1319+
persister: PS,
1320+
event_handler: EH,
1321+
chain_monitor: M,
1322+
channel_manager: CM,
1323+
onion_messenger: Option<OM>,
1324+
gossip_sync: GossipSync<PGS, RGS, G, UL, L>,
1325+
peer_manager: PM,
1326+
logger: L,
1327+
scorer: Option<S>,
1328+
_phantom: PhantomData<(&'a (), CF, T, F, P)>,
11421329
}
11431330

11441331
/// A builder for constructing a [`BackgroundProcessor`] with optional components.
@@ -1260,19 +1447,23 @@ where
12601447
self
12611448
}
12621449

1263-
/// Builds and starts the [`BackgroundProcessor`].
1264-
pub fn build(self) -> BackgroundProcessor {
1265-
BackgroundProcessor::start(
1266-
self.persister,
1267-
self.event_handler,
1268-
self.chain_monitor,
1269-
self.channel_manager,
1270-
self.onion_messenger,
1271-
self.gossip_sync,
1272-
self.peer_manager,
1273-
self.logger,
1274-
self.scorer,
1275-
)
1450+
/// Builds and returns a [`BackgroundProcessorConfig`] object.
1451+
pub fn build(
1452+
self,
1453+
) -> BackgroundProcessorConfig<'a, UL, CF, T, F, G, L, P, EH, PS, M, CM, OM, PGS, RGS, PM, S, SC>
1454+
{
1455+
BackgroundProcessorConfig {
1456+
persister: self.persister,
1457+
event_handler: self.event_handler,
1458+
chain_monitor: self.chain_monitor,
1459+
channel_manager: self.channel_manager,
1460+
onion_messenger: self.onion_messenger,
1461+
gossip_sync: self.gossip_sync,
1462+
peer_manager: self.peer_manager,
1463+
logger: self.logger,
1464+
scorer: self.scorer,
1465+
_phantom: PhantomData,
1466+
}
12761467
}
12771468
}
12781469

@@ -2237,18 +2428,23 @@ mod tests {
22372428
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
22382429
);
22392430

2240-
let bp_future = super::process_events_async(
2431+
let config = BackgroundProcessorBuilder::new(
22412432
persister,
22422433
|_: _| async { Ok(()) },
22432434
nodes[0].chain_monitor.clone(),
22442435
nodes[0].node.clone(),
2245-
Some(nodes[0].messenger.clone()),
22462436
nodes[0].rapid_gossip_sync(),
22472437
nodes[0].peer_manager.clone(),
22482438
Some(Arc::clone(&nodes[0].liquidity_manager)),
22492439
Some(nodes[0].sweeper.sweeper_async()),
22502440
nodes[0].logger.clone(),
2251-
Some(nodes[0].scorer.clone()),
2441+
)
2442+
.with_onion_messenger(nodes[0].messenger.clone())
2443+
.with_scorer(nodes[0].scorer.clone())
2444+
.build();
2445+
2446+
let bp_future = super::process_events_async(
2447+
config,
22522448
move |dur: Duration| {
22532449
Box::pin(async move {
22542450
tokio::time::sleep(dur).await;
@@ -2743,19 +2939,24 @@ mod tests {
27432939
let data_dir = nodes[0].kv_store.get_data_dir();
27442940
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
27452941

2746-
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2747-
let bp_future = super::process_events_async(
2942+
let config = BackgroundProcessorBuilder::new(
27482943
persister,
27492944
|_: _| async { Ok(()) },
27502945
nodes[0].chain_monitor.clone(),
27512946
nodes[0].node.clone(),
2752-
Some(nodes[0].messenger.clone()),
27532947
nodes[0].rapid_gossip_sync(),
27542948
nodes[0].peer_manager.clone(),
27552949
Some(Arc::clone(&nodes[0].liquidity_manager)),
27562950
Some(nodes[0].sweeper.sweeper_async()),
27572951
nodes[0].logger.clone(),
2758-
Some(nodes[0].scorer.clone()),
2952+
)
2953+
.with_onion_messenger(nodes[0].messenger.clone())
2954+
.with_scorer(nodes[0].scorer.clone())
2955+
.build();
2956+
2957+
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2958+
let bp_future = super::process_events_async(
2959+
config,
27592960
move |dur: Duration| {
27602961
let mut exit_receiver = exit_receiver.clone();
27612962
Box::pin(async move {
@@ -2959,18 +3160,23 @@ mod tests {
29593160

29603161
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
29613162

2962-
let bp_future = super::process_events_async(
3163+
let config = BackgroundProcessorBuilder::new(
29633164
persister,
29643165
event_handler,
29653166
nodes[0].chain_monitor.clone(),
29663167
nodes[0].node.clone(),
2967-
Some(nodes[0].messenger.clone()),
29683168
nodes[0].no_gossip_sync(),
29693169
nodes[0].peer_manager.clone(),
29703170
Some(Arc::clone(&nodes[0].liquidity_manager)),
29713171
Some(nodes[0].sweeper.sweeper_async()),
29723172
nodes[0].logger.clone(),
2973-
Some(nodes[0].scorer.clone()),
3173+
)
3174+
.with_onion_messenger(nodes[0].messenger.clone())
3175+
.with_scorer(nodes[0].scorer.clone())
3176+
.build();
3177+
3178+
let bp_future = super::process_events_async(
3179+
config,
29743180
move |dur: Duration| {
29753181
let mut exit_receiver = exit_receiver.clone();
29763182
Box::pin(async move {
@@ -3017,7 +3223,7 @@ mod tests {
30173223
let data_dir = nodes[0].kv_store.get_data_dir();
30183224
let persister = Arc::new(Persister::new(data_dir));
30193225
let event_handler = |_: _| Ok(());
3020-
let bg_processor = BackgroundProcessorBuilder::new(
3226+
let config = BackgroundProcessorBuilder::new(
30213227
persister,
30223228
event_handler,
30233229
nodes[0].chain_monitor.clone(),
@@ -3030,6 +3236,8 @@ mod tests {
30303236
.with_scorer(nodes[0].scorer.clone())
30313237
.build();
30323238

3239+
let bg_processor = BackgroundProcessor::from_config(config);
3240+
30333241
macro_rules! check_persisted_data {
30343242
($node: expr, $filepath: expr) => {
30353243
let mut expected_bytes = Vec::new();

0 commit comments

Comments
 (0)