Skip to content

Prep background-processor for bindings inclusion #884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ members = [
"lightning-invoice",
"lightning-net-tokio",
"lightning-persister",
"background-processor",
"lightning-background-processor",
]

# Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;

/// BackgroundProcessor takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
Expand Down Expand Up @@ -47,6 +48,38 @@ const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Trait which handles persisting a [`ChannelManager`] to disk.
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
pub trait ChannelManagerPersister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
{
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
/// (which will cause the [`BackgroundProcessor`] which called this method to exit.
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
}

impl<Fun, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
ChannelManagerPersister<Signer, M, T, K, F, L> for Fun where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
Fun: Fn(&ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>,
{
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
self(channel_manager)
}
}

impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the top-level
/// documentation.
Expand All @@ -66,25 +99,29 @@ impl BackgroundProcessor {
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
pub fn start<PM, Signer, M, T, K, F, L, Descriptor: 'static + SocketDescriptor + Send, CM, RM>(
persist_channel_manager: PM,
channel_manager: Arc<ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>,
peer_manager: Arc<PeerManager<Descriptor, Arc<CM>, Arc<RM>, Arc<L>>>, logger: Arc<L>,
) -> Self
where
pub fn start<
Signer: 'static + Sign,
M: 'static + chain::Watch<Signer>,
T: 'static + BroadcasterInterface,
K: 'static + KeysInterface<Signer = Signer>,
F: 'static + FeeEstimator,
L: 'static + Logger,
CM: 'static + ChannelMessageHandler,
RM: 'static + RoutingMessageHandler,
PM: 'static
+ Send
+ Fn(
&ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>,
) -> Result<(), std::io::Error>,
M: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
K: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
L: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
CMP: 'static + Send + ChannelManagerPersister<Signer, M, T, K, F, L>,
CM: 'static + Deref<Target = ChannelManager<Signer, M, T, K, F, L>> + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L>> + Send + Sync,
>
(handler: CMP, channel_manager: CM, peer_manager: PM, logger: L) -> Self
where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
Expand All @@ -95,7 +132,7 @@ impl BackgroundProcessor {
let updates_available =
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
if updates_available {
persist_channel_manager(&*channel_manager)?;
handler.persist_manager(&*channel_manager)?;
}
// Exit the loop if the background processor was requested to stop.
if stop_thread.load(Ordering::Acquire) == true {
Expand Down