@@ -32,6 +32,7 @@ use lightning_types::payment::PaymentHash;
32
32
use bitcoin:: secp256k1:: PublicKey ;
33
33
34
34
use core:: ops:: Deref ;
35
+ use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
35
36
36
37
use crate :: lsps2:: msgs:: {
37
38
BuyRequest , BuyResponse , GetInfoRequest , GetInfoResponse , LSPS2Message , LSPS2Request ,
@@ -43,6 +44,7 @@ use crate::lsps2::msgs::{
43
44
} ;
44
45
45
46
const MAX_PENDING_REQUESTS_PER_PEER : usize = 10 ;
47
+ const MAX_TOTAL_PENDING_REQUESTS : usize = 1000 ;
46
48
47
49
/// Server-side configuration options for JIT channels.
48
50
#[ derive( Clone , Debug ) ]
@@ -470,6 +472,7 @@ where
470
472
per_peer_state : RwLock < HashMap < PublicKey , Mutex < PeerState > > > ,
471
473
peer_by_intercept_scid : RwLock < HashMap < u64 , PublicKey > > ,
472
474
peer_by_channel_id : RwLock < HashMap < ChannelId , PublicKey > > ,
475
+ total_pending_requests : AtomicUsize ,
473
476
config : LSPS2ServiceConfig ,
474
477
}
475
478
@@ -488,6 +491,7 @@ where
488
491
per_peer_state : RwLock :: new ( new_hash_map ( ) ) ,
489
492
peer_by_intercept_scid : RwLock :: new ( new_hash_map ( ) ) ,
490
493
peer_by_channel_id : RwLock :: new ( new_hash_map ( ) ) ,
494
+ total_pending_requests : AtomicUsize :: new ( 0 ) ,
491
495
channel_manager,
492
496
config,
493
497
}
@@ -1145,8 +1149,27 @@ where
1145
1149
& self , peer_state_lock : & mut MutexGuard < ' a , PeerState > , request_id : RequestId ,
1146
1150
counterparty_node_id : PublicKey , request : LSPS2Request ,
1147
1151
) -> ( Result < ( ) , LightningError > , Option < LSPSMessage > ) {
1152
+ if self . total_pending_requests . load ( Ordering :: Relaxed ) >= MAX_TOTAL_PENDING_REQUESTS {
1153
+ let response = LSPS2Response :: BuyError ( ResponseError {
1154
+ code : LSPS0_CLIENT_REJECTED_ERROR_CODE ,
1155
+ message : "Reached maximum number of pending requests. Please try again later."
1156
+ . to_string ( ) ,
1157
+ data : None ,
1158
+ } ) ;
1159
+ let msg = Some ( LSPS2Message :: Response ( request_id, response) . into ( ) ) ;
1160
+
1161
+ let err = format ! (
1162
+ "Peer {} reached maximum number of total pending requests: {}" ,
1163
+ counterparty_node_id, MAX_TOTAL_PENDING_REQUESTS
1164
+ ) ;
1165
+ let result =
1166
+ Err ( LightningError { err, action : ErrorAction :: IgnoreAndLog ( Level :: Debug ) } ) ;
1167
+ return ( result, msg) ;
1168
+ }
1169
+
1148
1170
if peer_state_lock. pending_requests . len ( ) < MAX_PENDING_REQUESTS_PER_PEER {
1149
1171
peer_state_lock. pending_requests . insert ( request_id, request) ;
1172
+ self . total_pending_requests . fetch_add ( 1 , Ordering :: Relaxed ) ;
1150
1173
( Ok ( ( ) ) , None )
1151
1174
} else {
1152
1175
let response = LSPS2Response :: BuyError ( ResponseError {
@@ -1171,7 +1194,43 @@ where
1171
1194
fn remove_pending_request < ' a > (
1172
1195
& self , peer_state_lock : & mut MutexGuard < ' a , PeerState > , request_id : & RequestId ,
1173
1196
) -> Option < LSPS2Request > {
1174
- peer_state_lock. pending_requests . remove ( request_id)
1197
+ match peer_state_lock. pending_requests . remove ( request_id) {
1198
+ Some ( req) => {
1199
+ let res = self . total_pending_requests . fetch_update (
1200
+ Ordering :: Relaxed ,
1201
+ Ordering :: Relaxed ,
1202
+ |x| Some ( x. saturating_sub ( 1 ) ) ,
1203
+ ) ;
1204
+ match res {
1205
+ Ok ( previous_value) if previous_value == 0 => debug_assert ! (
1206
+ false ,
1207
+ "total_pending_requests counter out-of-sync! This should never happen!"
1208
+ ) ,
1209
+ Err ( previous_value) if previous_value == 0 => debug_assert ! (
1210
+ false ,
1211
+ "total_pending_requests counter out-of-sync! This should never happen!"
1212
+ ) ,
1213
+ _ => { } ,
1214
+ }
1215
+ Some ( req)
1216
+ } ,
1217
+ res => res,
1218
+ }
1219
+ }
1220
+
1221
+ #[ cfg( debug_assertions) ]
1222
+ fn verify_pending_request_counter ( & self ) {
1223
+ let mut num_requests = 0 ;
1224
+ let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
1225
+ for ( _, inner) in outer_state_lock. iter ( ) {
1226
+ let inner_state_lock = inner. lock ( ) . unwrap ( ) ;
1227
+ num_requests += inner_state_lock. pending_requests . len ( ) ;
1228
+ }
1229
+ debug_assert_eq ! (
1230
+ num_requests,
1231
+ self . total_pending_requests. load( Ordering :: Relaxed ) ,
1232
+ "total_pending_requests counter out-of-sync! This should never happen!"
1233
+ ) ;
1175
1234
}
1176
1235
}
1177
1236
@@ -1186,13 +1245,18 @@ where
1186
1245
& self , message : Self :: ProtocolMessage , counterparty_node_id : & PublicKey ,
1187
1246
) -> Result < ( ) , LightningError > {
1188
1247
match message {
1189
- LSPS2Message :: Request ( request_id, request) => match request {
1190
- LSPS2Request :: GetInfo ( params) => {
1191
- self . handle_get_info_request ( request_id, counterparty_node_id, params)
1192
- } ,
1193
- LSPS2Request :: Buy ( params) => {
1194
- self . handle_buy_request ( request_id, counterparty_node_id, params)
1195
- } ,
1248
+ LSPS2Message :: Request ( request_id, request) => {
1249
+ let res = match request {
1250
+ LSPS2Request :: GetInfo ( params) => {
1251
+ self . handle_get_info_request ( request_id, counterparty_node_id, params)
1252
+ } ,
1253
+ LSPS2Request :: Buy ( params) => {
1254
+ self . handle_buy_request ( request_id, counterparty_node_id, params)
1255
+ } ,
1256
+ } ;
1257
+ #[ cfg( debug_assertions) ]
1258
+ self . verify_pending_request_counter ( ) ;
1259
+ res
1196
1260
} ,
1197
1261
_ => {
1198
1262
debug_assert ! (
0 commit comments