Skip to content

Commit d9378e4

Browse files
committed
Shutdown: Wait for event processing to fully stop
.. before initiating the Runtime shutdown.
1 parent 6f6262e commit d9378e4

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,7 @@ fn build_with_store_internal(
959959
};
960960

961961
let (stop_sender, _) = tokio::sync::watch::channel(());
962+
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
962963

963964
let is_listening = Arc::new(AtomicBool::new(false));
964965
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -971,6 +972,7 @@ fn build_with_store_internal(
971972
Ok(Node {
972973
runtime,
973974
stop_sender,
975+
event_handling_stopped_sender,
974976
config,
975977
wallet,
976978
tx_sync,

src/lib.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ uniffi::include_scaffolding!("ldk_node");
173173
pub struct Node {
174174
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
175175
stop_sender: tokio::sync::watch::Sender<()>,
176+
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
176177
config: Arc<Config>,
177178
wallet: Arc<Wallet>,
178179
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -726,6 +727,7 @@ impl Node {
726727
};
727728

728729
let background_stop_logger = Arc::clone(&self.logger);
730+
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
729731
runtime.spawn(async move {
730732
process_events_async(
731733
background_persister,
@@ -746,6 +748,18 @@ impl Node {
746748
panic!("Failed to process events");
747749
});
748750
log_trace!(background_stop_logger, "Events processing stopped.",);
751+
752+
match event_handling_stopped_sender.send(()) {
753+
Ok(_) => (),
754+
Err(e) => {
755+
log_error!(
756+
background_stop_logger,
757+
"Failed to send 'events handling stopped' signal. This should never happen: {}",
758+
e
759+
);
760+
debug_assert!(false);
761+
},
762+
}
749763
});
750764

751765
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
@@ -795,9 +809,55 @@ impl Node {
795809
},
796810
}
797811

798-
// Stop disconnect peers.
812+
// Disconnect all peers.
799813
self.peer_manager.disconnect_all_peers();
800814

815+
// Wait until event handling stopped, at least until a timeout is reached.
816+
let event_handling_stopped_logger = Arc::clone(&self.logger);
817+
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
818+
819+
// FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
820+
// event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
821+
// should drop this considerably post upgrading to BDK 1.0.
822+
let timeout_res = runtime.block_on(async {
823+
tokio::time::timeout(
824+
Duration::from_secs(100),
825+
event_handling_stopped_receiver.changed(),
826+
)
827+
.await
828+
});
829+
830+
match timeout_res {
831+
Ok(stop_res) => match stop_res {
832+
Ok(()) => {},
833+
Err(e) => {
834+
log_error!(
835+
event_handling_stopped_logger,
836+
"Stopping event handling failed. This should never happen: {}",
837+
e
838+
);
839+
panic!("Stopping event handling failed. This should never happen.");
840+
},
841+
},
842+
Err(e) => {
843+
log_error!(
844+
event_handling_stopped_logger,
845+
"Stopping event handling timed out: {}",
846+
e
847+
);
848+
},
849+
}
850+
851+
#[cfg(tokio_unstable)]
852+
{
853+
log_trace!(
854+
self.logger,
855+
"Active runtime tasks left prior to shutdown: {}",
856+
runtime.metrics().active_tasks_count()
857+
);
858+
}
859+
860+
// Shutdown our runtime. By now ~no or only very few tasks should be left.
801861
runtime.shutdown_timeout(Duration::from_secs(10));
802862

803863
log_info!(self.logger, "Shutdown complete.");

0 commit comments

Comments
 (0)