Skip to content

Commit b5b57f1

Browse files
committed
Hold sep. Mutexes for pending intercepted_msgs/peer_connected events
This is a minor refactor that will allow us to access the individual event queue Mutexes separately, allowing us to drop the locks earlier when processing them individually.
1 parent 0cfe55c commit b5b57f1

File tree

1 file changed

+25
-24
lines changed

1 file changed

+25
-24
lines changed

lightning/src/onion_message/messenger.rs

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -261,12 +261,8 @@ pub struct OnionMessenger<
261261
async_payments_handler: APH,
262262
custom_handler: CMH,
263263
intercept_messages_for_offline_peers: bool,
264-
pending_events: Mutex<PendingEvents>,
265-
}
266-
267-
struct PendingEvents {
268-
intercepted_msgs: Vec<Event>,
269-
peer_connecteds: Vec<Event>,
264+
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
265+
pending_peer_connected_events: Mutex<Vec<Event>>,
270266
}
271267

272268
/// [`OnionMessage`]s buffered to be sent.
@@ -1095,10 +1091,8 @@ where
10951091
async_payments_handler,
10961092
custom_handler,
10971093
intercept_messages_for_offline_peers,
1098-
pending_events: Mutex::new(PendingEvents {
1099-
intercepted_msgs: Vec::new(),
1100-
peer_connecteds: Vec::new(),
1101-
}),
1094+
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
1095+
pending_peer_connected_events: Mutex::new(Vec::new()),
11021096
}
11031097
}
11041098

@@ -1316,14 +1310,15 @@ where
13161310

13171311
fn enqueue_intercepted_event(&self, event: Event) {
13181312
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
1319-
let mut pending_events = self.pending_events.lock().unwrap();
1320-
let total_buffered_bytes: usize =
1321-
pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
1313+
let mut pending_intercepted_msgs_events =
1314+
self.pending_intercepted_msgs_events.lock().unwrap();
1315+
let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter()
1316+
.map(|ev| ev.serialized_length()).sum();
13221317
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
13231318
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
13241319
return
13251320
}
1326-
pending_events.intercepted_msgs.push(event);
1321+
pending_intercepted_msgs_events.push(event);
13271322
}
13281323

13291324
/// Processes any events asynchronously using the given handler.
@@ -1339,9 +1334,12 @@ where
13391334
let mut intercepted_msgs = Vec::new();
13401335
let mut peer_connecteds = Vec::new();
13411336
{
1342-
let mut pending_events = self.pending_events.lock().unwrap();
1343-
core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
1344-
core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
1337+
let mut pending_intercepted_msgs_events =
1338+
self.pending_intercepted_msgs_events.lock().unwrap();
1339+
let mut pending_peer_connected_events =
1340+
self.pending_peer_connected_events.lock().unwrap();
1341+
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
1342+
core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
13451343
}
13461344

13471345
let mut futures = Vec::with_capacity(intercepted_msgs.len());
@@ -1417,18 +1415,19 @@ where
14171415
}
14181416
let mut events = Vec::new();
14191417
{
1420-
let mut pending_events = self.pending_events.lock().unwrap();
1418+
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1419+
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
14211420
#[cfg(debug_assertions)] {
1422-
for ev in pending_events.intercepted_msgs.iter() {
1421+
for ev in pending_intercepted_msgs_events.iter() {
14231422
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
14241423
}
1425-
for ev in pending_events.peer_connecteds.iter() {
1424+
for ev in pending_peer_connected_events.iter() {
14261425
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
14271426
}
14281427
}
1429-
core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
1430-
events.append(&mut pending_events.peer_connecteds);
1431-
pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
1428+
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
1429+
events.append(&mut pending_peer_connected_events);
1430+
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
14321431
}
14331432
for ev in events {
14341433
handler.handle_event(ev);
@@ -1558,7 +1557,9 @@ where
15581557
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
15591558
.mark_connected();
15601559
if self.intercept_messages_for_offline_peers {
1561-
self.pending_events.lock().unwrap().peer_connecteds.push(
1560+
let mut pending_peer_connected_events =
1561+
self.pending_peer_connected_events.lock().unwrap();
1562+
pending_peer_connected_events.push(
15621563
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
15631564
);
15641565
}

0 commit comments

Comments
 (0)