@@ -173,6 +173,7 @@ uniffi::include_scaffolding!("ldk_node");
173
173
pub struct Node {
174
174
runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
175
175
stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
176
+ event_handling_stopped_sender : tokio:: sync:: watch:: Sender < ( ) > ,
176
177
config : Arc < Config > ,
177
178
wallet : Arc < Wallet > ,
178
179
tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -731,6 +732,7 @@ impl Node {
731
732
} ;
732
733
733
734
let background_stop_logger = Arc :: clone ( & self . logger ) ;
735
+ let event_handling_stopped_sender = self . event_handling_stopped_sender . clone ( ) ;
734
736
runtime. spawn ( async move {
735
737
process_events_async (
736
738
background_persister,
@@ -751,6 +753,18 @@ impl Node {
751
753
panic ! ( "Failed to process events" ) ;
752
754
} ) ;
753
755
log_trace ! ( background_stop_logger, "Events processing stopped." , ) ;
756
+
757
+ match event_handling_stopped_sender. send ( ( ) ) {
758
+ Ok ( _) => ( ) ,
759
+ Err ( e) => {
760
+ log_error ! (
761
+ background_stop_logger,
762
+ "Failed to send 'events handling stopped' signal. This should never happen: {}" ,
763
+ e
764
+ ) ;
765
+ debug_assert ! ( false ) ;
766
+ } ,
767
+ }
754
768
} ) ;
755
769
756
770
if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
@@ -800,9 +814,55 @@ impl Node {
800
814
} ,
801
815
}
802
816
803
- // Stop disconnect peers.
817
+ // Disconnect all peers.
804
818
self . peer_manager . disconnect_all_peers ( ) ;
805
819
820
+ // Wait until event handling stopped, at least until a timeout is reached.
821
+ let event_handling_stopped_logger = Arc :: clone ( & self . logger ) ;
822
+ let mut event_handling_stopped_receiver = self . event_handling_stopped_sender . subscribe ( ) ;
823
+
824
+ // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
825
+ // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
826
+ // should drop this considerably post upgrading to BDK 1.0.
827
+ let timeout_res = runtime. block_on ( async {
828
+ tokio:: time:: timeout (
829
+ Duration :: from_secs ( 100 ) ,
830
+ event_handling_stopped_receiver. changed ( ) ,
831
+ )
832
+ . await
833
+ } ) ;
834
+
835
+ match timeout_res {
836
+ Ok ( stop_res) => match stop_res {
837
+ Ok ( ( ) ) => { } ,
838
+ Err ( e) => {
839
+ log_error ! (
840
+ event_handling_stopped_logger,
841
+ "Stopping event handling failed. This should never happen: {}" ,
842
+ e
843
+ ) ;
844
+ panic ! ( "Stopping event handling failed. This should never happen." ) ;
845
+ } ,
846
+ } ,
847
+ Err ( e) => {
848
+ log_error ! (
849
+ event_handling_stopped_logger,
850
+ "Stopping event handling timed out: {}" ,
851
+ e
852
+ ) ;
853
+ } ,
854
+ }
855
+
856
+ #[ cfg( tokio_unstable) ]
857
+ {
858
+ log_trace ! (
859
+ self . logger,
860
+ "Active runtime tasks left prior to shutdown: {}" ,
861
+ runtime. metrics( ) . active_tasks_count( )
862
+ ) ;
863
+ }
864
+
865
+ // Shutdown our runtime. By now ~no or only very few tasks should be left.
806
866
runtime. shutdown_timeout ( Duration :: from_secs ( 10 ) ) ;
807
867
808
868
log_info ! ( self . logger, "Shutdown complete." ) ;
0 commit comments