Skip to content

Fix a leak in FutureState when a Notifier is dropped un-woken #2233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 63 additions & 17 deletions lightning/src/util/wakers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Notifier {
pub(crate) fn notify(&self) {
let mut lock = self.notify_pending.lock().unwrap();
if let Some(future_state) = &lock.1 {
if future_state.lock().unwrap().complete() {
if complete_future(future_state) {
lock.1 = None;
return;
}
Expand All @@ -69,6 +69,7 @@ impl Notifier {
} else {
let state = Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
callbacks_with_state: Vec::new(),
complete: lock.0,
callbacks_made: false,
}));
Expand Down Expand Up @@ -112,19 +113,24 @@ pub(crate) struct FutureState {
// first bool - set to false if we're just calling a Waker, and true if we're calling an actual
// user-provided function.
callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
callbacks_with_state: Vec<(bool, Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>)>,
complete: bool,
callbacks_made: bool,
}

impl FutureState {
fn complete(&mut self) -> bool {
for (counts_as_call, callback) in self.callbacks.drain(..) {
callback.call();
self.callbacks_made |= counts_as_call;
}
self.complete = true;
self.callbacks_made
fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
let mut state_lock = this.lock().unwrap();
let state = &mut *state_lock;
for (counts_as_call, callback) in state.callbacks.drain(..) {
callback.call();
state.callbacks_made |= counts_as_call;
}
for (counts_as_call, callback) in state.callbacks_with_state.drain(..) {
(callback)(this);
state.callbacks_made |= counts_as_call;
}
state.complete = true;
state.callbacks_made
}

/// A simple future which can complete once, and calls some callback(s) when it does so.
Expand Down Expand Up @@ -240,14 +246,13 @@ impl Sleeper {
for notifier_mtx in self.notifiers.iter() {
let cv_ref = Arc::clone(&cv);
let notified_fut_ref = Arc::clone(&notified_fut_mtx);
let notifier_ref = Arc::clone(&notifier_mtx);
let mut notifier = notifier_mtx.lock().unwrap();
if notifier.complete {
*notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
*notified_fut_mtx.lock().unwrap() = Some(Arc::clone(&notifier_mtx));
break;
}
notifier.callbacks.push((false, Box::new(move || {
*notified_fut_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
notifier.callbacks_with_state.push((false, Box::new(move |notifier_ref| {
*notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
cv_ref.notify_all();
})));
}
Expand Down Expand Up @@ -407,11 +412,50 @@ mod tests {
}
}

#[cfg(feature = "std")]
#[test]
fn test_state_drops() {
// Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
// but after having been slept-on. This tests for that leak.
use crate::sync::Arc;
use std::thread;

let notifier_a = Arc::new(Notifier::new());
let notifier_b = Arc::new(Notifier::new());

let thread_notifier_a = Arc::clone(&notifier_a);

let future_a = notifier_a.get_future();
let future_state_a = Arc::downgrade(&future_a.state);

let future_b = notifier_b.get_future();
let future_state_b = Arc::downgrade(&future_b.state);

let join_handle = thread::spawn(move || {
// Let the other thread get to the wait point, then notify it.
std::thread::sleep(Duration::from_millis(50));
thread_notifier_a.notify();
});

// Wait on the other thread to finish its sleep, note that the leak only happened if we
// actually have to sleep here, not if we immediately return.
Sleeper::from_two_futures(future_a, future_b).wait();

join_handle.join().unwrap();

// then drop the notifiers and make sure the future states are gone.
mem::drop(notifier_a);
mem::drop(notifier_b);

assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
}

#[test]
fn test_future_callbacks() {
let future = Future {
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
}))
Expand All @@ -421,21 +465,22 @@ mod tests {
future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));

assert!(!callback.load(Ordering::SeqCst));
future.state.lock().unwrap().complete();
complete_future(&future.state);
assert!(callback.load(Ordering::SeqCst));
future.state.lock().unwrap().complete();
complete_future(&future.state);
}

#[test]
fn test_pre_completed_future_callbacks() {
let future = Future {
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
}))
};
future.state.lock().unwrap().complete();
complete_future(&future.state);

let callback = Arc::new(AtomicBool::new(false));
let callback_ref = Arc::clone(&callback);
Expand Down Expand Up @@ -469,6 +514,7 @@ mod tests {
let mut future = Future {
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
}))
Expand All @@ -483,7 +529,7 @@ mod tests {
assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
assert!(!second_woken.load(Ordering::SeqCst));

future.state.lock().unwrap().complete();
complete_future(&future.state);
assert!(woken.load(Ordering::SeqCst));
assert!(second_woken.load(Ordering::SeqCst));
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
Expand Down