@@ -52,7 +52,7 @@ use lightning::ln::peer_handler;
52
52
use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
53
53
use lightning:: ln:: msgs:: ChannelMessageHandler ;
54
54
55
- use std:: task;
55
+ use std:: { task, thread } ;
56
56
use std:: net:: SocketAddr ;
57
57
use std:: sync:: { Arc , Mutex } ;
58
58
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
@@ -82,6 +82,11 @@ struct Connection {
82
82
// so by setting read_paused. If the read thread thereafter reads some data, it will place a
83
83
// Sender here and then block on it.
84
84
read_blocker : Option < oneshot:: Sender < ( ) > > ,
85
+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
86
+ // are sure we won't call any more read/write PeerManager functions with the same connection.
87
+ // This is set to true if we're in such a condition (with disconnect checked before with the
88
+ // top-level mutex held) and false when we can return.
89
+ disconnect_block : bool ,
85
90
read_paused : bool ,
86
91
// If we get disconnected via SocketDescriptor::disconnect_socket(), we don't call
87
92
// disconnect_event(), but if we get an Err return value out of PeerManager, in general, we do.
@@ -102,19 +107,28 @@ impl Connection {
102
107
} }
103
108
}
104
109
110
+ macro_rules! prepare_read_write_call {
111
+ ( ) => { {
112
+ let mut us_lock = us. lock( ) . unwrap( ) ;
113
+ if us_lock. disconnect {
114
+ shutdown_socket!( "disconnect_socket() call from RL" ) ;
115
+ }
116
+ us_lock. disconnect_block = true ;
117
+ } }
118
+ }
119
+
105
120
// Whenever we want to block, we have to at least select with the write_event Receiver,
106
121
// which is used by the SocketDescriptor to wake us up if we need to shut down the
107
122
// socket or if we need to generate a write_event.
108
123
macro_rules! select_write_ev {
109
124
( $v: expr) => { {
110
125
assert!( $v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
111
- if us. lock( ) . unwrap( ) . disconnect {
112
- shutdown_socket!( "disconnect_socket() call from RL" ) ;
113
- }
126
+ prepare_read_write_call!( ) ;
114
127
if let Err ( e) = peer_manager. write_buffer_space_avail( & mut SocketDescriptor :: new( us. clone( ) ) ) {
115
128
us. lock( ) . unwrap( ) . need_disconnect_event = false ;
116
129
shutdown_socket!( e) ;
117
130
}
131
+ us. lock( ) . unwrap( ) . disconnect_block = false ;
118
132
} }
119
133
}
120
134
@@ -147,6 +161,7 @@ impl Connection {
147
161
v = write_event. recv( ) => select_write_ev!( v) ,
148
162
}
149
163
}
164
+ prepare_read_write_call!( ) ;
150
165
match peer_manager. read_event( & mut SocketDescriptor :: new( Arc :: clone( & us) ) , & buf[ 0 ..len] ) {
151
166
Ok ( pause_read) => {
152
167
if pause_read {
@@ -168,6 +183,7 @@ impl Connection {
168
183
shutdown_socket!( e)
169
184
} ,
170
185
}
186
+ us. lock( ) . unwrap( ) . disconnect_block = false ;
171
187
} ,
172
188
Err ( e) => {
173
189
println!( "Connection closed: {}" , e) ;
@@ -176,6 +192,7 @@ impl Connection {
176
192
} ,
177
193
}
178
194
}
195
+ us. lock ( ) . unwrap ( ) . disconnect_block = false ;
179
196
let writer_option = us. lock ( ) . unwrap ( ) . writer . take ( ) ;
180
197
if let Some ( mut writer) = writer_option {
181
198
// If the socket is already closed, shutdown() will fail, so just ignore it.
@@ -205,7 +222,7 @@ impl Connection {
205
222
206
223
( reader, receiver,
207
224
Arc :: new ( Mutex :: new ( Self {
208
- writer : Some ( writer) , event_notify, write_avail,
225
+ writer : Some ( writer) , event_notify, write_avail, disconnect_block : false ,
209
226
read_blocker : None , read_paused : false , need_disconnect_event : true , disconnect : false ,
210
227
id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
211
228
} ) ) )
@@ -371,16 +388,19 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
371
388
}
372
389
373
390
fn disconnect_socket ( & mut self ) {
374
- let mut us = self . conn . lock ( ) . unwrap ( ) ;
375
- us. need_disconnect_event = false ;
376
- us. disconnect = true ;
377
- us. read_paused = true ;
378
- // Wake up the sending thread, assuming its still alive
379
- let _ = us. write_avail . try_send ( ( ) ) ;
380
- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
381
- // read task is about to call a PeerManager function (eg read_event or write_event).
382
- // Ideally we need to release the us lock and block until we have confirmation from the
383
- // read task that it has broken out of its main loop.
391
+ {
392
+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
393
+ us. need_disconnect_event = false ;
394
+ us. disconnect = true ;
395
+ us. read_paused = true ;
396
+ // Wake up the sending thread, assuming its still alive
397
+ let _ = us. write_avail . try_send ( ( ) ) ;
398
+ // Happy-path return:
399
+ if !us. disconnect_block { return ; }
400
+ }
401
+ while self . conn . lock ( ) . unwrap ( ) . disconnect_block {
402
+ thread:: yield_now ( ) ;
403
+ }
384
404
}
385
405
}
386
406
impl Clone for SocketDescriptor {
0 commit comments