@@ -1149,8 +1149,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
1149
1149
1150
1150
#[ cfg( test) ]
1151
1151
mod tests {
1152
+ use secp256k1:: Signature ;
1153
+ use bitcoin:: BitcoinHash ;
1154
+ use bitcoin:: network:: constants:: Network ;
1155
+ use bitcoin:: blockdata:: constants:: genesis_block;
1152
1156
use ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
1153
1157
use ln:: msgs;
1158
+ use ln:: features:: ChannelFeatures ;
1154
1159
use util:: events;
1155
1160
use util:: test_utils;
1156
1161
use util:: logger:: Logger ;
@@ -1161,7 +1166,9 @@ mod tests {
1161
1166
use rand:: { thread_rng, Rng } ;
1162
1167
1163
1168
use std;
1169
+ use std:: cmp:: min;
1164
1170
use std:: sync:: { Arc , Mutex } ;
1171
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
1165
1172
1166
1173
#[ derive( Clone ) ]
1167
1174
struct FileDescriptor {
@@ -1199,29 +1206,31 @@ mod tests {
1199
1206
chan_handlers
1200
1207
}
1201
1208
1202
- fn create_network < ' a > ( peer_count : usize , chan_handlers : & ' a Vec < test_utils:: TestChannelMessageHandler > ) -> Vec < PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > > {
1209
+ fn create_network < ' a > ( peer_count : usize , chan_handlers : & ' a Vec < test_utils:: TestChannelMessageHandler > , routing_handlers : Option < & ' a Vec < Arc < msgs :: RoutingMessageHandler > > > ) -> Vec < PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > > {
1203
1210
let mut peers = Vec :: new ( ) ;
1204
1211
let mut rng = thread_rng ( ) ;
1205
1212
let logger : Arc < Logger > = Arc :: new ( test_utils:: TestLogger :: new ( ) ) ;
1206
1213
let mut ephemeral_bytes = [ 0 ; 32 ] ;
1207
1214
rng. fill_bytes ( & mut ephemeral_bytes) ;
1208
1215
1209
1216
for i in 0 ..peer_count {
1210
- let router = test_utils:: TestRoutingMessageHandler :: new ( ) ;
1217
+ let router = if let Some ( routers) = routing_handlers { routers[ i] . clone ( ) } else {
1218
+ Arc :: new ( test_utils:: TestRoutingMessageHandler :: new ( ) )
1219
+ } ;
1211
1220
let node_id = {
1212
1221
let mut key_slice = [ 0 ; 32 ] ;
1213
1222
rng. fill_bytes ( & mut key_slice) ;
1214
1223
SecretKey :: from_slice ( & key_slice) . unwrap ( )
1215
1224
} ;
1216
- let msg_handler = MessageHandler { chan_handler : & chan_handlers[ i] , route_handler : Arc :: new ( router) } ;
1225
+ let msg_handler = MessageHandler { chan_handler : & chan_handlers[ i] , route_handler : router } ;
1217
1226
let peer = PeerManager :: new ( msg_handler, node_id, & ephemeral_bytes, Arc :: clone ( & logger) ) ;
1218
1227
peers. push ( peer) ;
1219
1228
}
1220
1229
1221
1230
peers
1222
1231
}
1223
1232
1224
- fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > ) {
1233
+ fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > ) -> ( FileDescriptor , FileDescriptor ) {
1225
1234
let secp_ctx = Secp256k1 :: new ( ) ;
1226
1235
let a_id = PublicKey :: from_secret_key ( & secp_ctx, & peer_a. our_node_secret ) ;
1227
1236
let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
@@ -1231,6 +1240,7 @@ mod tests {
1231
1240
assert_eq ! ( peer_a. read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
1232
1241
assert_eq ! ( peer_b. read_event( & mut fd_b, & fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . unwrap( ) , false ) ;
1233
1242
assert_eq ! ( peer_a. read_event( & mut fd_a, & fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . unwrap( ) , false ) ;
1243
+ ( fd_a. clone ( ) , fd_b. clone ( ) )
1234
1244
}
1235
1245
1236
1246
#[ test]
@@ -1239,7 +1249,7 @@ mod tests {
1239
1249
// push a DisconnectPeer event to remove the node flagged by id
1240
1250
let chan_handlers = create_chan_handlers ( 2 ) ;
1241
1251
let chan_handler = test_utils:: TestChannelMessageHandler :: new ( ) ;
1242
- let mut peers = create_network ( 2 , & chan_handlers) ;
1252
+ let mut peers = create_network ( 2 , & chan_handlers, None ) ;
1243
1253
establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1244
1254
assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
1245
1255
@@ -1256,11 +1266,12 @@ mod tests {
1256
1266
peers[ 0 ] . process_events ( ) ;
1257
1267
assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
1258
1268
}
1269
+
1259
1270
#[ test]
1260
- fn test_timer_tick_occured ( ) {
1271
+ fn test_timer_tick_occurred ( ) {
1261
1272
// Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
1262
1273
let chan_handlers = create_chan_handlers ( 2 ) ;
1263
- let peers = create_network ( 2 , & chan_handlers) ;
1274
+ let peers = create_network ( 2 , & chan_handlers, None ) ;
1264
1275
establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1265
1276
assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
1266
1277
@@ -1272,4 +1283,138 @@ mod tests {
1272
1283
peers[ 0 ] . timer_tick_occured ( ) ;
1273
1284
assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
1274
1285
}
1286
+
1287
+ pub struct TestRoutingMessageHandler {
1288
+ pub chan_upds_recvd : AtomicUsize ,
1289
+ pub chan_anns_recvd : AtomicUsize ,
1290
+ pub chan_anns_sent : AtomicUsize ,
1291
+ }
1292
+
1293
+ impl TestRoutingMessageHandler {
1294
+ pub fn new ( ) -> Self {
1295
+ TestRoutingMessageHandler {
1296
+ chan_upds_recvd : AtomicUsize :: new ( 0 ) ,
1297
+ chan_anns_recvd : AtomicUsize :: new ( 0 ) ,
1298
+ chan_anns_sent : AtomicUsize :: new ( 0 ) ,
1299
+ }
1300
+ }
1301
+
1302
+ }
1303
+ impl msgs:: RoutingMessageHandler for TestRoutingMessageHandler {
1304
+ fn handle_node_announcement ( & self , _msg : & msgs:: NodeAnnouncement ) -> Result < bool , msgs:: LightningError > {
1305
+ Err ( msgs:: LightningError { err : "" , action : msgs:: ErrorAction :: IgnoreError } )
1306
+ }
1307
+ fn handle_channel_announcement ( & self , _msg : & msgs:: ChannelAnnouncement ) -> Result < bool , msgs:: LightningError > {
1308
+ self . chan_anns_recvd . fetch_add ( 1 , Ordering :: AcqRel ) ;
1309
+ Err ( msgs:: LightningError { err : "" , action : msgs:: ErrorAction :: IgnoreError } )
1310
+ }
1311
+ fn handle_channel_update ( & self , _msg : & msgs:: ChannelUpdate ) -> Result < bool , msgs:: LightningError > {
1312
+ self . chan_upds_recvd . fetch_add ( 1 , Ordering :: AcqRel ) ;
1313
+ Err ( msgs:: LightningError { err : "" , action : msgs:: ErrorAction :: IgnoreError } )
1314
+ }
1315
+ fn handle_htlc_fail_channel_update ( & self , _update : & msgs:: HTLCFailChannelUpdate ) { }
1316
+ fn get_next_channel_announcements ( & self , starting_point : u64 , batch_amount : u8 ) -> Vec < ( msgs:: ChannelAnnouncement , msgs:: ChannelUpdate , msgs:: ChannelUpdate ) > {
1317
+ let mut chan_anns = Vec :: new ( ) ;
1318
+ const TOTAL_UPDS : u64 = 100 ;
1319
+ let end: u64 = min ( starting_point + batch_amount as u64 , TOTAL_UPDS - self . chan_anns_sent . load ( Ordering :: Acquire ) as u64 ) ;
1320
+ for i in starting_point..end {
1321
+ let chan_upd_1 = get_dummy_channel_update ( i) ;
1322
+ let chan_upd_2 = get_dummy_channel_update ( i) ;
1323
+ let chan_ann = get_dummy_channel_announcement ( i) ;
1324
+
1325
+ chan_anns. push ( ( chan_ann, chan_upd_1, chan_upd_2) ) ;
1326
+ }
1327
+
1328
+ self . chan_anns_sent . fetch_add ( chan_anns. len ( ) , Ordering :: AcqRel ) ;
1329
+ chan_anns
1330
+ }
1331
+
1332
+ fn get_next_node_announcements ( & self , _starting_point : Option < & PublicKey > , _batch_amount : u8 ) -> Vec < msgs:: NodeAnnouncement > {
1333
+ Vec :: new ( )
1334
+ }
1335
+
1336
+ fn should_request_full_sync ( & self , _node_id : & PublicKey ) -> bool {
1337
+ true
1338
+ }
1339
+ }
1340
+
1341
+ fn get_dummy_channel_announcement ( short_chan_id : u64 ) -> msgs:: ChannelAnnouncement {
1342
+ use secp256k1:: ffi:: Signature as FFISignature ;
1343
+ let secp_ctx = Secp256k1 :: new ( ) ;
1344
+ let network = Network :: Testnet ;
1345
+ let node_1_privkey = SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
1346
+ let node_2_privkey = SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
1347
+ let node_1_btckey = SecretKey :: from_slice ( & [ 40 ; 32 ] ) . unwrap ( ) ;
1348
+ let node_2_btckey = SecretKey :: from_slice ( & [ 39 ; 32 ] ) . unwrap ( ) ;
1349
+ let unsigned_ann = msgs:: UnsignedChannelAnnouncement {
1350
+ features : ChannelFeatures :: supported ( ) ,
1351
+ chain_hash : genesis_block ( network) . header . bitcoin_hash ( ) ,
1352
+ short_channel_id : short_chan_id,
1353
+ node_id_1 : PublicKey :: from_secret_key ( & secp_ctx, & node_1_privkey) ,
1354
+ node_id_2 : PublicKey :: from_secret_key ( & secp_ctx, & node_2_privkey) ,
1355
+ bitcoin_key_1 : PublicKey :: from_secret_key ( & secp_ctx, & node_1_btckey) ,
1356
+ bitcoin_key_2 : PublicKey :: from_secret_key ( & secp_ctx, & node_2_btckey) ,
1357
+ excess_data : Vec :: new ( ) ,
1358
+ } ;
1359
+
1360
+ msgs:: ChannelAnnouncement {
1361
+ node_signature_1 : Signature :: from ( FFISignature :: new ( ) ) ,
1362
+ node_signature_2 : Signature :: from ( FFISignature :: new ( ) ) ,
1363
+ bitcoin_signature_1 : Signature :: from ( FFISignature :: new ( ) ) ,
1364
+ bitcoin_signature_2 : Signature :: from ( FFISignature :: new ( ) ) ,
1365
+ contents : unsigned_ann,
1366
+ }
1367
+ }
1368
+
1369
+ fn get_dummy_channel_update ( short_chan_id : u64 ) -> msgs:: ChannelUpdate {
1370
+ use secp256k1:: ffi:: Signature as FFISignature ;
1371
+ let network = Network :: Testnet ;
1372
+ msgs:: ChannelUpdate {
1373
+ signature : Signature :: from ( FFISignature :: new ( ) ) ,
1374
+ contents : msgs:: UnsignedChannelUpdate {
1375
+ chain_hash : genesis_block ( network) . header . bitcoin_hash ( ) ,
1376
+ short_channel_id : short_chan_id,
1377
+ timestamp : 0 ,
1378
+ flags : 0 ,
1379
+ cltv_expiry_delta : 0 ,
1380
+ htlc_minimum_msat : 0 ,
1381
+ fee_base_msat : 0 ,
1382
+ fee_proportional_millionths : 0 ,
1383
+ excess_data : vec ! [ ] ,
1384
+ }
1385
+ }
1386
+ }
1387
+
1388
+ #[ test]
1389
+ fn test_do_attempt_write_data ( ) {
1390
+ // Create 2 peers with custom TestRoutingMessageHandlers and connect them.
1391
+ let chan_handlers = create_chan_handlers ( 2 ) ;
1392
+ let mut routing_handlers: Vec < Arc < msgs:: RoutingMessageHandler > > = Vec :: new ( ) ;
1393
+ let mut routing_handlers_concrete: Vec < Arc < TestRoutingMessageHandler > > = Vec :: new ( ) ;
1394
+ for _ in 0 ..2 {
1395
+ let routing_handler = Arc :: new ( TestRoutingMessageHandler :: new ( ) ) ;
1396
+ routing_handlers. push ( routing_handler. clone ( ) ) ;
1397
+ routing_handlers_concrete. push ( routing_handler. clone ( ) ) ;
1398
+ }
1399
+ let peers = create_network ( 2 , & chan_handlers, Some ( & routing_handlers) ) ;
1400
+
1401
+ // By calling establish_connect, we trigger do_attempt_write_data between
1402
+ // the peers. Previously this function would mistakenly enter an infinite loop
1403
+ // when there were more channel messages available than could fit into a peer's
1404
+ // buffer. This issue would now be detected by this test (because we use custom
1405
+ // RoutingMessageHandlers that intentionally return more channel messages
1406
+ // than can fit into a peer's buffer).
1407
+ let ( mut fd_a, mut fd_b) = establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1408
+
1409
+ // Make each peer to read the messages that the other peer just wrote to them.
1410
+ peers[ 1 ] . read_event ( & mut fd_b, & fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1411
+ peers[ 0 ] . read_event ( & mut fd_a, & fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1412
+
1413
+ // Check that each peer has received the expected number of channel updates and channel
1414
+ // announcements.
1415
+ assert_eq ! ( routing_handlers_concrete[ 0 ] . clone( ) . chan_upds_recvd. load( Ordering :: Acquire ) , 100 ) ;
1416
+ assert_eq ! ( routing_handlers_concrete[ 0 ] . clone( ) . chan_anns_recvd. load( Ordering :: Acquire ) , 50 ) ;
1417
+ assert_eq ! ( routing_handlers_concrete[ 1 ] . clone( ) . chan_upds_recvd. load( Ordering :: Acquire ) , 100 ) ;
1418
+ assert_eq ! ( routing_handlers_concrete[ 1 ] . clone( ) . chan_anns_recvd. load( Ordering :: Acquire ) , 50 ) ;
1419
+ }
1275
1420
}
0 commit comments