Skip to content

Commit 7c465d6

Browse files
committed
Refactor EventsProvider to take an EventHandler
1 parent 3a0356f commit 7c465d6

File tree

12 files changed

+139
-49
lines changed

12 files changed

+139
-49
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use lightning::util::errors::APIError;
4444
use lightning::util::events;
4545
use lightning::util::logger::Logger;
4646
use lightning::util::config::UserConfig;
47-
use lightning::util::events::{EventsProvider, MessageSendEventsProvider};
47+
use lightning::util::events::MessageSendEventsProvider;
4848
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
4949
use lightning::routing::router::{Route, RouteHop};
5050

fuzz/src/full_stack.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use lightning::ln::msgs::DecodeError;
3939
use lightning::routing::router::get_route;
4040
use lightning::routing::network_graph::NetGraphMsgHandler;
4141
use lightning::util::config::UserConfig;
42-
use lightning::util::events::{EventsProvider,Event};
42+
use lightning::util::events::Event;
4343
use lightning::util::enforcing_trait_impls::EnforcingSigner;
4444
use lightning::util::logger::Logger;
4545
use lightning::util::ser::Readable;

lightning-background-processor/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ mod tests {
173173
use lightning::ln::msgs::ChannelMessageHandler;
174174
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
175175
use lightning::util::config::UserConfig;
176-
use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
176+
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
177177
use lightning::util::logger::Logger;
178178
use lightning::util::ser::Writeable;
179179
use lightning::util::test_utils;

lightning-net-tokio/src/lib.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
//! use tokio::sync::mpsc;
2727
//! use std::net::TcpStream;
2828
//! use bitcoin::secp256k1::key::PublicKey;
29-
//! use lightning::util::events::EventsProvider;
29+
//! use lightning::util::events::{Event, EventHandler, EventsProvider};
3030
//! use std::net::SocketAddr;
3131
//! use std::sync::Arc;
3232
//!
@@ -47,12 +47,12 @@
4747
//! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
4848
//! loop {
4949
//! receiver.recv().await;
50-
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
51-
//! // Handle the event!
52-
//! }
53-
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
54-
//! // Handle the event!
55-
//! }
50+
//! channel_manager.process_pending_events(&|event| {
51+
//! // Handle the event!
52+
//! });
53+
//! chain_monitor.process_pending_events(&|event| {
54+
//! // Handle the event!
55+
//! });
5656
//! }
5757
//! }
5858
//!
@@ -62,12 +62,12 @@
6262
//! lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
6363
//! loop {
6464
//! receiver.recv().await;
65-
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
66-
//! // Handle the event!
67-
//! }
68-
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
69-
//! // Handle the event!
70-
//! }
65+
//! channel_manager.process_pending_events(&|event| {
66+
//! // Handle the event!
67+
//! });
68+
//! chain_monitor.process_pending_events(&|event| {
69+
//! // Handle the event!
70+
//! });
7171
//! }
7272
//! }
7373
//! ```

lightning/src/chain/chainmonitor.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use chain::transaction::{OutPoint, TransactionData};
3535
use chain::keysinterface::Sign;
3636
use util::logger::Logger;
3737
use util::events;
38-
use util::events::Event;
38+
use util::events::EventHandler;
3939

4040
use std::collections::{HashMap, hash_map};
4141
use std::sync::RwLock;
@@ -139,6 +139,15 @@ where C::Target: chain::Filter,
139139
persister,
140140
}
141141
}
142+
143+
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
144+
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
145+
use util::events::EventsProvider;
146+
let events = std::cell::RefCell::new(Vec::new());
147+
let event_handler = |event| events.borrow_mut().push(event);
148+
self.process_pending_events(&event_handler);
149+
events.into_inner()
150+
}
142151
}
143152

144153
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
@@ -306,12 +315,20 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
306315
L::Target: Logger,
307316
P::Target: channelmonitor::Persist<ChannelSigner>,
308317
{
309-
fn get_and_clear_pending_events(&self) -> Vec<Event> {
318+
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
319+
///
320+
/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
321+
/// order to handle these events.
322+
///
323+
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
324+
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
310325
let mut pending_events = Vec::new();
311326
for monitor in self.monitors.read().unwrap().values() {
312327
pending_events.append(&mut monitor.get_and_clear_pending_events());
313328
}
314-
pending_events
329+
for event in pending_events.drain(..) {
330+
handler.handle_event(event);
331+
}
315332
}
316333
}
317334

@@ -320,7 +337,6 @@ mod tests {
320337
use ::{check_added_monitors, get_local_commitment_txn};
321338
use ln::features::InitFeatures;
322339
use ln::functional_test_utils::*;
323-
use util::events::EventsProvider;
324340
use util::events::MessageSendEventsProvider;
325341
use util::test_utils::{OnRegisterOutput, TxOutReference};
326342

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler};
2727
use routing::router::get_route;
2828
use util::config::UserConfig;
2929
use util::enforcing_trait_impls::EnforcingSigner;
30-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
30+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
3131
use util::errors::APIError;
3232
use util::ser::{ReadableArgs, Writeable};
3333

lightning/src/ln/channelmanager.rs

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use ln::onion_utils;
5454
use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
5555
use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
5656
use util::config::UserConfig;
57-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
57+
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
5858
use util::{byte_utils, events};
5959
use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
6060
use util::chacha20::{ChaCha20, ChaChaReader};
@@ -1860,6 +1860,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
18601860
/// Note that this includes RBF or similar transaction replacement strategies - lightning does
18611861
/// not currently support replacing a funding transaction on an existing channel. Instead,
18621862
/// create a new channel with a conflicting funding transaction.
1863+
///
1864+
/// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
18631865
pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
18641866
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
18651867

@@ -3449,11 +3451,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
34493451
}
34503452
}
34513453

3452-
/// Process pending events from the `chain::Watch`.
3453-
fn process_pending_monitor_events(&self) {
3454+
/// Process pending events from the `chain::Watch`, returning whether any events were processed.
3455+
fn process_pending_monitor_events(&self) -> bool {
34543456
let mut failed_channels = Vec::new();
3455-
{
3456-
for monitor_event in self.chain_monitor.release_pending_monitor_events() {
3457+
let has_pending_monitor_events = {
3458+
let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
3459+
let has_pending_monitor_events = !pending_monitor_events.is_empty();
3460+
for monitor_event in pending_monitor_events {
34573461
match monitor_event {
34583462
MonitorEvent::HTLCEvent(htlc_update) => {
34593463
if let Some(preimage) = htlc_update.payment_preimage {
@@ -3490,11 +3494,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
34903494
},
34913495
}
34923496
}
3493-
}
3497+
has_pending_monitor_events
3498+
};
34943499

34953500
for failure in failed_channels.drain(..) {
34963501
self.finish_force_close_channel(failure);
34973502
}
3503+
3504+
has_pending_monitor_events
34983505
}
34993506

35003507
/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
@@ -3670,6 +3677,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
36703677
pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result<PaymentSecret, APIError> {
36713678
self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id)
36723679
}
3680+
3681+
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
3682+
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
3683+
let events = std::cell::RefCell::new(Vec::new());
3684+
let event_handler = |event| events.borrow_mut().push(event);
3685+
self.process_pending_events(&event_handler);
3686+
events.into_inner()
3687+
}
36733688
}
36743689

36753690
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -3694,21 +3709,42 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
36943709
}
36953710

36963711
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> EventsProvider for ChannelManager<Signer, M, T, K, F, L>
3697-
where M::Target: chain::Watch<Signer>,
3698-
T::Target: BroadcasterInterface,
3699-
K::Target: KeysInterface<Signer = Signer>,
3700-
F::Target: FeeEstimator,
3701-
L::Target: Logger,
3712+
where
3713+
M::Target: chain::Watch<Signer>,
3714+
T::Target: BroadcasterInterface,
3715+
K::Target: KeysInterface<Signer = Signer>,
3716+
F::Target: FeeEstimator,
3717+
L::Target: Logger,
37023718
{
3703-
fn get_and_clear_pending_events(&self) -> Vec<Event> {
3704-
//TODO: This behavior should be documented. It's non-intuitive that we query
3705-
// ChannelMonitors when clearing other events.
3706-
self.process_pending_monitor_events();
3719+
/// Processes events that must be periodically handled.
3720+
///
3721+
/// An [`EventHandler`] may safely call back to the provider in order to handle an event.
3722+
/// However, it must not call [`Writeable::write`] as doing so would result in a deadlock.
3723+
///
3724+
/// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared
3725+
/// when processed, an [`EventHandler`] must be able to handle previously seen events when
3726+
/// restarting from an old state.
3727+
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
3728+
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
3729+
let mut result = NotifyOption::SkipPersist;
37073730

3708-
let mut ret = Vec::new();
3709-
let mut pending_events = self.pending_events.lock().unwrap();
3710-
mem::swap(&mut ret, &mut *pending_events);
3711-
ret
3731+
// TODO: This behavior should be documented. It's unintuitive that we query
3732+
// ChannelMonitors when clearing other events.
3733+
if self.process_pending_monitor_events() {
3734+
result = NotifyOption::DoPersist;
3735+
}
3736+
3737+
let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
3738+
if !pending_events.is_empty() {
3739+
result = NotifyOption::DoPersist;
3740+
}
3741+
3742+
for event in pending_events.drain(..) {
3743+
handler.handle_event(event);
3744+
}
3745+
3746+
result
3747+
});
37123748
}
37133749
}
37143750

@@ -4956,7 +4992,7 @@ pub mod bench {
49564992
use routing::router::get_route;
49574993
use util::test_utils;
49584994
use util::config::UserConfig;
4959-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
4995+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
49604996

49614997
use bitcoin::hashes::Hash;
49624998
use bitcoin::hashes::sha256::Hash as Sha256;

lightning/src/ln/functional_test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
2323
use util::enforcing_trait_impls::EnforcingSigner;
2424
use util::test_utils;
2525
use util::test_utils::TestChainMonitor;
26-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
26+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
2727
use util::errors::APIError;
2828
use util::config::UserConfig;
2929
use util::ser::{ReadableArgs, Writeable, Readable};

lightning/src/ln/functional_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use ln::msgs;
2929
use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler,HTLCFailChannelUpdate, ErrorAction};
3030
use util::enforcing_trait_impls::EnforcingSigner;
3131
use util::{byte_utils, test_utils};
32-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
32+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
3333
use util::errors::APIError;
3434
use util::ser::{Writeable, ReadableArgs};
3535
use util::config::UserConfig;

lightning/src/ln/onion_route_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use ln::features::{InitFeatures, InvoiceFeatures};
2020
use ln::msgs;
2121
use ln::msgs::{ChannelMessageHandler, HTLCFailChannelUpdate, OptionalField};
2222
use util::test_utils;
23-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
23+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
2424
use util::ser::{Writeable, Writer};
2525
use util::config::UserConfig;
2626

lightning/src/ln/reorg_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs};
1515
use ln::features::InitFeatures;
1616
use ln::msgs::{ChannelMessageHandler, ErrorAction, HTLCFailChannelUpdate};
1717
use util::enforcing_trait_impls::EnforcingSigner;
18-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
18+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
1919
use util::test_utils;
2020
use util::ser::{ReadableArgs, Writeable};
2121

lightning/src/util/events.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use bitcoin::blockdata::script::Script;
2424
use bitcoin::secp256k1::key::PublicKey;
2525

2626
use core::time::Duration;
27+
use std::ops::Deref;
2728

2829
/// An Event which you should probably take some action in response to.
2930
///
@@ -376,9 +377,46 @@ pub trait MessageSendEventsProvider {
376377
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
377378
}
378379

379-
/// A trait indicating an object may generate events
380+
/// A trait indicating an object may generate events.
381+
///
382+
/// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].
383+
///
384+
/// # Requirements
385+
///
386+
/// See [`process_pending_events`] for requirements around event processing.
387+
///
388+
/// When using this trait, [`process_pending_events`] will call [`handle_event`] for each pending
389+
/// event since the last invocation. The handler must either act upon the event immediately
390+
/// or preserve it for later handling.
391+
///
392+
/// Note, handlers may call back into the provider and thus deadlocking must be avoided. Be sure to
393+
/// consult the provider's documentation on the implication of processing events and how a handler
394+
/// may safely use the provider (e.g., see [`ChannelManager::process_pending_events`] and
395+
/// [`ChainMonitor::process_pending_events`]).
396+
///
397+
/// [`process_pending_events`]: Self::process_pending_events
398+
/// [`handle_event`]: EventHandler::handle_event
399+
/// [`ChannelManager::process_pending_events`]: crate::ln::channelmanager::ChannelManager#method.process_pending_events
400+
/// [`ChainMonitor::process_pending_events`]: crate::chain::chainmonitor::ChainMonitor#method.process_pending_events
380401
pub trait EventsProvider {
381-
/// Gets the list of pending events which were generated by previous actions, clearing the list
382-
/// in the process.
383-
fn get_and_clear_pending_events(&self) -> Vec<Event>;
402+
/// Processes any events generated since the last call using the given event handler.
403+
///
404+
/// Subsequent calls must only process new events. However, handlers must be capable of handling
405+
/// duplicate events across process restarts. This may occur if the provider was recovered from
406+
/// an old state (i.e., it hadn't been successfully persisted after processing pending events).
407+
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
408+
}
409+
410+
/// A trait implemented for objects handling events from [`EventsProvider`].
411+
pub trait EventHandler {
412+
/// Handles the given [`Event`].
413+
///
414+
/// See [`EventsProvider`] for details that must be considered when implementing this method.
415+
fn handle_event(&self, event: Event);
416+
}
417+
418+
impl<F> EventHandler for F where F: Fn(Event) {
419+
fn handle_event(&self, event: Event) {
420+
self(event)
421+
}
384422
}

0 commit comments

Comments
 (0)