Skip to content

Commit f54f124

Browse files
committed
http client to support configurable max concurrent requests (#4181)
motivation: controlling how many concurrent requests are taking place is important for controlling bandwidth requirements, especially as we are transitioning to registry-based dependencies. changes: * add max concurrency controls to http client using a dedicated queue * remove workspace binary dependencies concurrency control since it is no longer necessary (applied at http client level) * fix a bug in URLSession-based HTTP client where download can be removed between delegate calls * improve several http client tests, making them thread safe and robust * add tests
1 parent dcd72bf commit f54f124

File tree

7 files changed

+284
-106
lines changed

7 files changed

+284
-106
lines changed

Sources/Basics/ConcurrencyHelpers.swift

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import Dispatch
1212
import class Foundation.ProcessInfo
1313
import TSCBasic
1414

15+
public typealias Lock = TSCBasic.Lock
16+
1517
/// Thread-safe dictionary like structure
1618
public final class ThreadSafeKeyValueStore<Key, Value> where Key: Hashable {
1719
private var underlying: [Key: Value]
@@ -194,6 +196,12 @@ public final class ThreadSafeBox<Value> {
194196
}
195197
}
196198

199+
public func get(`default`: Value) -> Value {
200+
self.lock.withLock {
201+
self.underlying ?? `default`
202+
}
203+
}
204+
197205
public func put(_ newValue: Value) {
198206
self.lock.withLock {
199207
self.underlying = newValue
@@ -222,6 +230,23 @@ public final class ThreadSafeBox<Value> {
222230
}
223231
}
224232

233+
extension ThreadSafeBox where Value == Int {
234+
public func increment() {
235+
self.lock.withLock {
236+
if let value = self.underlying {
237+
self.underlying = value + 1
238+
}
239+
}
240+
}
241+
public func decrement() {
242+
self.lock.withLock {
243+
if let value = self.underlying {
244+
self.underlying = value - 1
245+
}
246+
}
247+
}
248+
}
249+
225250
public enum Concurrency {
226251
public static var maxOperations: Int {
227252
return ProcessEnv.vars["SWIFTPM_MAX_CONCURRENT_OPERATIONS"].flatMap(Int.init) ?? ProcessInfo.processInfo.activeProcessorCount

Sources/Basics/Errors.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
See http://swift.org/CONTRIBUTORS.txt for Swift project authors
99
*/
1010

11+
import struct TSCBasic.StringError
12+
13+
public typealias StringError = TSCBasic.StringError
14+
1115
public struct InternalError: Error {
1216
private let description: String
1317
public init(_ description: String) {

Sources/Basics/HTTPClient+URLSession.swift

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,11 @@ private class DownloadTaskManager: NSObject, URLSessionDownloadDelegate {
149149
return
150150
}
151151

152-
task.location = location
152+
do {
153+
try task.fileSystem.move(from: AbsolutePath(location.path), to: task.destination)
154+
} catch {
155+
task.moveFileError = error
156+
}
153157
}
154158

155159
public func urlSession(_ session: URLSession, task downloadTask: URLSessionTask, didCompleteWithError error: Error?) {
@@ -160,10 +164,9 @@ private class DownloadTaskManager: NSObject, URLSessionDownloadDelegate {
160164
do {
161165
if let error = error {
162166
throw HTTPClientError.downloadError("\(error)")
167+
} else if let error = task.moveFileError {
168+
throw error
163169
} else if let response = downloadTask.response as? HTTPURLResponse {
164-
if let location = task.location {
165-
try task.fileSystem.move(from: AbsolutePath(location.path), to: task.destination)
166-
}
167170
task.completionHandler(.success(response.response(body: nil)))
168171
} else {
169172
throw HTTPClientError.invalidResponse
@@ -180,7 +183,7 @@ private class DownloadTaskManager: NSObject, URLSessionDownloadDelegate {
180183
let completionHandler: HTTPClient.CompletionHandler
181184
let progressHandler: HTTPClient.ProgressHandler?
182185

183-
var location: URL?
186+
var moveFileError: Error?
184187

185188
init(task: URLSessionDownloadTask, fileSystem: FileSystem, destination: AbsolutePath, progressHandler: HTTPClient.ProgressHandler?, completionHandler: @escaping HTTPClient.CompletionHandler) {
186189
self.task = task

Sources/Basics/HTTPClient.swift

Lines changed: 71 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import struct Foundation.Data
1313
import struct Foundation.Date
1414
import class Foundation.JSONDecoder
1515
import class Foundation.NSError
16+
import class Foundation.OperationQueue
1617
import struct Foundation.URL
1718
import TSCBasic
1819

@@ -45,6 +46,11 @@ public struct HTTPClient {
4546
public var configuration: HTTPClientConfiguration
4647
private let underlying: Handler
4748

49+
/// DispatchSemaphore to restrict concurrent operations on manager.
50+
private let concurrencySemaphore: DispatchSemaphore
51+
/// OperationQueue to park pending requests
52+
private let requestsQueue: OperationQueue
53+
4854
// static to share across instances of the http client
4955
private static var hostsErrorsLock = Lock()
5056
private static var hostsErrors = [String: [Date]]()
@@ -53,6 +59,14 @@ public struct HTTPClient {
5359
self.configuration = configuration
5460
// FIXME: inject platform specific implementation here
5561
self.underlying = handler ?? URLSessionHTTPClient().execute
62+
63+
// this queue and semaphore are used to limit the number of concurrent http requests taking place
64+
// the default max number of requests is chosen to match Concurrency.maxOperations, which is the number of active CPUs
65+
let maxConcurrentRequests = configuration.maxConcurrentRequests ?? Concurrency.maxOperations
66+
self.requestsQueue = OperationQueue()
67+
self.requestsQueue.name = "org.swift.swiftpm.http-client"
68+
self.requestsQueue.maxConcurrentOperationCount = maxConcurrentRequests
69+
self.concurrencySemaphore = DispatchSemaphore(value: maxConcurrentRequests)
5670
}
5771

5872
/// Execute an HTTP request asynchronously
@@ -100,12 +114,14 @@ public struct HTTPClient {
100114
observabilityScope: observabilityScope,
101115
progress: progress.map { handler in
102116
{ received, expected in
117+
// call back on the requested queue
103118
callbackQueue.async {
104119
handler(received, expected)
105120
}
106121
}
107122
},
108123
completion: { result in
124+
// call back on the requested queue
109125
callbackQueue.async {
110126
completion(result)
111127
}
@@ -114,45 +130,65 @@ public struct HTTPClient {
114130
}
115131

116132
private func _execute(request: Request, requestNumber: Int, observabilityScope: ObservabilityScope?, progress: ProgressHandler?, completion: @escaping CompletionHandler) {
117-
if self.shouldCircuitBreak(request: request) {
118-
observabilityScope?.emit(warning: "Circuit breaker triggered for \(request.url)")
119-
return completion(.failure(HTTPClientError.circuitBreakerTriggered))
133+
// wrap completion handler with concurrency control cleanup
134+
let originalCompletion = completion
135+
let completion: CompletionHandler = { result in
136+
// free concurrency control semaphore
137+
self.concurrencySemaphore.signal()
138+
originalCompletion(result)
120139
}
121140

122-
self.underlying(
123-
request,
124-
{ received, expected in
125-
if let max = request.options.maximumResponseSizeInBytes {
126-
guard received < max else {
127-
// FIXME: cancel the request?
128-
return completion(.failure(HTTPClientError.responseTooLarge(received)))
129-
}
130-
}
131-
progress?(received, expected)
132-
},
133-
{ result in
134-
switch result {
135-
case .failure(let error):
136-
completion(.failure(error))
137-
case .success(let response):
138-
// record host errors for circuit breaker
139-
self.recordErrorIfNecessary(response: response, request: request)
140-
// handle retry strategy
141-
if let retryDelay = self.shouldRetry(response: response, request: request, requestNumber: requestNumber) {
142-
observabilityScope?.emit(warning: "\(request.url) failed, retrying in \(retryDelay)")
143-
// TODO: dedicated retry queue?
144-
return self.configuration.callbackQueue.asyncAfter(deadline: .now() + retryDelay) {
145-
self._execute(request: request, requestNumber: requestNumber + 1, observabilityScope: observabilityScope, progress: progress, completion: completion)
141+
// we must not block the calling thread (for concurrency control) so nesting this in a queue
142+
self.requestsQueue.addOperation {
143+
// park the request thread based on the max concurrency allowed
144+
self.concurrencySemaphore.wait()
145+
146+
// apply circuit breaker if necessary
147+
if self.shouldCircuitBreak(request: request) {
148+
observabilityScope?.emit(warning: "Circuit breaker triggered for \(request.url)")
149+
return completion(.failure(HTTPClientError.circuitBreakerTriggered))
150+
}
151+
152+
// call underlying handler
153+
self.underlying(
154+
request,
155+
{ received, expected in
156+
if let max = request.options.maximumResponseSizeInBytes {
157+
guard received < max else {
158+
// FIXME: cancel the request?
159+
return completion(.failure(HTTPClientError.responseTooLarge(received)))
146160
}
147161
}
148-
// check for valid response codes
149-
if let validResponseCodes = request.options.validResponseCodes, !validResponseCodes.contains(response.statusCode) {
150-
return completion(.failure(HTTPClientError.badResponseStatusCode(response.statusCode)))
162+
progress?(received, expected)
163+
},
164+
{ result in
165+
// handle result
166+
switch result {
167+
case .failure(let error):
168+
completion(.failure(error))
169+
case .success(let response):
170+
// record host errors for circuit breaker
171+
self.recordErrorIfNecessary(response: response, request: request)
172+
// handle retry strategy
173+
if let retryDelay = self.shouldRetry(response: response, request: request, requestNumber: requestNumber) {
174+
observabilityScope?.emit(warning: "\(request.url) failed, retrying in \(retryDelay)")
175+
// free concurrency control semaphore, since we are re-submitting the request with the original completion handler
176+
// using the wrapped completion handler may lead to starving the max concurrent requests
177+
self.concurrencySemaphore.signal()
178+
// TODO: dedicated retry queue?
179+
return self.configuration.callbackQueue.asyncAfter(deadline: .now() + retryDelay) {
180+
self._execute(request: request, requestNumber: requestNumber + 1, observabilityScope: observabilityScope, progress: progress, completion: originalCompletion)
181+
}
182+
}
183+
// check for valid response codes
184+
if let validResponseCodes = request.options.validResponseCodes, !validResponseCodes.contains(response.statusCode) {
185+
return completion(.failure(HTTPClientError.badResponseStatusCode(response.statusCode)))
186+
}
187+
completion(.success(response))
151188
}
152-
completion(.success(response))
153189
}
154-
}
155-
)
190+
)
191+
}
156192
}
157193

158194
private func shouldRetry(response: Response, request: Request, requestNumber: Int) -> DispatchTimeInterval? {
@@ -245,6 +281,7 @@ public struct HTTPClientConfiguration {
245281
public var authorizationProvider: HTTPClientAuthorizationProvider?
246282
public var retryStrategy: HTTPClientRetryStrategy?
247283
public var circuitBreakerStrategy: HTTPClientCircuitBreakerStrategy?
284+
public var maxConcurrentRequests: Int?
248285
public var callbackQueue: DispatchQueue
249286

250287
public init() {
@@ -253,6 +290,7 @@ public struct HTTPClientConfiguration {
253290
self.authorizationProvider = .none
254291
self.retryStrategy = .none
255292
self.circuitBreakerStrategy = .none
293+
self.maxConcurrentRequests = .none
256294
self.callbackQueue = .sharedConcurrent
257295
}
258296
}

Sources/Workspace/Workspace.swift

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2515,9 +2515,6 @@ extension Workspace {
25152515
}
25162516
}
25172517

2518-
// download max n files concurrently
2519-
let semaphore = DispatchSemaphore(value: Concurrency.maxOperations)
2520-
25212518
// finally download zip files, if any
25222519
for artifact in zipArtifacts.get() {
25232520
let parentDirectory = self.location.artifactsDirectory.appending(component: artifact.packageRef.identity.description)
@@ -2532,7 +2529,6 @@ extension Workspace {
25322529
}
25332530
}
25342531

2535-
semaphore.wait()
25362532
group.enter()
25372533
var headers = HTTPClientHeaders()
25382534
headers.add(name: "Accept", value: "application/octet-stream")
@@ -2553,10 +2549,7 @@ extension Workspace {
25532549
totalBytesToDownload: totalBytesToDownload)
25542550
},
25552551
completion: { downloadResult in
2556-
defer {
2557-
group.leave()
2558-
semaphore.signal()
2559-
}
2552+
defer { group.leave() }
25602553

25612554
// TODO: Use the same extraction logic for both remote and local archived artifacts.
25622555
switch downloadResult {

0 commit comments

Comments
 (0)