Skip to content

Commit 3f0ff62

Browse files
authored
Merge pull request #1186 from weissi/jw-improve-urlsession-threading
improve threading correctness in URLSession
2 parents d557d9e + 828161c commit 3f0ff62

File tree

4 files changed

+122
-67
lines changed

4 files changed

+122
-67
lines changed

Foundation/URLSession/URLSession.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,13 @@ extension URLSession {
179179
}
180180
}
181181

182+
fileprivate let globalVarSyncQ = DispatchQueue(label: "org.swift.Foundation.URLSession.GlobalVarSyncQ")
182183
fileprivate var sessionCounter = Int32(0)
183184
fileprivate func nextSessionIdentifier() -> Int32 {
184-
//TODO: find an alternative for OSAtomicIncrement32Barrier() on Linux
185-
sessionCounter += 1
186-
return sessionCounter
185+
return globalVarSyncQ.sync {
186+
sessionCounter += 1
187+
return sessionCounter
188+
}
187189
}
188190
public let NSURLSessionTransferSizeUnknown: Int64 = -1
189191

Foundation/URLSession/URLSessionTask.swift

Lines changed: 24 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,10 @@ open class URLSessionTask : NSObject, NSCopying {
3434
internal var session: URLSessionProtocol! //change to nil when task completes
3535
internal let body: _Body
3636
fileprivate var _protocol: URLProtocol! = nil
37+
private let syncQ = DispatchQueue(label: "org.swift.URLSessionTask.SyncQ")
3738

3839
/// All operations must run on this queue.
3940
internal let workQueue: DispatchQueue
40-
/// Using dispatch semaphore to make public attributes thread safe.
41-
/// A semaphore is a simpler option against the usage of concurrent queue
42-
/// as the critical sections are very short.
43-
fileprivate let semaphore = DispatchSemaphore(value: 1)
4441

4542
public override init() {
4643
// Darwin Foundation oddly allows calling this initializer, even though
@@ -66,7 +63,8 @@ open class URLSessionTask : NSObject, NSCopying {
6663
}
6764
internal init(session: URLSession, request: URLRequest, taskIdentifier: Int, body: _Body) {
6865
self.session = session
69-
self.workQueue = session.workQueue
66+
/* make sure we're actually having a serial queue as it's used for synchronization */
67+
self.workQueue = DispatchQueue.init(label: "org.swift.URLSessionTask.WorkQueue", target: session.workQueue)
7068
self.taskIdentifier = taskIdentifier
7169
self.originalRequest = request
7270
self.body = body
@@ -112,31 +110,19 @@ open class URLSessionTask : NSObject, NSCopying {
112110
/// May differ from originalRequest due to http server redirection
113111
/*@NSCopying*/ open internal(set) var currentRequest: URLRequest? {
114112
get {
115-
semaphore.wait()
116-
defer {
117-
semaphore.signal()
118-
}
119-
return self._currentRequest
113+
return self.syncQ.sync { return self._currentRequest }
120114
}
121115
set {
122-
semaphore.wait()
123-
self._currentRequest = newValue
124-
semaphore.signal()
116+
self.syncQ.sync { self._currentRequest = newValue }
125117
}
126118
}
127119
fileprivate var _currentRequest: URLRequest? = nil
128120
/*@NSCopying*/ open internal(set) var response: URLResponse? {
129121
get {
130-
semaphore.wait()
131-
defer {
132-
semaphore.signal()
133-
}
134-
return self._response
122+
return self.syncQ.sync { return self._response }
135123
}
136124
set {
137-
semaphore.wait()
138-
self._response = newValue
139-
semaphore.signal()
125+
self.syncQ.sync { self._response = newValue }
140126
}
141127
}
142128
fileprivate var _response: URLResponse? = nil
@@ -149,33 +135,21 @@ open class URLSessionTask : NSObject, NSCopying {
149135
/// Number of body bytes already received
150136
open internal(set) var countOfBytesReceived: Int64 {
151137
get {
152-
semaphore.wait()
153-
defer {
154-
semaphore.signal()
155-
}
156-
return self._countOfBytesReceived
138+
return self.syncQ.sync { return self._countOfBytesReceived }
157139
}
158140
set {
159-
semaphore.wait()
160-
self._countOfBytesReceived = newValue
161-
semaphore.signal()
141+
self.syncQ.sync { self._countOfBytesReceived = newValue }
162142
}
163143
}
164144
fileprivate var _countOfBytesReceived: Int64 = 0
165145

166146
/// Number of body bytes already sent */
167147
open internal(set) var countOfBytesSent: Int64 {
168148
get {
169-
semaphore.wait()
170-
defer {
171-
semaphore.signal()
172-
}
173-
return self._countOfBytesSent
149+
return self.syncQ.sync { return self._countOfBytesSent }
174150
}
175151
set {
176-
semaphore.wait()
177-
self._countOfBytesSent = newValue
178-
semaphore.signal()
152+
self.syncQ.sync { self._countOfBytesSent = newValue }
179153
}
180154
}
181155

@@ -215,16 +189,10 @@ open class URLSessionTask : NSObject, NSCopying {
215189
*/
216190
open var state: URLSessionTask.State {
217191
get {
218-
semaphore.wait()
219-
defer {
220-
semaphore.signal()
221-
}
222-
return self._state
192+
return self.syncQ.sync { self._state }
223193
}
224194
set {
225-
semaphore.wait()
226-
self._state = newValue
227-
semaphore.signal()
195+
self.syncQ.sync { self._state = newValue }
228196
}
229197
}
230198
fileprivate var _state: URLSessionTask.State = .suspended
@@ -305,16 +273,10 @@ open class URLSessionTask : NSObject, NSCopying {
305273
/// URLSessionTask.highPriority, but use is not restricted to these.
306274
open var priority: Float {
307275
get {
308-
semaphore.wait()
309-
defer {
310-
semaphore.signal()
311-
}
312-
return self._priority
276+
return self.workQueue.sync { return self._priority }
313277
}
314278
set {
315-
semaphore.wait()
316-
self._priority = newValue
317-
semaphore.signal()
279+
self.workQueue.sync { self._priority = newValue }
318280
}
319281
}
320282
fileprivate var _priority: Float = URLSessionTask.defaultPriority
@@ -565,7 +527,9 @@ extension _ProtocolClient : URLProtocolClient {
565527
session.delegateQueue.addOperation {
566528
delegate.urlSession(session, task: task, didCompleteWithError: nil)
567529
task.state = .completed
568-
session.taskRegistry.remove(task)
530+
task.workQueue.async {
531+
session.taskRegistry.remove(task)
532+
}
569533
}
570534
case .noDelegate:
571535
task.state = .completed
@@ -616,7 +580,9 @@ extension _ProtocolClient : URLProtocolClient {
616580
session.delegateQueue.addOperation {
617581
delegate.urlSession(session, task: task, didCompleteWithError: error as Error)
618582
task.state = .completed
619-
session.taskRegistry.remove(task)
583+
task.workQueue.async {
584+
session.taskRegistry.remove(task)
585+
}
620586
}
621587
case .noDelegate:
622588
task.state = .completed
@@ -625,7 +591,9 @@ extension _ProtocolClient : URLProtocolClient {
625591
session.delegateQueue.addOperation {
626592
completion(nil, nil, error)
627593
task.state = .completed
628-
session.taskRegistry.remove(task)
594+
task.workQueue.async {
595+
session.taskRegistry.remove(task)
596+
}
629597
}
630598
case .downloadCompletionHandler(let completion):
631599
session.delegateQueue.addOperation {

TestFoundation/HTTPServer.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import Dispatch
2626
#endif
2727

2828
public let globalDispatchQueue = DispatchQueue.global()
29+
public let dispatchQueueMake: (String) -> DispatchQueue = { DispatchQueue.init(label: $0) }
30+
public let dispatchGroupMake: () -> DispatchGroup = DispatchGroup.init
2931

3032
struct _HTTPUtils {
3133
static let CRLF = "\r\n"

TestFoundation/TestURLSession.swift

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class TestURLSession : LoopbackServerTest {
4343
("test_illegalHTTPServerResponses", test_illegalHTTPServerResponses),
4444
("test_dataTaskWithSharedDelegate", test_dataTaskWithSharedDelegate),
4545
("test_simpleUploadWithDelegate", test_simpleUploadWithDelegate),
46+
("test_concurrentRequests", test_concurrentRequests),
4647
]
4748
}
4849

@@ -459,6 +460,34 @@ class TestURLSession : LoopbackServerTest {
459460
task.resume()
460461
waitForExpectations(timeout: 20)
461462
}
463+
464+
func test_concurrentRequests() {
465+
let syncQ = dispatchQueueMake("test_dataTaskWithURL.syncQ")
466+
var dataTasks: [DataTask] = []
467+
let g = dispatchGroupMake()
468+
for f in 0..<640 {
469+
g.enter()
470+
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Nepal"
471+
let expectation = self.expectation(description: "GET \(urlString) [\(f)]: with a delegate")
472+
globalDispatchQueue.async {
473+
let url = URL(string: urlString)!
474+
let d = DataTask(with: expectation)
475+
d.run(with: url)
476+
syncQ.async {
477+
dataTasks.append(d)
478+
g.leave()
479+
}
480+
}
481+
}
482+
waitForExpectations(timeout: 12)
483+
g.wait()
484+
for d in syncQ.sync(execute: {dataTasks}) {
485+
if !d.error {
486+
XCTAssertEqual(d.capital, "Kathmandu", "test_dataTaskWithURLRequest returned an unexpected result")
487+
}
488+
}
489+
}
490+
462491
}
463492

464493
class SharedDelegate: NSObject {
@@ -488,19 +517,73 @@ class SessionDelegate: NSObject, URLSessionDelegate {
488517
}
489518

490519
class DataTask : NSObject {
520+
let syncQ = dispatchQueueMake("org.swift.TestFoundation.TestURLSession.DataTask.syncQ")
491521
let dataTaskExpectation: XCTestExpectation!
492-
var capital = "unknown"
493-
var session: URLSession! = nil
494-
var task: URLSessionDataTask! = nil
495-
var cancelExpectation: XCTestExpectation?
496-
var responseReceivedExpectation: XCTestExpectation?
497-
var protocols: [AnyClass]?
522+
let protocols: [AnyClass]?
523+
524+
/* all the following var _XYZ need to be synchronized on syncQ.
525+
We can't just assert that we're on main thread here as we're modified in the URLSessionDataDelegate extension
526+
for DataTask
527+
*/
528+
var _capital = "unknown"
529+
var capital: String {
530+
get {
531+
return self.syncQ.sync { self._capital }
532+
}
533+
set {
534+
self.syncQ.sync { self._capital = newValue }
535+
}
536+
}
537+
var _session: URLSession! = nil
538+
var session: URLSession! {
539+
get {
540+
return self.syncQ.sync { self._session }
541+
}
542+
set {
543+
self.syncQ.sync { self._session = newValue }
544+
}
545+
}
546+
var _task: URLSessionDataTask! = nil
547+
var task: URLSessionDataTask! {
548+
get {
549+
return self.syncQ.sync { self._task }
550+
}
551+
set {
552+
self.syncQ.sync { self._task = newValue }
553+
}
554+
}
555+
var _cancelExpectation: XCTestExpectation?
556+
var cancelExpectation: XCTestExpectation? {
557+
get {
558+
return self.syncQ.sync { self._cancelExpectation }
559+
}
560+
set {
561+
self.syncQ.sync { self._cancelExpectation = newValue }
562+
}
563+
}
564+
var _responseReceivedExpectation: XCTestExpectation?
565+
var responseReceivedExpectation: XCTestExpectation? {
566+
get {
567+
return self.syncQ.sync { self._responseReceivedExpectation }
568+
}
569+
set {
570+
self.syncQ.sync { self._responseReceivedExpectation = newValue }
571+
}
572+
}
498573

499-
public var error = false
574+
private var _error = false
575+
public var error: Bool {
576+
get {
577+
return self.syncQ.sync { self._error }
578+
}
579+
set {
580+
self.syncQ.sync { self._error = newValue }
581+
}
582+
}
500583

501584
init(with expectation: XCTestExpectation, protocolClasses: [AnyClass]? = nil) {
502585
dataTaskExpectation = expectation
503-
protocols = protocolClasses
586+
protocols = protocolClasses
504587
}
505588

506589
func run(with request: URLRequest) {

0 commit comments

Comments
 (0)