Skip to content

Commit 2f4457f

Browse files
committed
Drop return value from Filter::register_output
This commit removes the return value from `Filter::register_output` as creating a suitable value almost always entails blocking operations (e.g., lookups via network request), which however conflicts with the requirement that user calls should avoid blocking calls at all cost. Removing the return value also rendered quite a bit of test code for dependent transaction handling superfluous, which is therefore also removed with this commit.
1 parent d2191d9 commit 2f4457f

File tree

3 files changed

+59
-188
lines changed

3 files changed

+59
-188
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 51 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -263,82 +263,67 @@ where C::Target: chain::Filter,
263263
where
264264
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
265265
{
266-
let mut dependent_txdata = Vec::new();
267-
{
268-
let monitor_states = self.monitors.write().unwrap();
269-
if let Some(height) = best_height {
270-
// If the best block height is being updated, update highest_chain_height under the
271-
// monitors write lock.
272-
let old_height = self.highest_chain_height.load(Ordering::Acquire);
273-
let new_height = height as usize;
274-
if new_height > old_height {
275-
self.highest_chain_height.store(new_height, Ordering::Release);
276-
}
266+
let monitor_states = self.monitors.write().unwrap();
267+
if let Some(height) = best_height {
268+
// If the best block height is being updated, update highest_chain_height under the
269+
// monitors write lock.
270+
let old_height = self.highest_chain_height.load(Ordering::Acquire);
271+
let new_height = height as usize;
272+
if new_height > old_height {
273+
self.highest_chain_height.store(new_height, Ordering::Release);
277274
}
275+
}
278276

279-
for (funding_outpoint, monitor_state) in monitor_states.iter() {
280-
let monitor = &monitor_state.monitor;
281-
let mut txn_outputs;
282-
{
283-
txn_outputs = process(monitor, txdata);
284-
let update_id = MonitorUpdateId {
285-
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
286-
};
287-
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
288-
if let Some(height) = best_height {
289-
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
290-
// If there are not ChainSync persists awaiting completion, go ahead and
291-
// set last_chain_persist_height here - we wouldn't want the first
292-
// TemporaryFailure to always immediately be considered "overly delayed".
293-
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
294-
}
277+
for (funding_outpoint, monitor_state) in monitor_states.iter() {
278+
let monitor = &monitor_state.monitor;
279+
let mut txn_outputs;
280+
{
281+
txn_outputs = process(monitor, txdata);
282+
let update_id = MonitorUpdateId {
283+
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
284+
};
285+
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
286+
if let Some(height) = best_height {
287+
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
288+
// If there are not ChainSync persists awaiting completion, go ahead and
289+
// set last_chain_persist_height here - we wouldn't want the first
290+
// TemporaryFailure to always immediately be considered "overly delayed".
291+
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
295292
}
293+
}
296294

297-
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
298-
match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
299-
Ok(()) =>
300-
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
301-
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
302-
monitor_state.channel_perm_failed.store(true, Ordering::Release);
303-
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
304-
},
305-
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
306-
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
307-
pending_monitor_updates.push(update_id);
308-
},
309-
}
295+
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
296+
match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
297+
Ok(()) =>
298+
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
299+
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
300+
monitor_state.channel_perm_failed.store(true, Ordering::Release);
301+
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
302+
},
303+
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
304+
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
305+
pending_monitor_updates.push(update_id);
306+
},
310307
}
308+
}
311309

312-
// Register any new outputs with the chain source for filtering, storing any dependent
313-
// transactions from within the block that previously had not been included in txdata.
314-
if let Some(ref chain_source) = self.chain_source {
315-
let block_hash = header.block_hash();
316-
for (txid, mut outputs) in txn_outputs.drain(..) {
317-
for (idx, output) in outputs.drain(..) {
318-
// Register any new outputs with the chain source for filtering and recurse
319-
// if it indicates that there are dependent transactions within the block
320-
// that had not been previously included in txdata.
321-
let output = WatchedOutput {
322-
block_hash: Some(block_hash),
323-
outpoint: OutPoint { txid, index: idx as u16 },
324-
script_pubkey: output.script_pubkey,
325-
};
326-
if let Some(tx) = chain_source.register_output(output) {
327-
dependent_txdata.push(tx);
328-
}
329-
}
310+
// Register any new outputs with the chain source for filtering, storing any dependent
311+
// transactions from within the block that previously had not been included in txdata.
312+
if let Some(ref chain_source) = self.chain_source {
313+
let block_hash = header.block_hash();
314+
for (txid, mut outputs) in txn_outputs.drain(..) {
315+
for (idx, output) in outputs.drain(..) {
316+
// Register any new outputs with the chain source for filtering
317+
let output = WatchedOutput {
318+
block_hash: Some(block_hash),
319+
outpoint: OutPoint { txid, index: idx as u16 },
320+
script_pubkey: output.script_pubkey,
321+
};
322+
chain_source.register_output(output)
330323
}
331324
}
332325
}
333326
}
334-
335-
// Recursively call for any dependent transactions that were identified by the chain source.
336-
if !dependent_txdata.is_empty() {
337-
dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
338-
dependent_txdata.dedup_by_key(|(index, _tx)| *index);
339-
let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
340-
self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
341-
}
342327
}
343328

344329
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
@@ -745,50 +730,6 @@ mod tests {
745730
use ln::msgs::ChannelMessageHandler;
746731
use util::errors::APIError;
747732
use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
748-
use util::test_utils::{OnRegisterOutput, TxOutReference};
749-
750-
/// Tests that in-block dependent transactions are processed by `block_connected` when not
751-
/// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance,
752-
/// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the
753-
/// commitment transaction itself. An Electrum client may filter the commitment transaction but
754-
/// needs to return the HTLC transaction so it can be processed.
755-
#[test]
756-
fn connect_block_checks_dependent_transactions() {
757-
let chanmon_cfgs = create_chanmon_cfgs(2);
758-
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
759-
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
760-
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
761-
let channel = create_announced_chan_between_nodes(
762-
&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
763-
764-
// Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions.
765-
let (commitment_tx, htlc_tx) = {
766-
let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0;
767-
let mut txn = get_local_commitment_txn!(nodes[0], channel.2);
768-
claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage);
769-
770-
assert_eq!(txn.len(), 2);
771-
(txn.remove(0), txn.remove(0))
772-
};
773-
774-
// Set expectations on nodes[1]'s chain source to return dependent transactions.
775-
let htlc_output = TxOutReference(commitment_tx.clone(), 0);
776-
let to_local_output = TxOutReference(commitment_tx.clone(), 1);
777-
let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
778-
nodes[1].chain_source
779-
.expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
780-
.expect(OnRegisterOutput { with: to_local_output, returns: None })
781-
.expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });
782-
783-
// Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain
784-
// source should return the dependent HTLC transaction when the HTLC output is registered.
785-
mine_transaction(&nodes[1], &commitment_tx);
786-
787-
// Clean up so uninteresting assertions don't fail.
788-
check_added_monitors!(nodes[1], 1);
789-
nodes[1].node.get_and_clear_pending_msg_events();
790-
nodes[1].node.get_and_clear_pending_events();
791-
}
792733

793734
#[test]
794735
fn test_async_ooo_offchain_updates() {

lightning/src/chain/mod.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
use bitcoin::blockdata::block::{Block, BlockHeader};
1313
use bitcoin::blockdata::constants::genesis_block;
1414
use bitcoin::blockdata::script::Script;
15-
use bitcoin::blockdata::transaction::{Transaction, TxOut};
15+
use bitcoin::blockdata::transaction::TxOut;
1616
use bitcoin::hash_types::{BlockHash, Txid};
1717
use bitcoin::network::constants::Network;
1818
use bitcoin::secp256k1::PublicKey;
@@ -333,21 +333,18 @@ pub trait Filter {
333333

334334
/// Registers interest in spends of a transaction output.
335335
///
336-
/// Optionally, when `output.block_hash` is set, should return any transaction spending the
337-
/// output that is found in the corresponding block along with its index.
338-
///
339-
/// This return value is useful for Electrum clients in order to supply in-block descendant
340-
/// transactions which otherwise were not included. This is not necessary for other clients if
341-
/// such descendant transactions were already included (e.g., when a BIP 157 client provides the
342-
/// full block).
343-
fn register_output(&self, output: WatchedOutput) -> Option<(usize, Transaction)>;
336+
/// Note that this method might be called during processing of a new block. You therefore need
337+
/// to ensure that also dependent output spents within an already connected block are correctly
338+
/// handled, e.g., by re-scanning the block in question whenever new outputs have been
339+
/// registered mid-processing.
340+
fn register_output(&self, output: WatchedOutput);
344341
}
345342

346343
/// A transaction output watched by a [`ChannelMonitor`] for spends on-chain.
347344
///
348345
/// Used to convey to a [`Filter`] such an output with a given spending condition. Any transaction
349346
/// spending the output must be given to [`ChannelMonitor::block_connected`] either directly or via
350-
/// the return value of [`Filter::register_output`].
347+
/// [`Confirm::transactions_confirmed`].
351348
///
352349
/// If `block_hash` is `Some`, this indicates the output was created in the corresponding block and
353350
/// may have been spent there. See [`Filter::register_output`] for details.

lightning/src/util/test_utils.rs

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,6 @@ pub struct TestChainSource {
728728
pub utxo_ret: Mutex<Result<TxOut, chain::AccessError>>,
729729
pub watched_txn: Mutex<HashSet<(Txid, Script)>>,
730730
pub watched_outputs: Mutex<HashSet<(OutPoint, Script)>>,
731-
expectations: Mutex<Option<VecDeque<OnRegisterOutput>>>,
732731
}
733732

734733
impl TestChainSource {
@@ -739,17 +738,8 @@ impl TestChainSource {
739738
utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })),
740739
watched_txn: Mutex::new(HashSet::new()),
741740
watched_outputs: Mutex::new(HashSet::new()),
742-
expectations: Mutex::new(None),
743741
}
744742
}
745-
746-
/// Sets an expectation that [`chain::Filter::register_output`] is called.
747-
pub fn expect(&self, expectation: OnRegisterOutput) -> &Self {
748-
self.expectations.lock().unwrap()
749-
.get_or_insert_with(|| VecDeque::new())
750-
.push_back(expectation);
751-
self
752-
}
753743
}
754744

755745
impl chain::Access for TestChainSource {
@@ -767,24 +757,8 @@ impl chain::Filter for TestChainSource {
767757
self.watched_txn.lock().unwrap().insert((*txid, script_pubkey.clone()));
768758
}
769759

770-
fn register_output(&self, output: WatchedOutput) -> Option<(usize, Transaction)> {
771-
let dependent_tx = match &mut *self.expectations.lock().unwrap() {
772-
None => None,
773-
Some(expectations) => match expectations.pop_front() {
774-
None => {
775-
panic!("Unexpected register_output: {:?}",
776-
(output.outpoint, output.script_pubkey));
777-
},
778-
Some(expectation) => {
779-
assert_eq!(output.outpoint, expectation.outpoint());
780-
assert_eq!(&output.script_pubkey, expectation.script_pubkey());
781-
expectation.returns
782-
},
783-
},
784-
};
785-
760+
fn register_output(&self, output: WatchedOutput) {
786761
self.watched_outputs.lock().unwrap().insert((output.outpoint, output.script_pubkey));
787-
dependent_tx
788762
}
789763
}
790764

@@ -793,47 +767,6 @@ impl Drop for TestChainSource {
793767
if panicking() {
794768
return;
795769
}
796-
797-
if let Some(expectations) = &*self.expectations.lock().unwrap() {
798-
if !expectations.is_empty() {
799-
panic!("Unsatisfied expectations: {:?}", expectations);
800-
}
801-
}
802-
}
803-
}
804-
805-
/// An expectation that [`chain::Filter::register_output`] was called with a transaction output and
806-
/// returns an optional dependent transaction that spends the output in the same block.
807-
pub struct OnRegisterOutput {
808-
/// The transaction output to register.
809-
pub with: TxOutReference,
810-
811-
/// A dependent transaction spending the output along with its position in the block.
812-
pub returns: Option<(usize, Transaction)>,
813-
}
814-
815-
/// A transaction output as identified by an index into a transaction's output list.
816-
pub struct TxOutReference(pub Transaction, pub usize);
817-
818-
impl OnRegisterOutput {
819-
fn outpoint(&self) -> OutPoint {
820-
let txid = self.with.0.txid();
821-
let index = self.with.1 as u16;
822-
OutPoint { txid, index }
823-
}
824-
825-
fn script_pubkey(&self) -> &Script {
826-
let index = self.with.1;
827-
&self.with.0.output[index].script_pubkey
828-
}
829-
}
830-
831-
impl core::fmt::Debug for OnRegisterOutput {
832-
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
833-
f.debug_struct("OnRegisterOutput")
834-
.field("outpoint", &self.outpoint())
835-
.field("script_pubkey", self.script_pubkey())
836-
.finish()
837770
}
838771
}
839772

0 commit comments

Comments
 (0)