@@ -273,9 +273,9 @@ macro_rules! define_run_body {
273
273
(
274
274
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
275
275
$channel_manager: ident, $process_channel_manager_events: expr,
276
- $peer_manager: ident, $gossip_sync : ident , $logger : ident , $scorer : ident,
277
- $loop_exit_check : expr , $await : expr, $get_timer : expr, $timer_elapsed : expr,
278
- $check_slow_await: expr
276
+ $peer_manager: ident, $process_onion_message_handler_events : expr , $gossip_sync : ident,
277
+ $logger : ident , $scorer : ident , $loop_exit_check : expr, $await : expr, $get_timer : expr,
278
+ $timer_elapsed : expr , $ check_slow_await: expr
279
279
) => { {
280
280
log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
281
281
$channel_manager. timer_tick_occurred( ) ;
@@ -292,6 +292,7 @@ macro_rules! define_run_body {
292
292
loop {
293
293
$process_channel_manager_events;
294
294
$process_chain_monitor_events;
295
+ $process_onion_message_handler_events;
295
296
296
297
// Note that the PeerManager::process_events may block on ChannelManager's locks,
297
298
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -655,7 +656,8 @@ where
655
656
persister, chain_monitor,
656
657
chain_monitor. process_pending_events_async( async_event_handler) . await ,
657
658
channel_manager, channel_manager. process_pending_events_async( async_event_handler) . await ,
658
- peer_manager, gossip_sync, logger, scorer, should_break, {
659
+ peer_manager, process_onion_message_handler_events_async( & peer_manager, async_event_handler) . await ,
660
+ gossip_sync, logger, scorer, should_break, {
659
661
let fut = Selector {
660
662
a: channel_manager. get_event_or_persistence_needed_future( ) ,
661
663
b: chain_monitor. get_update_future( ) ,
@@ -679,6 +681,27 @@ where
679
681
)
680
682
}
681
683
684
+ #[ cfg( feature = "futures" ) ]
685
+ async fn process_onion_message_handler_events_async <
686
+ EventHandlerFuture : core:: future:: Future < Output = ( ) > ,
687
+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
688
+ PM : ' static + Deref + Send + Sync ,
689
+ > (
690
+ peer_manager : & PM , handler : EventHandler
691
+ )
692
+ where
693
+ PM :: Target : APeerManager + Send + Sync ,
694
+ {
695
+ use lightning:: events:: EventsProvider ;
696
+
697
+ let events = core:: cell:: RefCell :: new ( Vec :: new ( ) ) ;
698
+ peer_manager. onion_message_handler ( ) . process_pending_events ( & |e| events. borrow_mut ( ) . push ( e) ) ;
699
+
700
+ for event in events. into_inner ( ) {
701
+ handler ( event) . await
702
+ }
703
+ }
704
+
682
705
#[ cfg( feature = "std" ) ]
683
706
impl BackgroundProcessor {
684
707
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -788,7 +811,9 @@ impl BackgroundProcessor {
788
811
define_run_body ! (
789
812
persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
790
813
channel_manager, channel_manager. process_pending_events( & event_handler) ,
791
- peer_manager, gossip_sync, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
814
+ peer_manager,
815
+ peer_manager. onion_message_handler( ) . process_pending_events( & event_handler) ,
816
+ gossip_sync, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
792
817
{ Sleeper :: from_two_futures(
793
818
channel_manager. get_event_or_persistence_needed_future( ) ,
794
819
chain_monitor. get_update_future( )
0 commit comments