Rescan dependent transactions in ChainMonitor #840

Merged · 5 commits · Mar 28, 2021
79 changes: 77 additions & 2 deletions lightning/src/chain/chainmonitor.rs
@@ -26,7 +26,7 @@
use bitcoin::blockdata::block::{Block, BlockHeader};

use chain;
use chain::Filter;
use chain::{Filter, WatchedOutput};
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use chain::channelmonitor;
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist};
@@ -82,18 +82,40 @@ where C::Target: chain::Filter,
/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
/// updated `txdata`.
pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
let mut dependent_txdata = Vec::new();
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);

// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.iter() {
chain_source.register_output(&OutPoint { txid, index: *idx as u16 }, &output.script_pubkey);
// Register any new outputs with the chain source for filtering and recurse
// if it indicates that there are dependent transactions within the block
// that had not been previously included in txdata.
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: *idx as u16 },
script_pubkey: output.script_pubkey.clone(),
};
if let Some(tx) = chain_source.register_output(output) {
Reviewer:
This is where I have a doubt about the operation of an Electrum client. At this point, if you don't have the in-block descendants locally, what should you do?

  • a) should the local Electrum client emit new queries to its Electrum servers and return None?
  • b) should the local Electrum client, when learning about the spend of a registered output, immediately subscribe to spent outputs, and thus learn about descendants asynchronously from the register_output caller?

Contributor Author:
The Electrum client needs to query the server for dependent transactions synchronously and block until it gets a response back. Otherwise, the user would need to call block_connected again for the same header, which we'd like to avoid.

Reviewer:
Gotcha, option c): emit new queries and block until it gets a response back. I think this query-and-block requirement should be documented on chain::Filter::register_output?

Also, I think the documentation should note that a query to a trusted-but-not-too-much server should have a timeout; otherwise that's a vector for stalling the operation of an Electrum client.

Contributor Author:
Hmmm... currently the return value is Option<Transaction>, so there's no mechanism to time out. What do you propose should happen in that scenario?

Reviewer:
I would suggest returning None, but I think the timeout should be implemented by the local Electrum client, not by us. We should just make this point clear in the chain::Filter implementation documentation, imo.

I think <30s is enough to conclude the spend of the output isn't known by the Electrum server and that we should return.

Contributor Author:
Agreed that it shouldn't be us. But returning None doesn't allow us to differentiate between "Electrum doesn't know" and "Electrum is unresponsive". What should the user do in the latter case?
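The query-and-block behavior with a client-side timeout discussed in this thread could be sketched as follows. This is a minimal sketch, not the library's API: `Transaction` is a placeholder type, and a hypothetical channel stands in for the round-trip to an Electrum server.

```rust
use std::sync::mpsc;
use std::time::Duration;

// Placeholder for bitcoin::Transaction in this sketch.
#[derive(Debug, PartialEq)]
struct Transaction(u32);

// Hypothetical blocking query: the Electrum client waits for the server's
// answer, but gives up after `timeout` so an unresponsive server cannot
// wedge block_connected. A timeout is reported the same as "not spent".
fn query_spend_with_timeout(
    response: mpsc::Receiver<Option<(usize, Transaction)>>,
    timeout: Duration,
) -> Option<(usize, Transaction)> {
    match response.recv_timeout(timeout) {
        Ok(spend) => spend, // server answered (possibly "output not spent")
        Err(_) => None,     // timed out or disconnected: treat as unknown
    }
}
```

As noted above, collapsing "unresponsive" into None loses information; a richer return type would be needed to distinguish the two cases.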

dependent_txdata.push(tx);
}
}
}
}
}

// Recursively call for any dependent transactions that were identified by the chain source.
if !dependent_txdata.is_empty() {
dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
dependent_txdata.dedup_by_key(|(index, _tx)| *index);
Reviewer:
IIRC, ChannelMonitor::block_connected will never return a registration request for outputs spent by transactions it has just been called to parse?

Where could duplication have been introduced? Though I agree to keep it as belt-and-suspenders.

Contributor Author:
IIRC, ChannelMonitor::block_connected will never return a registration request for outputs spent by transactions it has just been called to parse?

I'm not sure I follow. Could you explain why it will not do so? In fact, doesn't my test demonstrate that it does? (i.e., a commitment transaction is processed and its outputs are registered)

Where could duplication have been introduced? Though I agree to keep it as belt-and-suspenders.

The duplication may occur across channel monitors. For instance, two channel monitors register two different outputs which happen to be spent by the same in-block descendant transaction. So each call to register_output would return this transaction, which would need to be de-duped.

Would this ever happen in practice? Maybe not? But it is not an impossible scenario, right?

@ariard (Mar 25, 2021):
In fact, doesn't my test demonstrate that it does? (i.e., a commitment transaction is processed and its outputs are registered)

I meant a channel monitor won't ask to register outputs spent by the processed commitment transactions, only outputs for the next stage of spending? The spent outputs are already registered; otherwise we wouldn't have received a transaction spending them.

Would this ever happen in practice? Maybe not? But it is not an impossible scenario, right?

I don't believe this can happen right now because we don't coalesce justice transactions across channels. Although I thought about doing it for fee optimizations a while back, it's not worth the complexity.

Beyond that, I'm also pretty sure that dual-funding/splicing will enable that kind of scenario, where channel monitors register different outputs which happen to be spent by the same in-block descendant transaction. A splicing transaction could lawfully spend two funding outpoints.

AFAICT, those scenarios aren't possible for now, but they're really likely in the future, so better to keep the dedup?

Contributor Author:
I meant a channel monitor won't ask to register outputs spent by the processed commitment transactions, only outputs for the next stage of spending? The spent outputs are already registered; otherwise we wouldn't have received a transaction spending them.

Ah, right. Sorry, I misread your message.

I don't believe this can happen right now because we don't coalesce justice transactions across channels. Although I thought about doing it for fee optimizations a while back, it's not worth the complexity.

Beyond that, I'm also pretty sure that dual-funding/splicing will enable that kind of scenario, where channel monitors register different outputs which happen to be spent by the same in-block descendant transaction. A splicing transaction could lawfully spend two funding outpoints.

AFAICT, those scenarios aren't possible for now, but they're really likely in the future, so better to keep the dedup?

Gotcha, will keep as is. Though note that these transactions are published by the counterparty, which may be another implementation.

For a de-dup test, I suppose if I set up two channels then I may be able to form a transaction that spends from both commitment transactions' outputs.

Contributor Author:
For a de-dup test, I suppose if I set up two channels then I may be able to form a transaction that spends from both commitment transactions' outputs.

Actually, I think one channel would be sufficient here. A transaction spending the commitment transaction's to_local output and the HTLC transaction's output should suffice. The Electrum client would return this twice (once for each call to register_output), which would trigger the de-dup logic.

Is there an easy way to form such a transaction using our API?
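The sort-and-dedup step this thread discusses can be illustrated in isolation. A sketch using string placeholders for transactions; the real code operates on `(usize, Transaction)` pairs:

```rust
// Mirrors block_connected's handling of dependent transactions: if two
// monitors return the same in-block descendant, sorting by block index and
// then deduplicating on that key keeps a single copy, in block order.
fn sort_and_dedup(mut deps: Vec<(usize, &'static str)>) -> Vec<(usize, &'static str)> {
    deps.sort_unstable_by_key(|(index, _tx)| *index);
    deps.dedup_by_key(|(index, _tx)| *index);
    deps
}
```

Note that `dedup_by_key` only removes adjacent duplicates, which is why the sort by block index must come first.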

let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
self.block_connected(header, &txdata, height);
}
}

/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
@@ -245,3 +267,56 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
pending_events
}
}

#[cfg(test)]
mod tests {
use ::{check_added_monitors, get_local_commitment_txn};
use ln::features::InitFeatures;
use ln::functional_test_utils::*;
use util::events::EventsProvider;
use util::events::MessageSendEventsProvider;
use util::test_utils::{OnRegisterOutput, TxOutReference};

/// Tests that in-block dependent transactions are processed by `block_connected` when not
/// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance,
/// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the
/// commitment transaction itself. An Electrum client may filter the commitment transaction but
/// needs to return the HTLC transaction so it can be processed.
#[test]
fn connect_block_checks_dependent_transactions() {
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let channel = create_announced_chan_between_nodes(
&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());

// Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions.
let (commitment_tx, htlc_tx) = {
let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0;
let mut txn = get_local_commitment_txn!(nodes[0], channel.2);
claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage, 5_000_000);

assert_eq!(txn.len(), 2);
(txn.remove(0), txn.remove(0))
};

// Set expectations on nodes[1]'s chain source to return dependent transactions.
let htlc_output = TxOutReference(commitment_tx.clone(), 0);
let to_local_output = TxOutReference(commitment_tx.clone(), 1);
let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
nodes[1].chain_source
.expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
.expect(OnRegisterOutput { with: to_local_output, returns: None })
.expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });

// Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain
// source should return the dependent HTLC transaction when the HTLC output is registered.
mine_transaction(&nodes[1], &commitment_tx);
Contributor:
I double-checked that "expectations" are registered and then cleared before and after this tx is mined.


// Clean up so uninteresting assertions don't fail.
check_added_monitors!(nodes[1], 1);
nodes[1].node.get_and_clear_pending_msg_events();
nodes[1].node.get_and_clear_pending_events();
}
}
7 changes: 6 additions & 1 deletion lightning/src/chain/channelmonitor.rs
@@ -40,6 +40,7 @@ use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLC
use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
use chain;
use chain::WatchedOutput;
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::{SpendableOutputDescriptor, StaticPaymentOutputDescriptor, DelayedPaymentOutputDescriptor, Sign, KeysInterface};
@@ -1174,7 +1175,11 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
for (txid, outputs) in lock.get_outputs_to_watch().iter() {
for (index, script_pubkey) in outputs.iter() {
assert!(*index <= u16::max_value() as u32);
filter.register_output(&OutPoint { txid: *txid, index: *index as u16 }, script_pubkey);
filter.register_output(WatchedOutput {
block_hash: None,
outpoint: OutPoint { txid: *txid, index: *index as u16 },
script_pubkey: script_pubkey.clone(),
});
}
}
}
37 changes: 33 additions & 4 deletions lightning/src/chain/mod.rs
@@ -11,7 +11,7 @@

use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::script::Script;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::hash_types::{BlockHash, Txid};

use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent};
@@ -129,9 +129,38 @@ pub trait Filter: Send + Sync {
/// a spending condition.
fn register_tx(&self, txid: &Txid, script_pubkey: &Script);

/// Registers interest in spends of a transaction output identified by `outpoint` having
/// `script_pubkey` as the spending condition.
fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script);
/// Registers interest in spends of a transaction output.
///
/// Optionally, when `output.block_hash` is set, should return any transaction spending the
/// output that is found in the corresponding block along with its index.
///
/// This return value is useful for Electrum clients in order to supply in-block descendant
Reviewer:
I don't understand how the Electrum client is leveraging the block hash's presence. Do you assume it stores the in-block descendants, even before the register_output caller asks for them?

Otherwise, I don't think you need the block hash to register already-confirmed scriptpubkeys with an Electrum server API; see https://electrumx.readthedocs.io/en/latest/protocol-methods.html

Contributor Author:
The purpose of the block_hash is to limit transactions to a given block. Essentially:

  • the user calls block_connected for block X with a subset of txdata supplied by Electrum
  • register_output indicates there is a new output in txdata to watch
  • the Electrum client needs to determine if that output was spent in block X

It should return None if the output was not spent in block X, even if it was spent in some later block Electrum knows about.
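The block-scoping rule described here could be sketched as a lookup against a hypothetical client-side spend index. Outpoints, block hashes, and transactions are simplified to strings; the real client would consult its Electrum-derived state.

```rust
use std::collections::HashMap;

// Hypothetical spend index an Electrum-style client might maintain:
// outpoint -> (hash of the block it was spent in, position in that block,
// the spending transaction).
fn spend_within_block(
    spend_index: &HashMap<&'static str, (&'static str, usize, &'static str)>,
    outpoint: &str,
    connecting_block: &str,
) -> Option<(usize, &'static str)> {
    match spend_index.get(outpoint) {
        // Only return the spend if it happened in the block being connected.
        Some((block, pos, tx)) if *block == connecting_block => Some((*pos, *tx)),
        // Unspent, or spent only in some later block: return None for now.
        _ => None,
    }
}
```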

/// transactions which otherwise were not included. This is not necessary for other clients if
/// such descendant transactions were already included (e.g., when a BIP 157 client provides the
/// full block).
fn register_output(&self, output: WatchedOutput) -> Option<(usize, Transaction)>;
}

/// A transaction output watched by a [`ChannelMonitor`] for spends on-chain.
Collaborator:
Given the persistent confusion all of us seem to have, it would be good if this mentioned "spends" more prominently, or maybe explicitly: "Indicates any transaction spending the outpoint described here..."

Contributor Author:
Yeah, it is a bit vague. Rewrote to be more explicit about how this is used.

///
/// Used to convey to a [`Filter`] such an output with a given spending condition. Any transaction
/// spending the output must be given to [`ChannelMonitor::block_connected`] either directly or via
/// the return value of [`Filter::register_output`].
///
/// If `block_hash` is `Some`, this indicates the output was created in the corresponding block and
/// may have been spent there. See [`Filter::register_output`] for details.
///
/// [`ChannelMonitor`]: channelmonitor::ChannelMonitor
/// [`ChannelMonitor::block_connected`]: channelmonitor::ChannelMonitor::block_connected
pub struct WatchedOutput {
/// First block where the transaction output may have been spent.
pub block_hash: Option<BlockHash>,

/// Outpoint identifying the transaction output.
pub outpoint: OutPoint,

/// Spending condition of the transaction output.
pub script_pubkey: Script,
}

impl<T: Listen> Listen for std::ops::Deref<Target = T> {
3 changes: 2 additions & 1 deletion lightning/src/ln/functional_test_utils.rs
@@ -341,7 +341,8 @@ macro_rules! get_feerate {
}
}

#[cfg(test)]
/// Returns any local commitment transactions for the channel.
#[macro_export]
macro_rules! get_local_commitment_txn {
($node: expr, $channel_id: expr) => {
{
82 changes: 79 additions & 3 deletions lightning/src/util/test_utils.rs
@@ -8,6 +8,7 @@
// licenses.

use chain;
use chain::WatchedOutput;
use chain::chaininterface;
use chain::chaininterface::ConfirmationTarget;
use chain::chainmonitor;
@@ -38,7 +39,7 @@ use std::time::Duration;
use std::sync::{Mutex, Arc};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp, mem};
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use chain::keysinterface::InMemorySigner;

pub struct TestVecWriter(pub Vec<u8>);
@@ -517,6 +518,7 @@ pub struct TestChainSource {
pub utxo_ret: Mutex<Result<TxOut, chain::AccessError>>,
pub watched_txn: Mutex<HashSet<(Txid, Script)>>,
pub watched_outputs: Mutex<HashSet<(OutPoint, Script)>>,
expectations: Mutex<Option<VecDeque<OnRegisterOutput>>>,
}

impl TestChainSource {
@@ -527,8 +529,17 @@ impl TestChainSource {
utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })),
watched_txn: Mutex::new(HashSet::new()),
watched_outputs: Mutex::new(HashSet::new()),
expectations: Mutex::new(None),
}
}

/// Sets an expectation that [`chain::Filter::register_output`] is called.
pub fn expect(&self, expectation: OnRegisterOutput) -> &Self {
self.expectations.lock().unwrap()
.get_or_insert_with(|| VecDeque::new())
.push_back(expectation);
self
}
}

impl chain::Access for TestChainSource {
@@ -546,7 +557,72 @@ impl chain::Filter for TestChainSource {
self.watched_txn.lock().unwrap().insert((*txid, script_pubkey.clone()));
}

fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script) {
self.watched_outputs.lock().unwrap().insert((*outpoint, script_pubkey.clone()));
fn register_output(&self, output: WatchedOutput) -> Option<(usize, Transaction)> {
let dependent_tx = match &mut *self.expectations.lock().unwrap() {
None => None,
Some(expectations) => match expectations.pop_front() {
None => {
panic!("Unexpected register_output: {:?}",
(output.outpoint, output.script_pubkey));
},
Some(expectation) => {
assert_eq!(output.outpoint, expectation.outpoint());
assert_eq!(&output.script_pubkey, expectation.script_pubkey());
expectation.returns
},
},
};

self.watched_outputs.lock().unwrap().insert((output.outpoint, output.script_pubkey));
dependent_tx
}
}

impl Drop for TestChainSource {
fn drop(&mut self) {
if std::thread::panicking() {
return;
}

if let Some(expectations) = &*self.expectations.lock().unwrap() {
if !expectations.is_empty() {
panic!("Unsatisfied expectations: {:?}", expectations);
}
}
}
}

/// An expectation that [`chain::Filter::register_output`] was called with a transaction output and
/// returns an optional dependent transaction that spends the output in the same block.
pub struct OnRegisterOutput {
/// The transaction output to register.
pub with: TxOutReference,

/// A dependent transaction spending the output along with its position in the block.
pub returns: Option<(usize, Transaction)>,
}

/// A transaction output as identified by an index into a transaction's output list.
pub struct TxOutReference(pub Transaction, pub usize);

impl OnRegisterOutput {
fn outpoint(&self) -> OutPoint {
let txid = self.with.0.txid();
let index = self.with.1 as u16;
OutPoint { txid, index }
}

fn script_pubkey(&self) -> &Script {
let index = self.with.1;
&self.with.0.output[index].script_pubkey
}
}

impl std::fmt::Debug for OnRegisterOutput {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OnRegisterOutput")
.field("outpoint", &self.outpoint())
.field("script_pubkey", self.script_pubkey())
.finish()
}
}