@@ -71,7 +71,7 @@ use lightning::ln::peer_handler;
71
71
use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
72
72
use lightning:: ln:: msgs:: ChannelMessageHandler ;
73
73
74
- use std:: task;
74
+ use std:: { task, thread } ;
75
75
use std:: net:: SocketAddr ;
76
76
use std:: sync:: { Arc , Mutex } ;
77
77
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
@@ -113,6 +113,11 @@ struct Connection {
113
113
// send into any read_blocker to wake the reading future back up and set read_paused back to
114
114
// false.
115
115
read_blocker : Option < oneshot:: Sender < ( ) > > ,
116
+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
117
+ // are sure we won't call any more read/write PeerManager functions with the same connection.
118
+ // This is set to true if we're in such a condition (with disconnect checked before with the
119
+ // top-level mutex held) and false when we can return.
120
+ block_disconnect_socket : bool ,
116
121
read_paused : bool ,
117
122
disconnect_state : DisconnectionState ,
118
123
id : u64 ,
@@ -130,20 +135,29 @@ impl Connection {
130
135
} }
131
136
}
132
137
138
+ macro_rules! prepare_read_write_call {
139
+ ( ) => { {
140
+ let mut us_lock = us. lock( ) . unwrap( ) ;
141
+ if us_lock. disconnect_state == DisconnectionState :: RLTriggeredDisconnect {
142
+ shutdown_socket!( "disconnect_socket() call from RL" ) ;
143
+ }
144
+ us_lock. block_disconnect_socket = true ;
145
+ } }
146
+ }
147
+
133
148
// Whenever we want to block on reading or waiting for reading to resume, we have to
134
149
// at least select with the write_avail_receiver, which is used by the
135
150
// SocketDescriptor to wake us up if we need to shut down the socket or if we need
136
151
// to generate a write_buffer_space_avail call.
137
152
macro_rules! select_write_ev {
138
153
( $v: expr) => { {
139
154
assert!( $v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
140
- if us. lock( ) . unwrap( ) . disconnect_state == DisconnectionState :: RLTriggeredDisconnect {
141
- shutdown_socket!( "disconnect_socket() call from RL" ) ;
142
- }
155
+ prepare_read_write_call!( ) ;
143
156
if let Err ( e) = peer_manager. write_buffer_space_avail( & mut SocketDescriptor :: new( us. clone( ) ) ) {
144
157
us. lock( ) . unwrap( ) . disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
145
158
shutdown_socket!( e) ;
146
159
}
160
+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
147
161
} }
148
162
}
149
163
@@ -176,6 +190,7 @@ impl Connection {
176
190
v = write_avail_receiver. recv( ) => select_write_ev!( v) ,
177
191
}
178
192
}
193
+ prepare_read_write_call!( ) ;
179
194
match peer_manager. read_event( & mut SocketDescriptor :: new( Arc :: clone( & us) ) , & buf[ 0 ..len] ) {
180
195
Ok ( pause_read) => {
181
196
if pause_read {
@@ -197,6 +212,7 @@ impl Connection {
197
212
shutdown_socket!( e)
198
213
} ,
199
214
}
215
+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
200
216
} ,
201
217
Err ( e) => {
202
218
println!( "Connection closed: {}" , e) ;
@@ -205,6 +221,7 @@ impl Connection {
205
221
} ,
206
222
}
207
223
}
224
+ us. lock ( ) . unwrap ( ) . block_disconnect_socket = false ;
208
225
let writer_option = us. lock ( ) . unwrap ( ) . writer . take ( ) ;
209
226
if let Some ( mut writer) = writer_option {
210
227
// If the socket is already closed, shutdown() will fail, so just ignore it.
@@ -234,7 +251,7 @@ impl Connection {
234
251
235
252
( reader, receiver,
236
253
Arc :: new ( Mutex :: new ( Self {
237
- writer : Some ( writer) , event_notify, write_avail,
254
+ writer : Some ( writer) , event_notify, write_avail, block_disconnect_socket : false ,
238
255
read_blocker : None , read_paused : false , disconnect_state : DisconnectionState :: NeedDisconnectEvent ,
239
256
id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
240
257
} ) ) )
@@ -423,15 +440,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
423
440
}
424
441
425
442
fn disconnect_socket ( & mut self ) {
426
- let mut us = self . conn . lock ( ) . unwrap ( ) ;
427
- us. disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
428
- us. read_paused = true ;
429
- // Wake up the sending thread, assuming it is still alive
430
- let _ = us. write_avail . try_send ( ( ) ) ;
431
- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
432
- // read task is about to call a PeerManager function (eg read_event or write_event).
433
- // Ideally we need to release the us lock and block until we have confirmation from the
434
- // read task that it has broken out of its main loop.
443
+ {
444
+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
445
+ us. disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
446
+ us. read_paused = true ;
447
+ // Wake up the sending thread, assuming it is still alive
448
+ let _ = us. write_avail . try_send ( ( ) ) ;
449
+ // Happy-path return:
450
+ if !us. block_disconnect_socket { return ; }
451
+ }
452
+ while self . conn . lock ( ) . unwrap ( ) . block_disconnect_socket {
453
+ thread:: yield_now ( ) ;
454
+ }
435
455
}
436
456
}
437
457
impl Clone for SocketDescriptor {
0 commit comments