Skip to content

Commit 004bbd0

Browse files
committed
Move channel monitor updates inside the channel_state lock
This really, really sucks as it defeats almost all of the cross-channel parallelism we'd intended to have - waiting on a client to update a watchtower for an unrelated channel to process any messages is really shitty. We should revisit this with per-channel locks as a compile-time option post-0.1.
1 parent d07dd39 commit 004bbd0

File tree

1 file changed

+69
-68
lines changed

1 file changed

+69
-68
lines changed

src/ln/channelmanager.rs

Lines changed: 69 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ impl ChannelManager {
10481048
let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?;
10491049
let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash);
10501050

1051-
let (first_hop_node_id, (update_add, commitment_signed, chan_monitor)) = {
1051+
let (first_hop_node_id, update_add, commitment_signed) = {
10521052
let mut channel_state_lock = self.channel_state.lock().unwrap();
10531053
let channel_state = channel_state_lock.borrow_parts();
10541054

@@ -1065,24 +1065,30 @@ impl ChannelManager {
10651065
if !chan.is_live() {
10661066
return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
10671067
}
1068-
chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
1068+
match chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
10691069
route: route.clone(),
10701070
session_priv: session_priv.clone(),
1071-
}, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
1071+
}, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? {
1072+
Some((update_add, commitment_signed, chan_monitor)) => {
1073+
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1074+
unimplemented!();
1075+
}
1076+
Some((update_add, commitment_signed))
1077+
},
1078+
None => None,
1079+
}
10721080
};
10731081

10741082
let first_hop_node_id = route.hops.first().unwrap().pubkey;
10751083

10761084
match res {
1077-
Some(msgs) => (first_hop_node_id, msgs),
1085+
Some((update_add, commitment_signed)) => {
1086+
(first_hop_node_id, update_add, commitment_signed)
1087+
},
10781088
None => return Ok(()),
10791089
}
10801090
};
10811091

1082-
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1083-
unimplemented!();
1084-
}
1085-
10861092
let mut events = self.pending_events.lock().unwrap();
10871093
events.push(events::Event::UpdateHTLCs {
10881094
node_id: first_hop_node_id,
@@ -1135,7 +1141,9 @@ impl ChannelManager {
11351141
},
11361142
None => return
11371143
}
1138-
}; // Release channel lock for install_watch_outpoint call,
1144+
};
1145+
// Because we have exclusive ownership of the channel here we can release the channel_state
1146+
// lock before add_update_monitor
11391147
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
11401148
unimplemented!();
11411149
}
@@ -1250,7 +1258,10 @@ impl ChannelManager {
12501258
continue;
12511259
},
12521260
};
1253-
new_events.push((Some(monitor), events::Event::UpdateHTLCs {
1261+
if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
1262+
unimplemented!();// but def dont push the event...
1263+
}
1264+
new_events.push(events::Event::UpdateHTLCs {
12541265
node_id: forward_chan.get_their_node_id(),
12551266
updates: msgs::CommitmentUpdate {
12561267
update_add_htlcs: add_htlc_msgs,
@@ -1260,7 +1271,7 @@ impl ChannelManager {
12601271
update_fee: None,
12611272
commitment_signed: commitment_msg,
12621273
},
1263-
}));
1274+
});
12641275
}
12651276
} else {
12661277
for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) {
@@ -1273,10 +1284,10 @@ impl ChannelManager {
12731284
hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data),
12741285
hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); },
12751286
};
1276-
new_events.push((None, events::Event::PaymentReceived {
1287+
new_events.push(events::Event::PaymentReceived {
12771288
payment_hash: forward_info.payment_hash,
12781289
amt: forward_info.amt_to_forward,
1279-
}));
1290+
});
12801291
}
12811292
}
12821293
}
@@ -1290,21 +1301,8 @@ impl ChannelManager {
12901301
}
12911302

12921303
if new_events.is_empty() { return }
1293-
1294-
new_events.retain(|event| {
1295-
if let &Some(ref monitor) = &event.0 {
1296-
if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor.clone()) {
1297-
unimplemented!();// but def dont push the event...
1298-
}
1299-
}
1300-
true
1301-
});
1302-
13031304
let mut events = self.pending_events.lock().unwrap();
1304-
events.reserve(new_events.len());
1305-
for event in new_events.drain(..) {
1306-
events.push(event.1);
1307-
}
1305+
events.append(&mut new_events);
13081306
}
13091307

13101308
/// Indicates that the preimage for payment_hash is unknown after a PaymentReceived event.
@@ -1355,7 +1353,13 @@ impl ChannelManager {
13551353

13561354
let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
13571355
match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
1358-
Ok(msg) => (chan.get_their_node_id(), msg),
1356+
Ok(Some((msg, commitment_msg, chan_monitor))) => {
1357+
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1358+
unimplemented!();
1359+
}
1360+
(chan.get_their_node_id(), Some((msg, commitment_msg)))
1361+
},
1362+
Ok(None) => (chan.get_their_node_id(), None),
13591363
Err(_e) => {
13601364
//TODO: Do something with e?
13611365
return;
@@ -1364,13 +1368,9 @@ impl ChannelManager {
13641368
};
13651369

13661370
match fail_msgs {
1367-
Some((msg, commitment_msg, chan_monitor)) => {
1371+
Some((msg, commitment_msg)) => {
13681372
mem::drop(channel_state);
13691373

1370-
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1371-
unimplemented!();// but def dont push the event...
1372-
}
1373-
13741374
let mut pending_events = self.pending_events.lock().unwrap();
13751375
pending_events.push(events::Event::UpdateHTLCs {
13761376
node_id,
@@ -1435,7 +1435,13 @@ impl ChannelManager {
14351435

14361436
let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
14371437
match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
1438-
Ok(msg) => (chan.get_their_node_id(), msg),
1438+
Ok((msgs, Some(chan_monitor))) => {
1439+
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1440+
unimplemented!();// but def dont push the event...
1441+
}
1442+
(chan.get_their_node_id(), msgs)
1443+
},
1444+
Ok((msgs, None)) => (chan.get_their_node_id(), msgs),
14391445
Err(_e) => {
14401446
// TODO: There is probably a channel manager somewhere that needs to
14411447
// learn the preimage as the channel may be about to hit the chain.
@@ -1446,13 +1452,7 @@ impl ChannelManager {
14461452
};
14471453

14481454
mem::drop(channel_state);
1449-
if let Some(chan_monitor) = fulfill_msgs.1 {
1450-
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1451-
unimplemented!();// but def dont push the event...
1452-
}
1453-
}
1454-
1455-
if let Some((msg, commitment_msg)) = fulfill_msgs.0 {
1455+
if let Some((msg, commitment_msg)) = fulfill_msgs {
14561456
let mut pending_events = self.pending_events.lock().unwrap();
14571457
pending_events.push(events::Event::UpdateHTLCs {
14581458
node_id: node_id,
@@ -1565,10 +1565,9 @@ impl ChannelManager {
15651565
},
15661566
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.temporary_channel_id))
15671567
}
1568-
}; // Release channel lock for install_watch_outpoint call,
1569-
// note that this means if the remote end is misbehaving and sends a message for the same
1570-
// channel back-to-back with funding_created, we'll end up thinking they sent a message
1571-
// for a bogus channel.
1568+
};
1569+
// Because we have exclusive ownership of the channel here we can release the channel_state
1570+
// lock before add_update_monitor
15721571
if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) {
15731572
unimplemented!();
15741573
}
@@ -1585,7 +1584,7 @@ impl ChannelManager {
15851584
}
15861585

15871586
fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
1588-
let (funding_txo, user_id, monitor) = {
1587+
let (funding_txo, user_id) = {
15891588
let mut channel_state = self.channel_state.lock().unwrap();
15901589
match channel_state.by_id.get_mut(&msg.channel_id) {
15911590
Some(chan) => {
@@ -1594,14 +1593,14 @@ impl ChannelManager {
15941593
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
15951594
}
15961595
let chan_monitor = chan.funding_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1597-
(chan.get_funding_txo().unwrap(), chan.get_user_id(), chan_monitor)
1596+
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1597+
unimplemented!();
1598+
}
1599+
(chan.get_funding_txo().unwrap(), chan.get_user_id())
15981600
},
15991601
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
16001602
}
16011603
};
1602-
if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
1603-
unimplemented!();
1604-
}
16051604
let mut pending_events = self.pending_events.lock().unwrap();
16061605
pending_events.push(events::Event::FundingBroadcastSafe {
16071606
funding_txo: funding_txo,
@@ -1841,43 +1840,44 @@ impl ChannelManager {
18411840
}
18421841

18431842
fn internal_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>), MsgHandleErrInternal> {
1844-
let (revoke_and_ack, commitment_signed, chan_monitor) = {
1843+
let (revoke_and_ack, commitment_signed) = {
18451844
let mut channel_state = self.channel_state.lock().unwrap();
18461845
match channel_state.by_id.get_mut(&msg.channel_id) {
18471846
Some(chan) => {
18481847
if chan.get_their_node_id() != *their_node_id {
18491848
//TODO: here and below MsgHandleErrInternal, #153 case
18501849
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
18511850
}
1852-
chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?
1851+
let (revoke_and_ack, commitment_signed, chan_monitor) = chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1852+
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1853+
unimplemented!();
1854+
}
1855+
(revoke_and_ack, commitment_signed)
18531856
},
18541857
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
18551858
}
18561859
};
1857-
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1858-
unimplemented!();
1859-
}
1860-
18611860
Ok((revoke_and_ack, commitment_signed))
18621861
}
18631862

18641863
fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<Option<msgs::CommitmentUpdate>, MsgHandleErrInternal> {
1865-
let ((res, mut pending_forwards, mut pending_failures, chan_monitor), short_channel_id) = {
1864+
let ((res, mut pending_forwards, mut pending_failures), short_channel_id) = {
18661865
let mut channel_state = self.channel_state.lock().unwrap();
18671866
match channel_state.by_id.get_mut(&msg.channel_id) {
18681867
Some(chan) => {
18691868
if chan.get_their_node_id() != *their_node_id {
18701869
//TODO: here and below MsgHandleErrInternal, #153 case
18711870
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
18721871
}
1873-
(chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?, chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
1872+
let (res, pending_forwards, pending_failures, chan_monitor) = chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
1873+
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1874+
unimplemented!();
1875+
}
1876+
((res, pending_forwards, pending_failures), chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
18741877
},
18751878
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
18761879
}
18771880
};
1878-
if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
1879-
unimplemented!();
1880-
}
18811881
for failure in pending_failures.drain(..) {
18821882
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
18831883
}
@@ -1968,7 +1968,7 @@ impl ChannelManager {
19681968
}
19691969

19701970
fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder), MsgHandleErrInternal> {
1971-
let (res, chan_monitor) = {
1971+
let res = {
19721972
let mut channel_state = self.channel_state.lock().unwrap();
19731973
match channel_state.by_id.get_mut(&msg.channel_id) {
19741974
Some(chan) => {
@@ -1977,16 +1977,17 @@ impl ChannelManager {
19771977
}
19781978
let (funding_locked, revoke_and_ack, commitment_update, channel_monitor, order) = chan.channel_reestablish(msg)
19791979
.map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
1980-
(Ok((funding_locked, revoke_and_ack, commitment_update, order)), channel_monitor)
1980+
if let Some(monitor) = channel_monitor {
1981+
if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
1982+
unimplemented!();
1983+
}
1984+
}
1985+
Ok((funding_locked, revoke_and_ack, commitment_update, order))
19811986
},
19821987
None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
19831988
}
19841989
};
1985-
if let Some(monitor) = chan_monitor {
1986-
if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
1987-
unimplemented!();
1988-
}
1989-
}
1990+
19901991
res
19911992
}
19921993

0 commit comments

Comments
 (0)