Skip to content

Commit 60df04f

Browse files
committed
feat(background-processor): add BackgroundProcessorBuilder for optional components
1 parent 101aa6f commit 60df04f

File tree

1 file changed

+289
-0
lines changed
  • lightning-background-processor/src

1 file changed

+289
-0
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ use std::time::Instant;
6464
#[cfg(not(feature = "std"))]
6565
use alloc::boxed::Box;
6666

67+
#[cfg(feature = "std")]
68+
use std::marker::PhantomData;
69+
6770
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
6871
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
6972
/// responsibilities are:
@@ -1135,6 +1138,193 @@ impl BackgroundProcessor {
11351138
None => Ok(()),
11361139
}
11371140
}
1141+
1142+
/// Creates a new [`BackgroundProcessorBuilder`] to construct a [`BackgroundProcessor`] with optional components.
1143+
pub fn builder<
1144+
'a,
1145+
UL: 'static + Deref + Send + Sync,
1146+
CF: 'static + Deref + Send + Sync,
1147+
T: 'static + Deref + Send + Sync,
1148+
F: 'static + Deref + Send + Sync,
1149+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1150+
L: 'static + Deref + Send + Sync,
1151+
P: 'static + Deref + Send + Sync,
1152+
EH: 'static + EventHandler + Send,
1153+
PS: 'static + Deref + Send,
1154+
M: 'static
1155+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
1156+
+ Send
1157+
+ Sync,
1158+
CM: 'static + Deref + Send + Sync,
1159+
OM: 'static + Deref + Send + Sync,
1160+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1161+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1162+
PM: 'static + Deref + Send + Sync,
1163+
S: 'static + Deref<Target = SC> + Send + Sync,
1164+
SC: for<'b> WriteableScore<'b>,
1165+
>(
1166+
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
1167+
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L,
1168+
) -> BackgroundProcessorBuilder<'a, UL, CF, T, F, G, L, P, EH, PS, M, CM, OM, PGS, RGS, PM, S, SC>
1169+
where
1170+
UL::Target: 'static + UtxoLookup,
1171+
CF::Target: 'static + chain::Filter,
1172+
T::Target: 'static + BroadcasterInterface,
1173+
F::Target: 'static + FeeEstimator,
1174+
L::Target: 'static + Logger,
1175+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1176+
PS::Target: 'static + Persister<'a, CM, L, S>,
1177+
CM::Target: AChannelManager + Send + Sync,
1178+
OM::Target: AOnionMessenger + Send + Sync,
1179+
PM::Target: APeerManager + Send + Sync,
1180+
{
1181+
BackgroundProcessorBuilder::new(
1182+
persister,
1183+
event_handler,
1184+
chain_monitor,
1185+
channel_manager,
1186+
gossip_sync,
1187+
peer_manager,
1188+
logger,
1189+
)
1190+
}
1191+
}
1192+
1193+
/// A builder for constructing a [`BackgroundProcessor`] with optional components.
1194+
///
1195+
/// This builder provides a flexible and type-safe way to construct a [`BackgroundProcessor`]
1196+
/// with optional components like `onion_messenger` and `scorer`. It helps avoid specifying
1197+
/// concrete types for components that aren't being used.
1198+
///
1199+
/// Use [`BackgroundProcessor::builder`] to create a new builder instance.
1200+
#[cfg(feature = "std")]
1201+
pub struct BackgroundProcessorBuilder<
1202+
'a,
1203+
UL: 'static + Deref + Send + Sync,
1204+
CF: 'static + Deref + Send + Sync,
1205+
T: 'static + Deref + Send + Sync,
1206+
F: 'static + Deref + Send + Sync,
1207+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1208+
L: 'static + Deref + Send + Sync,
1209+
P: 'static + Deref + Send + Sync,
1210+
EH: 'static + EventHandler + Send,
1211+
PS: 'static + Deref + Send,
1212+
M: 'static
1213+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
1214+
+ Send
1215+
+ Sync,
1216+
CM: 'static + Deref + Send + Sync,
1217+
OM: 'static + Deref + Send + Sync,
1218+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1219+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1220+
PM: 'static + Deref + Send + Sync,
1221+
S: 'static + Deref<Target = SC> + Send + Sync,
1222+
SC: for<'b> WriteableScore<'b>,
1223+
> where
1224+
UL::Target: 'static + UtxoLookup,
1225+
CF::Target: 'static + chain::Filter,
1226+
T::Target: 'static + BroadcasterInterface,
1227+
F::Target: 'static + FeeEstimator,
1228+
L::Target: 'static + Logger,
1229+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1230+
PS::Target: 'static + Persister<'a, CM, L, S>,
1231+
CM::Target: AChannelManager + Send + Sync,
1232+
OM::Target: AOnionMessenger + Send + Sync,
1233+
PM::Target: APeerManager + Send + Sync,
1234+
{
1235+
persister: PS,
1236+
event_handler: EH,
1237+
chain_monitor: M,
1238+
channel_manager: CM,
1239+
onion_messenger: Option<OM>,
1240+
gossip_sync: GossipSync<PGS, RGS, G, UL, L>,
1241+
peer_manager: PM,
1242+
logger: L,
1243+
scorer: Option<S>,
1244+
_phantom: PhantomData<(&'a (), CF, T, F, P)>,
1245+
}
1246+
1247+
#[cfg(feature = "std")]
1248+
impl<
1249+
'a,
1250+
UL: 'static + Deref + Send + Sync,
1251+
CF: 'static + Deref + Send + Sync,
1252+
T: 'static + Deref + Send + Sync,
1253+
F: 'static + Deref + Send + Sync,
1254+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
1255+
L: 'static + Deref + Send + Sync,
1256+
P: 'static + Deref + Send + Sync,
1257+
EH: 'static + EventHandler + Send,
1258+
PS: 'static + Deref + Send,
1259+
M: 'static
1260+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
1261+
+ Send
1262+
+ Sync,
1263+
CM: 'static + Deref + Send + Sync,
1264+
OM: 'static + Deref + Send + Sync,
1265+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
1266+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1267+
PM: 'static + Deref + Send + Sync,
1268+
S: 'static + Deref<Target = SC> + Send + Sync,
1269+
SC: for<'b> WriteableScore<'b>,
1270+
> BackgroundProcessorBuilder<'a, UL, CF, T, F, G, L, P, EH, PS, M, CM, OM, PGS, RGS, PM, S, SC>
1271+
where
1272+
UL::Target: 'static + UtxoLookup,
1273+
CF::Target: 'static + chain::Filter,
1274+
T::Target: 'static + BroadcasterInterface,
1275+
F::Target: 'static + FeeEstimator,
1276+
L::Target: 'static + Logger,
1277+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1278+
PS::Target: 'static + Persister<'a, CM, L, S>,
1279+
CM::Target: AChannelManager + Send + Sync,
1280+
OM::Target: AOnionMessenger + Send + Sync,
1281+
PM::Target: APeerManager + Send + Sync,
1282+
{
1283+
/// Creates a new builder instance. This is an internal method - use [`BackgroundProcessor::builder`] instead.
1284+
pub fn new(
1285+
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
1286+
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L,
1287+
) -> Self {
1288+
Self {
1289+
persister,
1290+
event_handler,
1291+
chain_monitor,
1292+
channel_manager,
1293+
onion_messenger: None,
1294+
gossip_sync,
1295+
peer_manager,
1296+
logger,
1297+
scorer: None,
1298+
_phantom: PhantomData,
1299+
}
1300+
}
1301+
1302+
/// Sets the optional onion messenger component.
1303+
pub fn with_onion_messenger(&mut self, onion_messenger: OM) -> &mut Self {
1304+
self.onion_messenger = Some(onion_messenger);
1305+
self
1306+
}
1307+
1308+
/// Sets the optional scorer component.
1309+
pub fn with_scorer(&mut self, scorer: S) -> &mut Self {
1310+
self.scorer = Some(scorer);
1311+
self
1312+
}
1313+
1314+
/// Builds and starts the [`BackgroundProcessor`].
1315+
pub fn build(self) -> BackgroundProcessor {
1316+
BackgroundProcessor::start(
1317+
self.persister,
1318+
self.event_handler,
1319+
self.chain_monitor,
1320+
self.channel_manager,
1321+
self.onion_messenger,
1322+
self.gossip_sync,
1323+
self.peer_manager,
1324+
self.logger,
1325+
self.scorer,
1326+
)
1327+
}
11381328
}
11391329

11401330
#[cfg(feature = "std")]
@@ -2861,4 +3051,103 @@ mod tests {
28613051
r1.unwrap().unwrap();
28623052
r2.unwrap()
28633053
}
3054+
3055+
#[test]
3056+
fn test_background_processor_builder() {
3057+
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
3058+
// updates. Also test that when new updates are available, the manager signals that it needs
3059+
// re-persistence and is successfully re-persisted.
3060+
let (persist_dir, nodes) = create_nodes(2, "test_background_processor_builder");
3061+
3062+
// Go through the channel creation process so that each node has something to persist. Since
3063+
// open_channel consumes events, it must complete before starting BackgroundProcessor to
3064+
// avoid a race with processing events.
3065+
let tx = open_channel!(nodes[0], nodes[1], 100000);
3066+
3067+
// Initiate the background processors to watch each node.
3068+
let data_dir = nodes[0].kv_store.get_data_dir();
3069+
let persister = Arc::new(Persister::new(data_dir));
3070+
let event_handler = |_: _| Ok(());
3071+
let bg_processor = BackgroundProcessor::builder(
3072+
persister,
3073+
event_handler,
3074+
nodes[0].chain_monitor.clone(),
3075+
nodes[0].node.clone(),
3076+
nodes[0].p2p_gossip_sync(),
3077+
nodes[0].peer_manager.clone(),
3078+
nodes[0].logger.clone(),
3079+
)
3080+
.with_onion_messenger(nodes[0].messenger.clone())
3081+
.with_scorer(nodes[0].scorer.clone())
3082+
.build();
3083+
3084+
macro_rules! check_persisted_data {
3085+
($node: expr, $filepath: expr) => {
3086+
let mut expected_bytes = Vec::new();
3087+
loop {
3088+
expected_bytes.clear();
3089+
match $node.write(&mut expected_bytes) {
3090+
Ok(()) => match std::fs::read($filepath) {
3091+
Ok(bytes) => {
3092+
if bytes == expected_bytes {
3093+
break;
3094+
} else {
3095+
continue;
3096+
}
3097+
},
3098+
Err(_) => continue,
3099+
},
3100+
Err(e) => panic!("Unexpected error: {}", e),
3101+
}
3102+
}
3103+
};
3104+
}
3105+
3106+
// Check that the initial data is persisted as expected
3107+
let filepath =
3108+
get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
3109+
check_persisted_data!(nodes[0].node, filepath.clone());
3110+
3111+
loop {
3112+
if !nodes[0].node.get_event_or_persist_condvar_value() {
3113+
break;
3114+
}
3115+
}
3116+
3117+
// Force-close the channel.
3118+
let error_message = "Channel force-closed";
3119+
nodes[0]
3120+
.node
3121+
.force_close_broadcasting_latest_txn(
3122+
&ChannelId::v1_from_funding_outpoint(OutPoint {
3123+
txid: tx.compute_txid(),
3124+
index: 0,
3125+
}),
3126+
&nodes[1].node.get_our_node_id(),
3127+
error_message.to_string(),
3128+
)
3129+
.unwrap();
3130+
3131+
// Check that the force-close updates are persisted
3132+
check_persisted_data!(nodes[0].node, manager_path.clone());
3133+
loop {
3134+
if !nodes[0].node.get_event_or_persist_condvar_value() {
3135+
break;
3136+
}
3137+
}
3138+
3139+
// Check network graph is persisted
3140+
let filepath =
3141+
get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
3142+
check_persisted_data!(nodes[0].network_graph, filepath);
3143+
3144+
// Check scorer is persisted
3145+
let filepath =
3146+
get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
3147+
check_persisted_data!(nodes[0].scorer, filepath);
3148+
3149+
if !std::thread::panicking() {
3150+
bg_processor.stop().unwrap();
3151+
}
3152+
}
28643153
}

0 commit comments

Comments
 (0)