Skip to content

improve threading correctness in URLSession #1186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Foundation/URLSession/URLSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,13 @@ extension URLSession {
}
}

fileprivate let globalVarSyncQ = DispatchQueue(label: "org.swift.Foundation.URLSession.GlobalVarSyncQ")
fileprivate var sessionCounter = Int32(0)
fileprivate func nextSessionIdentifier() -> Int32 {
//TODO: find an alternative for OSAtomicIncrement32Barrier() on Linux
sessionCounter += 1
return sessionCounter
return globalVarSyncQ.sync {
sessionCounter += 1
return sessionCounter
}
}
public let NSURLSessionTransferSizeUnknown: Int64 = -1

Expand Down
80 changes: 24 additions & 56 deletions Foundation/URLSession/URLSessionTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ open class URLSessionTask : NSObject, NSCopying {
internal var session: URLSessionProtocol! //change to nil when task completes
internal let body: _Body
fileprivate var _protocol: URLProtocol! = nil
private let syncQ = DispatchQueue(label: "org.swift.URLSessionTask.SyncQ")

/// All operations must run on this queue.
internal let workQueue: DispatchQueue
/// Using dispatch semaphore to make public attributes thread safe.
/// A semaphore is a simpler option against the usage of concurrent queue
/// as the critical sections are very short.
fileprivate let semaphore = DispatchSemaphore(value: 1)

public override init() {
// Darwin Foundation oddly allows calling this initializer, even though
Expand All @@ -66,7 +63,8 @@ open class URLSessionTask : NSObject, NSCopying {
}
internal init(session: URLSession, request: URLRequest, taskIdentifier: Int, body: _Body) {
self.session = session
self.workQueue = session.workQueue
/* make sure we're actually having a serial queue as it's used for synchronization */
self.workQueue = DispatchQueue.init(label: "org.swift.URLSessionTask.WorkQueue", target: session.workQueue)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. However, I recall we had replaced the use of DispatchQueues with semaphores in attempts to work around a weird and intermittent hang in DispatchQueue.async - something we could reproduce only in a couple of local environments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pushkarnk well, if that's the case I want to know and I'll fix the issue.

self.taskIdentifier = taskIdentifier
self.originalRequest = request
self.body = body
Expand Down Expand Up @@ -112,31 +110,19 @@ open class URLSessionTask : NSObject, NSCopying {
/// May differ from originalRequest due to http server redirection
/*@NSCopying*/ open internal(set) var currentRequest: URLRequest? {
get {
semaphore.wait()
defer {
semaphore.signal()
}
return self._currentRequest
return self.syncQ.sync { return self._currentRequest }
}
set {
semaphore.wait()
self._currentRequest = newValue
semaphore.signal()
self.syncQ.sync { self._currentRequest = newValue }
}
}
fileprivate var _currentRequest: URLRequest? = nil
/*@NSCopying*/ open internal(set) var response: URLResponse? {
get {
semaphore.wait()
defer {
semaphore.signal()
}
return self._response
return self.syncQ.sync { return self._response }
}
set {
semaphore.wait()
self._response = newValue
semaphore.signal()
self.syncQ.sync { self._response = newValue }
}
}
fileprivate var _response: URLResponse? = nil
Expand All @@ -149,33 +135,21 @@ open class URLSessionTask : NSObject, NSCopying {
/// Number of body bytes already received
open internal(set) var countOfBytesReceived: Int64 {
get {
semaphore.wait()
defer {
semaphore.signal()
}
return self._countOfBytesReceived
return self.syncQ.sync { return self._countOfBytesReceived }
}
set {
semaphore.wait()
self._countOfBytesReceived = newValue
semaphore.signal()
self.syncQ.sync { self._countOfBytesReceived = newValue }
}
}
fileprivate var _countOfBytesReceived: Int64 = 0

/// Number of body bytes already sent */
open internal(set) var countOfBytesSent: Int64 {
get {
semaphore.wait()
defer {
semaphore.signal()
}
return self._countOfBytesSent
return self.syncQ.sync { return self._countOfBytesSent }
}
set {
semaphore.wait()
self._countOfBytesSent = newValue
semaphore.signal()
self.syncQ.sync { self._countOfBytesSent = newValue }
}
}

Expand Down Expand Up @@ -215,16 +189,10 @@ open class URLSessionTask : NSObject, NSCopying {
*/
open var state: URLSessionTask.State {
get {
semaphore.wait()
defer {
semaphore.signal()
}
return self._state
return self.syncQ.sync { self._state }
}
set {
semaphore.wait()
self._state = newValue
semaphore.signal()
self.syncQ.sync { self._state = newValue }
}
}
fileprivate var _state: URLSessionTask.State = .suspended
Expand Down Expand Up @@ -305,16 +273,10 @@ open class URLSessionTask : NSObject, NSCopying {
/// URLSessionTask.highPriority, but use is not restricted to these.
open var priority: Float {
get {
semaphore.wait()
defer {
semaphore.signal()
}
return self._priority
return self.workQueue.sync { return self._priority }
}
set {
semaphore.wait()
self._priority = newValue
semaphore.signal()
self.workQueue.sync { self._priority = newValue }
}
}
fileprivate var _priority: Float = URLSessionTask.defaultPriority
Expand Down Expand Up @@ -565,7 +527,9 @@ extension _ProtocolClient : URLProtocolClient {
session.delegateQueue.addOperation {
delegate.urlSession(session, task: task, didCompleteWithError: nil)
task.state = .completed
session.taskRegistry.remove(task)
task.workQueue.async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will fix an extremely intermittent crash a few of us have seen.

session.taskRegistry.remove(task)
}
}
case .noDelegate:
task.state = .completed
Expand Down Expand Up @@ -616,7 +580,9 @@ extension _ProtocolClient : URLProtocolClient {
session.delegateQueue.addOperation {
delegate.urlSession(session, task: task, didCompleteWithError: error as Error)
task.state = .completed
session.taskRegistry.remove(task)
task.workQueue.async {
session.taskRegistry.remove(task)
}
}
case .noDelegate:
task.state = .completed
Expand All @@ -625,7 +591,9 @@ extension _ProtocolClient : URLProtocolClient {
session.delegateQueue.addOperation {
completion(nil, nil, error)
task.state = .completed
session.taskRegistry.remove(task)
task.workQueue.async {
session.taskRegistry.remove(task)
}
}
case .downloadCompletionHandler(let completion):
session.delegateQueue.addOperation {
Expand Down
2 changes: 2 additions & 0 deletions TestFoundation/HTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import Dispatch
#endif

public let globalDispatchQueue = DispatchQueue.global()
public let dispatchQueueMake: (String) -> DispatchQueue = { DispatchQueue.init(label: $0) }
public let dispatchGroupMake: () -> DispatchGroup = DispatchGroup.init

struct _HTTPUtils {
static let CRLF = "\r\n"
Expand Down
99 changes: 91 additions & 8 deletions TestFoundation/TestURLSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class TestURLSession : LoopbackServerTest {
("test_illegalHTTPServerResponses", test_illegalHTTPServerResponses),
("test_dataTaskWithSharedDelegate", test_dataTaskWithSharedDelegate),
("test_simpleUploadWithDelegate", test_simpleUploadWithDelegate),
("test_concurrentRequests", test_concurrentRequests),
]
}

Expand Down Expand Up @@ -459,6 +460,34 @@ class TestURLSession : LoopbackServerTest {
task.resume()
waitForExpectations(timeout: 20)
}

func test_concurrentRequests() {
let syncQ = dispatchQueueMake("test_dataTaskWithURL.syncQ")
var dataTasks: [DataTask] = []
let g = dispatchGroupMake()
for f in 0..<640 {
g.enter()
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Nepal"
let expectation = self.expectation(description: "GET \(urlString) [\(f)]: with a delegate")
globalDispatchQueue.async {
let url = URL(string: urlString)!
let d = DataTask(with: expectation)
d.run(with: url)
syncQ.async {
dataTasks.append(d)
g.leave()
}
}
}
waitForExpectations(timeout: 12)
g.wait()
for d in syncQ.sync(execute: {dataTasks}) {
if !d.error {
XCTAssertEqual(d.capital, "Kathmandu", "test_dataTaskWithURLRequest returned an unexpected result")
}
}
}

}

class SharedDelegate: NSObject {
Expand Down Expand Up @@ -488,19 +517,73 @@ class SessionDelegate: NSObject, URLSessionDelegate {
}

class DataTask : NSObject {
let syncQ = dispatchQueueMake("org.swift.TestFoundation.TestURLSession.DataTask.syncQ")
let dataTaskExpectation: XCTestExpectation!
var capital = "unknown"
var session: URLSession! = nil
var task: URLSessionDataTask! = nil
var cancelExpectation: XCTestExpectation?
var responseReceivedExpectation: XCTestExpectation?
var protocols: [AnyClass]?
let protocols: [AnyClass]?

/* all the following var _XYZ need to be synchronized on syncQ.
We can't just assert that we're on main thread here as we're modified in the URLSessionDataDelegate extension
for DataTask
*/
var _capital = "unknown"
var capital: String {
get {
return self.syncQ.sync { self._capital }
}
set {
self.syncQ.sync { self._capital = newValue }
}
}
var _session: URLSession! = nil
var session: URLSession! {
get {
return self.syncQ.sync { self._session }
}
set {
self.syncQ.sync { self._session = newValue }
}
}
var _task: URLSessionDataTask! = nil
var task: URLSessionDataTask! {
get {
return self.syncQ.sync { self._task }
}
set {
self.syncQ.sync { self._task = newValue }
}
}
var _cancelExpectation: XCTestExpectation?
var cancelExpectation: XCTestExpectation? {
get {
return self.syncQ.sync { self._cancelExpectation }
}
set {
self.syncQ.sync { self._cancelExpectation = newValue }
}
}
var _responseReceivedExpectation: XCTestExpectation?
var responseReceivedExpectation: XCTestExpectation? {
get {
return self.syncQ.sync { self._responseReceivedExpectation }
}
set {
self.syncQ.sync { self._responseReceivedExpectation = newValue }
}
}

public var error = false
private var _error = false
public var error: Bool {
get {
return self.syncQ.sync { self._error }
}
set {
self.syncQ.sync { self._error = newValue }
}
}

init(with expectation: XCTestExpectation, protocolClasses: [AnyClass]? = nil) {
dataTaskExpectation = expectation
protocols = protocolClasses
protocols = protocolClasses
}

func run(with request: URLRequest) {
Expand Down