@@ -64,16 +64,28 @@ public final class JSONRPCConnection {
64
64
self . messageRegistry = messageRegistry
65
65
self . syncRequests = syncRequests
66
66
67
+ let ioGroup = DispatchGroup ( )
68
+
69
+ ioGroup. enter ( )
67
70
receiveIO = DispatchIO ( type: . stream, fileDescriptor: inFD, queue: queue) { ( error: Int32 ) in
68
71
if error != 0 {
69
72
log ( " IO error \( error) " , level: . error)
70
73
}
74
+ ioGroup. leave ( )
71
75
}
72
76
77
+ ioGroup. enter ( )
73
78
sendIO = DispatchIO ( type: . stream, fileDescriptor: outFD, queue: sendQueue) { ( error: Int32 ) in
74
79
if error != 0 {
75
80
log ( " IO error \( error) " , level: . error)
76
81
}
82
+ ioGroup. leave ( )
83
+ }
84
+
85
+ ioGroup. notify ( queue: queue) { [ weak self] in
86
+ guard let self = self else { return }
87
+ self . closeHandler ( )
88
+ self . receiveHandler = nil // break retain cycle
77
89
}
78
90
79
91
// We cannot assume the client will send us bytes in packets of any particular size, so set the lower limit to 1.
@@ -99,7 +111,9 @@ public final class JSONRPCConnection {
99
111
100
112
receiveIO. read ( offset: 0 , length: Int . max, queue: queue) { done, data, errorCode in
101
113
guard errorCode == 0 else {
102
- log ( " IO error \( errorCode) " , level: . error)
114
+ if errorCode != POSIXError . ECANCELED. rawValue {
115
+ log ( " IO error reading \( errorCode) " , level: . error)
116
+ }
103
117
if done { self . _close ( ) }
104
118
return
105
119
}
@@ -287,6 +301,9 @@ public final class JSONRPCConnection {
287
301
}
288
302
289
303
/// Close the connection.
304
+ ///
305
+ /// The user-provided close handler will be called *asynchronously* when all outstanding I/O
306
+ /// operations have completed. No new I/O will be accepted after `close` returns.
290
307
public func close( ) {
291
308
queue. sync { _close ( ) }
292
309
}
@@ -298,10 +315,10 @@ public final class JSONRPCConnection {
298
315
state = . closed
299
316
300
317
log ( " \( JSONRPCConnection . self) : closing... " )
318
+ // Attempt to close the reader immediately; we do not need to accept remaining inputs.
301
319
receiveIO. close ( flags: . stop)
302
- sendIO. close ( flags: . stop)
303
- receiveHandler = nil // break retain cycle
304
- closeHandler ( )
320
+ // Close the writer after it finishes outstanding work.
321
+ sendIO. close ( )
305
322
}
306
323
}
307
324
0 commit comments