Skip to content

Commit a469bc4

Browse files
committed
Shutdown: Wait for event processing to fully stop
.. before initiating the Runtime shutdown.
1 parent 9663fe2 commit a469bc4

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,7 @@ fn build_with_store_internal(
959959
};
960960

961961
let (stop_sender, _) = tokio::sync::watch::channel(());
962+
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
962963

963964
let is_listening = Arc::new(AtomicBool::new(false));
964965
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -971,6 +972,7 @@ fn build_with_store_internal(
971972
Ok(Node {
972973
runtime,
973974
stop_sender,
975+
event_handling_stopped_sender,
974976
config,
975977
wallet,
976978
tx_sync,

src/lib.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ uniffi::include_scaffolding!("ldk_node");
173173
pub struct Node {
174174
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
175175
stop_sender: tokio::sync::watch::Sender<()>,
176+
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
176177
config: Arc<Config>,
177178
wallet: Arc<Wallet>,
178179
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -731,6 +732,7 @@ impl Node {
731732
};
732733

733734
let background_stop_logger = Arc::clone(&self.logger);
735+
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
734736
runtime.spawn(async move {
735737
process_events_async(
736738
background_persister,
@@ -751,6 +753,18 @@ impl Node {
751753
panic!("Failed to process events");
752754
});
753755
log_trace!(background_stop_logger, "Events processing stopped.",);
756+
757+
match event_handling_stopped_sender.send(()) {
758+
Ok(_) => (),
759+
Err(e) => {
760+
log_error!(
761+
background_stop_logger,
762+
"Failed to send 'events handling stopped' signal. This should never happen: {}",
763+
e
764+
);
765+
debug_assert!(false);
766+
},
767+
}
754768
});
755769

756770
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
@@ -800,9 +814,55 @@ impl Node {
800814
},
801815
}
802816

803-
// Stop disconnect peers.
817+
// Disconnect all peers.
804818
self.peer_manager.disconnect_all_peers();
805819

820+
// Wait until event handling stopped, at least until a timeout is reached.
821+
let event_handling_stopped_logger = Arc::clone(&self.logger);
822+
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
823+
824+
// FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
825+
// event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
826+
// should drop this considerably post upgrading to BDK 1.0.
827+
let timeout_res = runtime.block_on(async {
828+
tokio::time::timeout(
829+
Duration::from_secs(100),
830+
event_handling_stopped_receiver.changed(),
831+
)
832+
.await
833+
});
834+
835+
match timeout_res {
836+
Ok(stop_res) => match stop_res {
837+
Ok(()) => {},
838+
Err(e) => {
839+
log_error!(
840+
event_handling_stopped_logger,
841+
"Stopping event handling failed. This should never happen: {}",
842+
e
843+
);
844+
panic!("Stopping event handling failed. This should never happen.");
845+
},
846+
},
847+
Err(e) => {
848+
log_error!(
849+
event_handling_stopped_logger,
850+
"Stopping event handling timed out: {}",
851+
e
852+
);
853+
},
854+
}
855+
856+
#[cfg(tokio_unstable)]
857+
{
858+
log_trace!(
859+
self.logger,
860+
"Active runtime tasks left prior to shutdown: {}",
861+
runtime.metrics().active_tasks_count()
862+
);
863+
}
864+
865+
// Shutdown our runtime. By now ~no or only very few tasks should be left.
806866
runtime.shutdown_timeout(Duration::from_secs(10));
807867

808868
log_info!(self.logger, "Shutdown complete.");

0 commit comments

Comments
 (0)