Skip to content

Commit e3524d1

Browse files
Update ChannelManager's ChannelMonitor Arc to be a Deref
Additional changes: * Update fuzz crate to match ChannelManager's new API * Update lightning-net-tokio library to match ChannelManager's new ChannelMonitor Deref API * Update tests to match ChannelManager's new ChannelMonitor Deref API
1 parent 9a02115 commit e3524d1

File tree

11 files changed

+626
-269
lines changed

11 files changed

+626
-269
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub struct TestChannelMonitor {
8585
impl TestChannelMonitor {
8686
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<dyn chaininterface::BroadcasterInterface>, logger: Arc<dyn Logger>, feeest: Arc<dyn chaininterface::FeeEstimator>) -> Self {
8787
Self {
88-
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
88+
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest)),
8989
update_ret: Mutex::new(Ok(())),
9090
latest_good_update: Mutex::new(HashMap::new()),
9191
latest_update_good: Mutex::new(HashMap::new()),
@@ -190,7 +190,7 @@ pub fn do_test(data: &[u8]) {
190190
config.channel_options.fee_proportional_millionths = 0;
191191
config.channel_options.announced_channel = true;
192192
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
193-
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
193+
(Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap()),
194194
monitor)
195195
} }
196196
}
@@ -221,14 +221,14 @@ pub fn do_test(data: &[u8]) {
221221
let read_args = ChannelManagerReadArgs {
222222
keys_manager,
223223
fee_estimator: fee_est.clone(),
224-
monitor: monitor.clone(),
224+
monitor: monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>,
225225
tx_broadcaster: broadcast.clone(),
226226
logger,
227227
default_config: config,
228228
channel_monitors: &mut monitor_refs,
229229
};
230230

231-
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
231+
let res = (<(Sha256d, ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
232232
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
233233
if !was_good {
234234
// If the last time we updated a monitor we didn't successfully update (and we

fuzz/src/full_stack.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ impl<'a> Hash for Peer<'a> {
136136
}
137137

138138
struct MoneyLossDetector<'a> {
139-
manager: Arc<ChannelManager<EnforcingChannelKeys>>,
139+
manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>,
140140
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
141-
handler: PeerManager<Peer<'a>>,
141+
handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>,
142142

143143
peers: &'a RefCell<[bool; 256]>,
144144
funding_txn: Vec<Transaction>,
@@ -149,7 +149,7 @@ struct MoneyLossDetector<'a> {
149149
blocks_connected: u32,
150150
}
151151
impl<'a> MoneyLossDetector<'a> {
152-
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
152+
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::ManyChannelMonitor>>>>) -> Self {
153153
MoneyLossDetector {
154154
manager,
155155
monitor,
@@ -320,14 +320,14 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
320320

321321
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
322322
let broadcast = Arc::new(TestBroadcaster{});
323-
let monitor = channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone());
323+
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
324324

325325
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
326326
let mut config = UserConfig::default();
327327
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
328328
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
329329
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
330-
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
330+
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone() as Arc<channelmonitor::ManyChannelMonitor>, broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap());
331331
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));
332332

333333
let peers = RefCell::new([false; 256]);

lightning-net-tokio/src/lib.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub struct Connection {
4242
id: u64,
4343
}
4444
impl Connection {
45-
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
45+
fn schedule_read(peer_manager: SimpleArcPeerManager, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
4646
let us_ref = us.clone();
4747
let us_close_ref = us.clone();
4848
let peer_manager_ref = peer_manager.clone();
@@ -110,7 +110,7 @@ impl Connection {
110110
///
111111
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
112112
/// ChannelManager and ChannelMonitor objects.
113-
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
113+
pub fn setup_inbound(peer_manager: SimpleArcPeerManager, event_notify: mpsc::Sender<()>, stream: TcpStream) {
114114
let (reader, us) = Self::new(event_notify, stream);
115115

116116
if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -124,7 +124,7 @@ impl Connection {
124124
///
125125
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
126126
/// ChannelManager and ChannelMonitor objects.
127-
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
127+
pub fn setup_outbound(peer_manager: SimpleArcPeerManager, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
128128
let (reader, us) = Self::new(event_notify, stream);
129129

130130
if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
@@ -142,7 +142,7 @@ impl Connection {
142142
///
143143
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
144144
/// ChannelManager and ChannelMonitor objects.
145-
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
145+
pub fn connect_outbound(peer_manager: SimpleArcPeerManager, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
146146
let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
147147
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
148148
});
@@ -157,14 +157,20 @@ impl Connection {
157157
}
158158
}
159159

160-
#[derive(Clone)]
160+
/// SimpleArcPeerManager defines a type alias for the peer_hander's SimpleArcPeerManager, with
161+
/// a concrete type given to its Descriptor generic parameter. We need to use the
162+
/// peer_hander::SimpleArcPeerManager instead of the peer_hander::SimpleRefPeerManager because
163+
/// we're using this parameter in calls to tokio::spawn, which requires parameters to have
164+
/// static lifetimes. Aliasing this concrete type allows for more concise function definitions.
165+
pub type SimpleArcPeerManager = peer_handler::SimpleArcPeerManager<SocketDescriptor>;
166+
161167
pub struct SocketDescriptor {
162168
conn: Arc<Mutex<Connection>>,
163169
id: u64,
164-
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
170+
peer_manager: SimpleArcPeerManager,
165171
}
166172
impl SocketDescriptor {
167-
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
173+
fn new(conn: Arc<Mutex<Connection>>, peer_manager: SimpleArcPeerManager) -> Self {
168174
let id = conn.lock().unwrap().id;
169175
Self { conn, id, peer_manager }
170176
}
@@ -256,6 +262,15 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
256262
us.read_paused = true;
257263
}
258264
}
265+
impl Clone for SocketDescriptor {
266+
fn clone(&self) -> Self {
267+
Self {
268+
conn: Arc::clone(&self.conn),
269+
id: self.id,
270+
peer_manager: Arc::clone(&self.peer_manager),
271+
}
272+
}
273+
}
259274
impl Eq for SocketDescriptor {}
260275
impl PartialEq for SocketDescriptor {
261276
fn eq(&self, o: &Self) -> bool {

lightning/src/chain/chaininterface.rs

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ use bitcoin::network::constants::Network;
1414

1515
use util::logger::Logger;
1616

17-
use std::sync::{Mutex,Weak,MutexGuard,Arc};
17+
use std::sync::{Mutex, MutexGuard, Arc};
1818
use std::sync::atomic::{AtomicUsize, Ordering};
1919
use std::collections::HashSet;
20+
use std::ops::Deref;
21+
use std::marker::PhantomData;
2022

2123
/// Used to give chain error details upstream
2224
pub enum ChainError {
@@ -205,26 +207,48 @@ impl ChainWatchedUtil {
205207
}
206208
}
207209

210+
/// BlockNotifierArc is useful when you need a BlockNotifier that points to ChainListeners with
211+
/// static lifetimes, e.g. when you're using lightning-net-tokio (since tokio::spawn requires
212+
/// parameters with static lifetimes). Other times you can afford a reference, which is more
213+
/// efficient, in which case BlockNotifierRef is a more appropriate type. Defining these type
214+
/// aliases prevents issues such as overly long function definitions.
215+
pub type BlockNotifierArc = Arc<BlockNotifier<'static, &'static ChainListener>>;
216+
217+
/// BlockNotifierRef is useful when you want a BlockNotifier that points to ChainListeners
218+
/// with nonstatic lifetimes. This is useful for when static lifetimes are not needed. Nonstatic
219+
/// lifetimes are more efficient but less flexible, and should be used by default unless static
220+
/// lifetimes are required, e.g. when you're using lightning-net-tokio (since tokio::spawn
221+
/// requires parameters with static lifetimes), in which case BlockNotifierArc is a more
222+
/// appropriate type. Defining these type aliases for common usages prevents issues such as
223+
/// overly long function definitions.
224+
pub type BlockNotifierRef<'a, 'b> = &'a BlockNotifier<'b, &'b ChainListener>;
225+
208226
/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
209227
/// data is registered.
210-
pub struct BlockNotifier {
211-
listeners: Mutex<Vec<Weak<ChainListener>>>, //TODO(vmw): try removing Weak
228+
///
229+
/// Rather than using a plain BlockNotifier, it is preferable to use either a BlockNotifierArc
230+
/// or a BlockNotifierRef for conciseness. See their documentation for more details, but essentially
231+
/// you should default to using a BlockNotifierRef, and use a BlockNotifierArc instead when you
232+
/// require ChainListeners with static lifetimes, such as when you're using lightning-net-tokio.
233+
pub struct BlockNotifier<'a, CL: Deref<Target = ChainListener + 'a> + 'a> {
234+
listeners: Mutex<Vec<CL>>,
212235
chain_monitor: Arc<ChainWatchInterface>,
236+
phantom: PhantomData<&'a ()>,
213237
}
214238

215-
impl BlockNotifier {
239+
impl<'a, CL: Deref<Target = ChainListener + 'a> + 'a> BlockNotifier<'a, CL> {
216240
/// Constructs a new BlockNotifier without any listeners.
217-
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier {
241+
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a, CL> {
218242
BlockNotifier {
219243
listeners: Mutex::new(Vec::new()),
220244
chain_monitor,
245+
phantom: PhantomData,
221246
}
222247
}
223248

224-
/// Register the given listener to receive events. Only a weak pointer is provided and
225-
/// the registration should be freed once that pointer expires.
249+
/// Register the given listener to receive events.
226250
// TODO: unregister
227-
pub fn register_listener(&self, listener: Weak<ChainListener>) {
251+
pub fn register_listener(&self, listener: CL) {
228252
let mut vec = self.listeners.lock().unwrap();
229253
vec.push(listener);
230254
}
@@ -250,25 +274,19 @@ impl BlockNotifier {
250274
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
251275
let last_seen = self.chain_monitor.reentered();
252276

253-
let listeners = self.listeners.lock().unwrap().clone();
277+
let listeners = self.listeners.lock().unwrap();
254278
for listener in listeners.iter() {
255-
match listener.upgrade() {
256-
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
257-
None => ()
258-
}
279+
listener.block_connected(header, height, txn_matched, indexes_of_txn_matched);
259280
}
260281
return last_seen != self.chain_monitor.reentered();
261282
}
262283

263284

264285
/// Notify listeners that a block was disconnected.
265286
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
266-
let listeners = self.listeners.lock().unwrap().clone();
287+
let listeners = self.listeners.lock().unwrap();
267288
for listener in listeners.iter() {
268-
match listener.upgrade() {
269-
Some(arc) => arc.block_disconnected(&header, disconnected_height),
270-
None => ()
271-
}
289+
listener.block_disconnected(&header, disconnected_height);
272290
}
273291
}
274292

0 commit comments

Comments
 (0)