Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.

Fix more potential deadlock scenarios #125

Merged
merged 7 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 106 additions & 84 deletions src/lsps1/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,21 @@ where
pub fn send_get_info_request(&self, counterparty_node_id: PublicKey, user_channel_id: u128) {
let channel = InboundCRChannel::new(user_channel_id);

let mut outer_state_lock = self.per_peer_state.write().unwrap();
let inner_state_lock = outer_state_lock
.entry(counterparty_node_id)
.or_insert(Mutex::new(PeerState::default()));
let mut peer_state_lock = inner_state_lock.lock().unwrap();
peer_state_lock.insert_inbound_channel(user_channel_id, channel);

let request_id = crate::utils::generate_request_id(&self.entropy_source);
peer_state_lock.insert_request(request_id.clone(), user_channel_id);
{
let mut outer_state_lock = self.per_peer_state.write().unwrap();
let inner_state_lock = outer_state_lock
.entry(counterparty_node_id)
.or_insert(Mutex::new(PeerState::default()));
let mut peer_state_lock = inner_state_lock.lock().unwrap();
peer_state_lock.insert_inbound_channel(user_channel_id, channel);

peer_state_lock.insert_request(request_id.clone(), user_channel_id);
}

self.pending_messages.enqueue(
&counterparty_node_id,
LSPS1Message::Request(request_id, LSPS1Request::GetInfo(GetInfoRequest {})).into(),
);
let request = LSPS1Request::GetInfo(GetInfoRequest {});
let msg = LSPS1Message::Request(request_id, request).into();
self.pending_messages.enqueue(&counterparty_node_id, msg);
}

fn handle_get_info_response(
Expand Down Expand Up @@ -360,46 +361,55 @@ where
pub fn place_order(
&self, user_channel_id: u128, counterparty_node_id: &PublicKey, order: OrderParams,
) -> Result<(), APIError> {
let outer_state_lock = self.per_peer_state.write().unwrap();

match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

let inbound_channel = peer_state_lock
.inbound_channels_by_id
.get_mut(&user_channel_id)
.ok_or(APIError::APIMisuseError {
err: format!("Channel with user_channel_id {} not found", user_channel_id),
})?;

match inbound_channel.order_requested(order.clone()) {
Ok(()) => (),
Err(e) => {
peer_state_lock.remove_inbound_channel(user_channel_id);
return Err(APIError::APIMisuseError { err: e.err });
},
};
let (result, request_msg) = {
let outer_state_lock = self.per_peer_state.write().unwrap();

match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

let inbound_channel = peer_state_lock
.inbound_channels_by_id
.get_mut(&user_channel_id)
.ok_or(APIError::APIMisuseError {
err: format!(
"Channel with user_channel_id {} not found",
user_channel_id
),
})?;

match inbound_channel.order_requested(order.clone()) {
Ok(()) => (),
Err(e) => {
peer_state_lock.remove_inbound_channel(user_channel_id);
return Err(APIError::APIMisuseError { err: e.err });
},
};

let request_id = crate::utils::generate_request_id(&self.entropy_source);
peer_state_lock.insert_request(request_id.clone(), user_channel_id);
let request_id = crate::utils::generate_request_id(&self.entropy_source);
let request = LSPS1Request::CreateOrder(CreateOrderRequest { order });
let msg = LSPS1Message::Request(request_id.clone(), request).into();
peer_state_lock.insert_request(request_id, user_channel_id);

self.pending_messages.enqueue(
counterparty_node_id,
LSPS1Message::Request(
request_id,
LSPS1Request::CreateOrder(CreateOrderRequest { order }),
)
.into(),
);
},
None => {
return Err(APIError::APIMisuseError {
err: format!("No existing state with counterparty {}", counterparty_node_id),
})
},
(Ok(()), Some(msg))
},
None => (
Err(APIError::APIMisuseError {
err: format!(
"No existing state with counterparty {}",
counterparty_node_id
),
}),
None,
),
}
};

if let Some(msg) = request_msg {
self.pending_messages.enqueue(&counterparty_node_id, msg);
}
Ok(())

result
}

fn handle_create_order_response(
Expand Down Expand Up @@ -518,44 +528,56 @@ where
pub fn check_order_status(
&self, counterparty_node_id: &PublicKey, order_id: OrderId, user_channel_id: u128,
) -> Result<(), APIError> {
let outer_state_lock = self.per_peer_state.write().unwrap();
match outer_state_lock.get(&counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

if let Some(inbound_channel) =
peer_state_lock.inbound_channels_by_id.get_mut(&user_channel_id)
{
if let Err(e) = inbound_channel.pay_for_channel(user_channel_id) {
peer_state_lock.remove_inbound_channel(user_channel_id);
return Err(APIError::APIMisuseError { err: e.err });
}

let request_id = crate::utils::generate_request_id(&self.entropy_source);
peer_state_lock.insert_request(request_id.clone(), user_channel_id);

self.pending_messages.enqueue(
counterparty_node_id,
LSPS1Message::Request(
request_id,
LSPS1Request::GetOrder(GetOrderRequest { order_id: order_id.clone() }),
let (result, request_msg) = {
let outer_state_lock = self.per_peer_state.write().unwrap();
match outer_state_lock.get(&counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

if let Some(inbound_channel) =
peer_state_lock.inbound_channels_by_id.get_mut(&user_channel_id)
{
if let Err(e) = inbound_channel.pay_for_channel(user_channel_id) {
peer_state_lock.remove_inbound_channel(user_channel_id);
return Err(APIError::APIMisuseError { err: e.err });
}

let request_id = crate::utils::generate_request_id(&self.entropy_source);
peer_state_lock.insert_request(request_id.clone(), user_channel_id);

let request =
LSPS1Request::GetOrder(GetOrderRequest { order_id: order_id.clone() });
let msg = LSPS1Message::Request(request_id, request).into();
(Ok(()), Some(msg))
} else {
(
Err(APIError::APIMisuseError {
err: format!(
"Channel with user_channel_id {} not found",
user_channel_id
),
}),
None,
)
.into(),
);
} else {
return Err(APIError::APIMisuseError {
err: format!("Channel with user_channel_id {} not found", user_channel_id),
});
}
},
None => {
return Err(APIError::APIMisuseError {
err: format!("No existing state with counterparty {}", counterparty_node_id),
})
},
}
},
None => (
Err(APIError::APIMisuseError {
err: format!(
"No existing state with counterparty {}",
counterparty_node_id
),
}),
None,
),
}
};

if let Some(msg) = request_msg {
self.pending_messages.enqueue(&counterparty_node_id, msg);
}

Ok(())
result
}

fn handle_get_order_response(
Expand Down
Loading