Skip to content

Commit 14e8eba

Browse files
authored
Merge pull request swiftlang#1260 from pushkarnk/urlsession-fixes-4.0
2 parents 8afed75 + 391e5b1 commit 14e8eba

File tree

5 files changed

+147
-108
lines changed

5 files changed

+147
-108
lines changed

Foundation/URLSession/URLSession.swift

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,13 @@ import CoreFoundation
172172
import Dispatch
173173

174174

175+
fileprivate let globalVarSyncQ = DispatchQueue(label: "org.swift.Foundation.URLSession.GlobalVarSyncQ")
175176
fileprivate var sessionCounter = Int32(0)
176177
fileprivate func nextSessionIdentifier() -> Int32 {
177-
//TODO: find an alternative for OSAtomicIncrement32Barrier() on Linux
178-
sessionCounter += 1
179-
return sessionCounter
178+
return globalVarSyncQ.sync {
179+
sessionCounter += 1
180+
return sessionCounter
181+
}
180182
}
181183
public let NSURLSessionTransferSizeUnknown: Int64 = -1
182184

@@ -397,9 +399,11 @@ extension URLSession._Request {
397399

398400
fileprivate extension URLSession {
399401
func createNextTaskIdentifier() -> Int {
400-
let i = nextTaskIdentifier
401-
nextTaskIdentifier += 1
402-
return i
402+
return workQueue.sync {
403+
let i = nextTaskIdentifier
404+
nextTaskIdentifier += 1
405+
return i
406+
}
403407
}
404408
}
405409

Foundation/URLSession/URLSessionTask.swift

Lines changed: 25 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,10 @@ open class URLSessionTask : NSObject, NSCopying {
3030
internal var session: URLSessionProtocol! //change to nil when task completes
3131
internal let body: _Body
3232
fileprivate var _protocol: URLProtocol? = nil
33-
33+
private let syncQ = DispatchQueue(label: "org.swift.URLSessionTask.SyncQ")
34+
3435
/// All operations must run on this queue.
3536
internal let workQueue: DispatchQueue
36-
/// Using dispatch semaphore to make public attributes thread safe.
37-
/// A semaphore is a simpler option against the usage of concurrent queue
38-
/// as the critical sections are very short.
39-
fileprivate let semaphore = DispatchSemaphore(value: 1)
4037

4138
public override init() {
4239
// Darwin Foundation oddly allows calling this initializer, even though
@@ -62,7 +59,8 @@ open class URLSessionTask : NSObject, NSCopying {
6259
}
6360
internal init(session: URLSession, request: URLRequest, taskIdentifier: Int, body: _Body) {
6461
self.session = session
65-
self.workQueue = session.workQueue
62+
/* make sure we're actually having a serial queue as it's used for synchronization */
63+
self.workQueue = DispatchQueue.init(label: "org.swift.URLSessionTask.WorkQueue", target: session.workQueue)
6664
self.taskIdentifier = taskIdentifier
6765
self.originalRequest = request
6866
self.body = body
@@ -108,31 +106,19 @@ open class URLSessionTask : NSObject, NSCopying {
108106
/// May differ from originalRequest due to http server redirection
109107
/*@NSCopying*/ open internal(set) var currentRequest: URLRequest? {
110108
get {
111-
semaphore.wait()
112-
defer {
113-
semaphore.signal()
114-
}
115-
return self._currentRequest
109+
return self.syncQ.sync { return self._currentRequest }
116110
}
117111
set {
118-
semaphore.wait()
119-
self._currentRequest = newValue
120-
semaphore.signal()
112+
self.syncQ.sync { self._currentRequest = newValue }
121113
}
122114
}
123115
fileprivate var _currentRequest: URLRequest? = nil
124116
/*@NSCopying*/ open internal(set) var response: URLResponse? {
125117
get {
126-
semaphore.wait()
127-
defer {
128-
semaphore.signal()
129-
}
130-
return self._response
118+
return self.syncQ.sync { return self._response }
131119
}
132120
set {
133-
semaphore.wait()
134-
self._response = newValue
135-
semaphore.signal()
121+
self.syncQ.sync { self._response = newValue }
136122
}
137123
}
138124
fileprivate var _response: URLResponse? = nil
@@ -145,33 +131,21 @@ open class URLSessionTask : NSObject, NSCopying {
145131
/// Number of body bytes already received
146132
open internal(set) var countOfBytesReceived: Int64 {
147133
get {
148-
semaphore.wait()
149-
defer {
150-
semaphore.signal()
151-
}
152-
return self._countOfBytesReceived
134+
return self.syncQ.sync { return self._countOfBytesReceived }
153135
}
154136
set {
155-
semaphore.wait()
156-
self._countOfBytesReceived = newValue
157-
semaphore.signal()
137+
self.syncQ.sync { self._countOfBytesReceived = newValue }
158138
}
159139
}
160140
fileprivate var _countOfBytesReceived: Int64 = 0
161141

162142
/// Number of body bytes already sent */
163143
open internal(set) var countOfBytesSent: Int64 {
164144
get {
165-
semaphore.wait()
166-
defer {
167-
semaphore.signal()
168-
}
169-
return self._countOfBytesSent
145+
return self.syncQ.sync { return self._countOfBytesSent }
170146
}
171147
set {
172-
semaphore.wait()
173-
self._countOfBytesSent = newValue
174-
semaphore.signal()
148+
self.syncQ.sync { self._countOfBytesSent = newValue }
175149
}
176150
}
177151

@@ -211,16 +185,10 @@ open class URLSessionTask : NSObject, NSCopying {
211185
*/
212186
open var state: URLSessionTask.State {
213187
get {
214-
semaphore.wait()
215-
defer {
216-
semaphore.signal()
217-
}
218-
return self._state
188+
return self.syncQ.sync { self._state }
219189
}
220190
set {
221-
semaphore.wait()
222-
self._state = newValue
223-
semaphore.signal()
191+
self.syncQ.sync { self._state = newValue }
224192
}
225193
}
226194
fileprivate var _state: URLSessionTask.State = .suspended
@@ -315,16 +283,10 @@ open class URLSessionTask : NSObject, NSCopying {
315283
/// URLSessionTask.highPriority, but use is not restricted to these.
316284
open var priority: Float {
317285
get {
318-
semaphore.wait()
319-
defer {
320-
semaphore.signal()
321-
}
322-
return self._priority
286+
return self.workQueue.sync { return self._priority }
323287
}
324288
set {
325-
semaphore.wait()
326-
self._priority = newValue
327-
semaphore.signal()
289+
self.workQueue.sync { self._priority = newValue }
328290
}
329291
}
330292
fileprivate var _priority: Float = URLSessionTask.defaultPriority
@@ -569,7 +531,9 @@ extension _ProtocolClient : URLProtocolClient {
569531
session.delegateQueue.addOperation {
570532
delegate.urlSession(session, task: task, didCompleteWithError: nil)
571533
task.state = .completed
572-
session.taskRegistry.remove(task)
534+
task.workQueue.async {
535+
session.taskRegistry.remove(task)
536+
}
573537
}
574538
case .noDelegate:
575539
task.state = .completed
@@ -625,7 +589,9 @@ extension _ProtocolClient : URLProtocolClient {
625589
session.delegateQueue.addOperation {
626590
delegate.urlSession(session, task: task, didCompleteWithError: error as Error)
627591
task.state = .completed
628-
session.taskRegistry.remove(task)
592+
task.workQueue.async {
593+
session.taskRegistry.remove(task)
594+
}
629595
}
630596
case .noDelegate:
631597
task.state = .completed
@@ -634,7 +600,9 @@ extension _ProtocolClient : URLProtocolClient {
634600
session.delegateQueue.addOperation {
635601
completion(nil, nil, error)
636602
task.state = .completed
637-
session.taskRegistry.remove(task)
603+
task.workQueue.async {
604+
session.taskRegistry.remove(task)
605+
}
638606
}
639607
case .downloadCompletionHandler(let completion):
640608
session.delegateQueue.addOperation {

Foundation/URLSession/http/MultiHandle.swift

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@ extension URLSession {
3939
let group = DispatchGroup()
4040
fileprivate var easyHandles: [_EasyHandle] = []
4141
fileprivate var timeoutSource: _TimeoutSource? = nil
42-
43-
//SR-4567: we need to synchronize the register/unregister commands to the epoll machinery in libdispatch
44-
fileprivate let commandQueue: DispatchQueue = DispatchQueue(label: "Register-unregister synchronization")
45-
fileprivate var cancelInProgress: DispatchSemaphore? = nil
46-
42+
4743
init(configuration: URLSession._Configuration, workQueue: DispatchQueue) {
4844
queue = DispatchQueue(label: "MultiHandle.isolation", target: workQueue)
4945
setupCallbacks()
@@ -103,33 +99,25 @@ fileprivate extension URLSession._MultiHandle {
10399
// through libdispatch (DispatchSource) and store the source(s) inside
104100
// a `SocketSources` which we in turn store inside libcurl's multi handle
105101
// by means of curl_multi_assign() -- we retain the object fist.
106-
commandQueue.async {
107-
self.cancelInProgress?.wait()
108-
self.cancelInProgress = nil
109-
110-
let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
111-
var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
112-
if socketSources == nil && action.needsSource {
113-
let s = _SocketSources()
114-
let p = Unmanaged.passRetained(s).toOpaque()
115-
CFURLSessionMultiHandleAssign(self.rawHandle, socket, UnsafeMutableRawPointer(p))
116-
socketSources = s
117-
} else if socketSources != nil && action == .unregister {
118-
//the beginning of an unregister operation
119-
self.cancelInProgress = DispatchSemaphore(value: 0)
120-
// We need to release the stored pointer:
121-
if let opaque = socketSourcePtr {
122-
Unmanaged<_SocketSources>.fromOpaque(opaque).release()
123-
}
124-
socketSources?.tearDown(self.cancelInProgress)
125-
socketSources = nil
102+
let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
103+
var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
104+
if socketSources == nil && action.needsSource {
105+
let s = _SocketSources()
106+
let p = Unmanaged.passRetained(s).toOpaque()
107+
CFURLSessionMultiHandleAssign(rawHandle, socket, UnsafeMutableRawPointer(p))
108+
socketSources = s
109+
} else if socketSources != nil && action == .unregister {
110+
// We need to release the stored pointer:
111+
if let opaque = socketSourcePtr {
112+
Unmanaged<_SocketSources>.fromOpaque(opaque).release()
126113
}
127-
if let ss = socketSources {
128-
let handler = DispatchWorkItem { [weak self] in
129-
self?.performAction(for: socket)
130-
}
131-
ss.createSources(with: action, fileDescriptor: Int(socket), queue: self.queue, handler: handler)
114+
socketSources = nil
115+
}
116+
if let ss = socketSources {
117+
let handler = DispatchWorkItem { [weak self] in
118+
self?.performAction(for: socket)
132119
}
120+
ss.createSources(with: action, fileDescriptor: Int(socket), queue: queue, handler: handler)
133121
}
134122
return 0
135123
}
@@ -411,18 +399,12 @@ fileprivate class _SocketSources {
411399
s.resume()
412400
}
413401

414-
func tearDown(_ cancelInProgress: DispatchSemaphore?) {
415-
let cancelHandler = DispatchWorkItem {
416-
//the real end of an unregister operation!
417-
cancelInProgress?.signal()
418-
}
402+
func tearDown() {
419403
if let s = readSource {
420-
s.setCancelHandler(handler: cancelHandler)
421404
s.cancel()
422405
}
423406
readSource = nil
424407
if let s = writeSource {
425-
s.setCancelHandler(handler: cancelHandler)
426408
s.cancel()
427409
}
428410
writeSource = nil

TestFoundation/HTTPServer.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import Dispatch
2626
#endif
2727

2828
public let globalDispatchQueue = DispatchQueue.global()
29+
public let dispatchQueueMake: (String) -> DispatchQueue = { DispatchQueue.init(label: $0) }
30+
public let dispatchGroupMake: () -> DispatchGroup = DispatchGroup.init
2931

3032
struct _HTTPUtils {
3133
static let CRLF = "\r\n"

0 commit comments

Comments
 (0)