Skip to content

Anchor-outputs (2/3): Add anchors and support Commitment CPFP bumping #725

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
10 changes: 7 additions & 3 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::ChainMonitor;
use lightning::chain::channelmonitor;
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::chain::utxointerface::UtxoPool;
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
Expand Down Expand Up @@ -111,12 +112,13 @@ impl BackgroundProcessor {
F: 'static + Deref + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
U: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send + Sync,
CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P, U>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L>> + Send + Sync,
>
Expand All @@ -128,6 +130,7 @@ impl BackgroundProcessor {
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
U::Target: 'static + UtxoPool,
P::Target: 'static + channelmonitor::Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
Expand Down Expand Up @@ -207,7 +210,7 @@ mod tests {
fn disconnect_socket(&mut self) {}
}

type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>, Arc<test_utils::TestPool>>;

struct Node {
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
Expand Down Expand Up @@ -241,13 +244,14 @@ mod tests {
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
let utxo_pool = Arc::new(test_utils::TestPool::new());
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
let seed = [i as u8; 32];
let network = Network::Testnet;
let now = Duration::from_secs(genesis_block(network).header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone(), utxo_pool.clone()));
let best_block = BestBlock::from_genesis(network);
let params = ChainParameters { network, best_block };
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
Expand Down
4 changes: 3 additions & 1 deletion lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// use lightning::chain::chaininterface::FeeEstimator;
/// use lightning::chain::keysinterface;
/// use lightning::chain::keysinterface::KeysInterface;
/// use lightning::chain::utxointerface::UtxoPool;
/// use lightning::ln::channelmanager::ChannelManager;
/// use lightning::ln::channelmanager::ChannelManagerReadArgs;
/// use lightning::util::config::UserConfig;
Expand All @@ -64,11 +65,12 @@ BlockSourceResult<ValidatedBlockHeader> {
/// T: BroadcasterInterface,
/// F: FeeEstimator,
/// L: Logger,
/// U: UtxoPool,
/// C: chain::Filter,
/// P: channelmonitor::Persist<S>,
/// >(
/// block_source: &mut B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P, &U>,
/// config: UserConfig,
/// keys_manager: &K,
/// tx_broadcaster: &T,
Expand Down
3 changes: 2 additions & 1 deletion lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
//! type ChainAccess = dyn lightning::chain::Access + Send + Sync;
//! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
//! type DataPersister = dyn lightning::chain::channelmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
//! type UtxoPool = dyn lightning::chain::utxointerface::UtxoPool + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>, Arc<UtxoPool>>;
//! type ChannelManager = Arc<lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>>;
//!
Expand Down
4 changes: 2 additions & 2 deletions lightning-persister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ mod tests {
let persister_1 = FilesystemPersister::new("test_filesystem_persister_1".to_string());
let chanmon_cfgs = create_chanmon_cfgs(2);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister_0, &node_cfgs[0].keys_manager);
let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &persister_1, &node_cfgs[1].keys_manager);
let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister_0, &node_cfgs[0].keys_manager, &chanmon_cfgs[0].utxo_pool);
let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &persister_1, &node_cfgs[1].keys_manager, &chanmon_cfgs[0].utxo_pool);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
Expand Down
49 changes: 31 additions & 18 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use chain::channelmonitor;
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist, TransactionOutputs};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::Sign;
use chain::utxointerface::UtxoPool;
use util::logger::Logger;
use util::events;
use util::events::EventHandler;
Expand All @@ -51,12 +52,13 @@ use core::ops::Deref;
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [module-level documentation]: crate::chain::chainmonitor
pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool,
{
/// The monitors
pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
Expand All @@ -65,14 +67,16 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
logger: L,
fee_estimator: F,
persister: P,
utxo_pool: U,
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool
{
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
/// of a channel and reacting accordingly based on transactions in the given chain data. See
Expand Down Expand Up @@ -130,14 +134,15 @@ where C::Target: chain::Filter,
/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
/// always need to fetch full blocks absent another means for determining which blocks contain
/// transactions relevant to the watched channels.
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P, utxo_pool: U) -> Self {
Self {
monitors: RwLock::new(HashMap::new()),
chain_source,
broadcaster,
logger,
fee_estimator: feeest,
persister,
utxo_pool
}
}

Expand All @@ -151,53 +156,55 @@ where C::Target: chain::Filter,
}
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref>
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool
{
fn block_connected(&self, block: &Block, height: u32) {
let header = &block.header;
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
self.process_chain_data(header, &txdata, |monitor, txdata| {
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool)
});
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
monitor.block_disconnected(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool);
}
}
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref>
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool,
{
fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
self.process_chain_data(header, txdata, |monitor, txdata| {
monitor.transactions_confirmed(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool)
});
}

fn transaction_unconfirmed(&self, txid: &Txid) {
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool);
}
}

Expand All @@ -207,7 +214,7 @@ where
// it's still possible if a chain::Filter implementation returns a transaction.
debug_assert!(txdata.is_empty());
monitor.best_block_updated(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool)
});
}

Expand All @@ -224,13 +231,14 @@ where
}
}

impl<ChannelSigner: Sign, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref, U: Deref>
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool,
{
/// Adds the monitor that watches the channel referred to by the given outpoint.
///
Expand Down Expand Up @@ -281,7 +289,7 @@ where C::Target: chain::Filter,
},
Some(monitor) => {
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger, &self.utxo_pool);
if let Err(e) = &update_res {
log_error!(self.logger, "Failed to update channel monitor: {:?}", e);
}
Expand Down Expand Up @@ -309,12 +317,13 @@ where C::Target: chain::Filter,
}
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool
{
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
///
Expand Down Expand Up @@ -366,10 +375,14 @@ mod tests {
};

// Set expectations on nodes[1]'s chain source to return dependent transactions.
let htlc_output = TxOutReference(commitment_tx.clone(), 0);
let to_local_output = TxOutReference(commitment_tx.clone(), 1);
let anchor_a_output = TxOutReference(commitment_tx.clone(), 0);
let anchor_b_output = TxOutReference(commitment_tx.clone(), 1);
let htlc_output = TxOutReference(commitment_tx.clone(), 2);
let to_local_output = TxOutReference(commitment_tx.clone(), 3);
let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
nodes[1].chain_source
.expect(OnRegisterOutput { with: anchor_a_output, returns: None })
.expect(OnRegisterOutput { with: anchor_b_output, returns: None })
.expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
.expect(OnRegisterOutput { with: to_local_output, returns: None })
.expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });
Expand Down
Loading