Skip to content

Commit da23670

Browse files
authored
improve repository manager lookup concurrency (#4177)
motivation: improved concurrency control and cleaner code changes: * use a separate queue to schedule the lookup tasks to ensure concurrency control is done correctly * create a sync version of lookup to help make maintaining the code easier, at least while the git client is sync * reduce double queue overhead when Workspace::geContainer calls into RepositoryManager::lookup
1 parent a783689 commit da23670

File tree

4 files changed

+164
-138
lines changed

4 files changed

+164
-138
lines changed

Sources/SourceControl/RepositoryManager.swift

Lines changed: 98 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ public class RepositoryManager {
3434
private let delegate: Delegate?
3535

3636
/// DispatchSemaphore to restrict concurrent operations on manager.
37-
private let lookupSemaphore: DispatchSemaphore
37+
private let concurrencySemaphore: DispatchSemaphore
38+
/// OperationQueue to park pending lookups
39+
private let lookupQueue: OperationQueue
3840

3941
/// The filesystem to operate on.
4042
private let fileSystem: FileSystem
@@ -71,7 +73,12 @@ public class RepositoryManager {
7173
self.provider = provider
7274
self.delegate = delegate
7375

74-
self.lookupSemaphore = DispatchSemaphore(value: Swift.min(3, Concurrency.maxOperations))
76+
// this queue and semaphore is used to limit the amount of concurrent git operations taking place
77+
let maxOperations = min(3, Concurrency.maxOperations)
78+
self.lookupQueue = OperationQueue()
79+
self.lookupQueue.name = "org.swift.swiftpm.repository-manager"
80+
self.lookupQueue.maxConcurrentOperationCount = maxOperations
81+
self.concurrencySemaphore = DispatchSemaphore(value: maxOperations)
7582
}
7683

7784
/// Get a handle to a repository.
@@ -98,77 +105,103 @@ public class RepositoryManager {
98105
callbackQueue: DispatchQueue,
99106
completion: @escaping (Result<RepositoryHandle, Error>) -> Void
100107
) {
101-
// wrap the callback in the requested queue
102-
let originalCompletion = completion
108+
// wrap the callback in the requested queue and cleanup operations
103109
let completion: (Result<RepositoryHandle, Error>) -> Void = { result in
104-
self.lookupSemaphore.signal()
105-
callbackQueue.async { originalCompletion(result) }
110+
// free concurrency control semaphore
111+
self.concurrencySemaphore.signal()
112+
// remove any pending lookup
113+
self.pendingLookupsLock.lock()
114+
self.pendingLookups[repository]?.leave()
115+
self.pendingLookups[repository] = nil
116+
self.pendingLookupsLock.unlock()
117+
// call back on the request queue
118+
callbackQueue.async {
119+
completion(result)
120+
}
106121
}
107122

108-
self.lookupSemaphore.wait()
109-
let relativePath = repository.storagePath()
110-
let repositoryPath = self.path.appending(relativePath)
111-
let handle = RepositoryManager.RepositoryHandle(manager: self, repository: repository, subpath: relativePath)
123+
// we must not block the calling thread (for concurrency control) so nesting this in a queue
124+
self.lookupQueue.addOperation {
125+
// park the lookup thread based on the max concurrency allowed
126+
self.concurrencySemaphore.wait()
112127

113-
// check if there is a pending lookup
114-
self.pendingLookupsLock.lock()
115-
if let pendingLookup = self.pendingLookups[repository] {
116-
self.pendingLookupsLock.unlock()
117-
// chain onto the pending lookup
118-
return pendingLookup.notify(queue: callbackQueue) {
119-
// at this point the previous lookup should be complete and we can re-lookup
120-
self.lookup(
128+
// check if there is a pending lookup
129+
self.pendingLookupsLock.lock()
130+
if let pendingLookup = self.pendingLookups[repository] {
131+
self.pendingLookupsLock.unlock()
132+
// chain onto the pending lookup
133+
return pendingLookup.notify(queue: .sharedConcurrent) {
134+
// at this point the previous lookup should be complete and we can re-lookup
135+
completion(.init(catching: {
136+
try self.lookup(
137+
package: package,
138+
repository: repository,
139+
skipUpdate: skipUpdate,
140+
observabilityScope: observabilityScope,
141+
delegateQueue: delegateQueue
142+
)
143+
}))
144+
}
145+
} else {
146+
// record the pending lookup
147+
assert(self.pendingLookups[repository] == nil)
148+
let group = DispatchGroup()
149+
group.enter()
150+
self.pendingLookups[repository] = group
151+
self.pendingLookupsLock.unlock()
152+
}
153+
154+
completion(.init(catching: {
155+
try self.lookup(
121156
package: package,
122157
repository: repository,
123158
skipUpdate: skipUpdate,
124159
observabilityScope: observabilityScope,
125-
delegateQueue: delegateQueue,
126-
callbackQueue: callbackQueue,
127-
completion: originalCompletion
160+
delegateQueue: delegateQueue
128161
)
129-
}
162+
}))
130163
}
164+
}
131165

132-
// record the pending lookup
133-
assert(self.pendingLookups[repository] == nil)
134-
let group = DispatchGroup()
135-
group.enter()
136-
self.pendingLookups[repository] = group
137-
self.pendingLookupsLock.unlock()
166+
// sync version of the lookup,
167+
// this is here because it simplifies reading & maintaining the logical flow
168+
// while the underlying git client is sync
169+
// once we move to an async git client we would need to get rid of this
170+
// sync func and roll the logic into the async version above
171+
private func lookup(
172+
package: PackageIdentity,
173+
repository: RepositorySpecifier,
174+
skipUpdate: Bool,
175+
observabilityScope: ObservabilityScope,
176+
delegateQueue: DispatchQueue
177+
) throws -> RepositoryHandle {
178+
let relativePath = repository.storagePath()
179+
let repositoryPath = self.path.appending(relativePath)
180+
let handle = RepositoryHandle(manager: self, repository: repository, subpath: relativePath)
138181

139182
// check if a repository already exists
140183
// errors when trying to check if a repository already exists are legitimate
141184
// and recoverable, and as such can be ignored
142185
if (try? self.provider.repositoryExists(at: repositoryPath)) ?? false {
143-
let result = Result<RepositoryHandle, Error>(catching: {
144-
// skip update if not needed
145-
if skipUpdate {
146-
return handle
147-
}
148-
// Update the repository when it is being looked up.
149-
let start = DispatchTime.now()
150-
delegateQueue.async {
151-
self.delegate?.willUpdate(package: package, repository: handle.repository)
152-
}
153-
let repository = try handle.open()
154-
try repository.fetch()
155-
let duration = start.distance(to: .now())
156-
delegateQueue.async {
157-
self.delegate?.didUpdate(package: package, repository: handle.repository, duration: duration)
158-
}
186+
// update if necessary and return early
187+
// skip update if not needed
188+
if skipUpdate {
159189
return handle
160-
})
161-
162-
// remove the pending lookup
163-
self.pendingLookupsLock.lock()
164-
self.pendingLookups[repository]?.leave()
165-
self.pendingLookups[repository] = nil
166-
self.pendingLookupsLock.unlock()
167-
// and done
168-
return completion(result)
190+
}
191+
// Update the repository when it is being looked up.
192+
let start = DispatchTime.now()
193+
delegateQueue.async {
194+
self.delegate?.willUpdate(package: package, repository: handle.repository)
195+
}
196+
let repository = try handle.open()
197+
try repository.fetch()
198+
let duration = start.distance(to: .now())
199+
delegateQueue.async {
200+
self.delegate?.didUpdate(package: package, repository: handle.repository, duration: duration)
201+
}
202+
return handle
169203
}
170204

171-
// perform the fetch
172205
// inform delegate that we are starting to fetch
173206
// calculate if cached (for delegate call) outside queue as it may change while queue is processing
174207
let isCached = self.cachePath.map{ self.fileSystem.exists($0.appending(handle.subpath)) } ?? false
@@ -177,41 +210,31 @@ public class RepositoryManager {
177210
self.delegate?.willFetch(package: package, repository: handle.repository, details: details)
178211
}
179212

213+
// perform the fetch
180214
let start = DispatchTime.now()
181-
let lookupResult: Result<RepositoryHandle, Error>
182-
let delegateResult: Result<FetchDetails, Error>
183-
184-
do {
215+
let fetchResult = Result<FetchDetails, Error>(catching: {
185216
// make sure destination is free.
186217
try? self.fileSystem.removeFileTree(repositoryPath)
187-
// Fetch the repo.
188-
let details = try self.fetchAndPopulateCache(
218+
// fetch the repo and cache the results
219+
return try self.fetchAndPopulateCache(
189220
package: package,
190221
handle: handle,
191222
repositoryPath: repositoryPath,
192223
observabilityScope: observabilityScope,
193224
delegateQueue: delegateQueue
194225
)
195-
lookupResult = .success(handle)
196-
delegateResult = .success(details)
197-
} catch {
198-
lookupResult = .failure(error)
199-
delegateResult = .failure(error)
200-
}
226+
})
201227

202-
// Inform delegate.
228+
// inform delegate fetch is done
203229
let duration = start.distance(to: .now())
204230
delegateQueue.async {
205-
self.delegate?.didFetch(package: package, repository: handle.repository, result: delegateResult, duration: duration)
231+
self.delegate?.didFetch(package: package, repository: handle.repository, result: fetchResult, duration: duration)
206232
}
207233

208-
// remove the pending lookup
209-
self.pendingLookupsLock.lock()
210-
self.pendingLookups[repository]?.leave()
211-
self.pendingLookups[repository] = nil
212-
self.pendingLookupsLock.unlock()
213-
// and done
214-
completion(lookupResult)
234+
// at this point we can throw, as we already notified the delegate above
235+
_ = try fetchResult.get()
236+
237+
return handle
215238
}
216239

217240
/// Fetches the repository into the cache. If no `cachePath` is set or an error occurred fall back to fetching the repository without populating the cache.

Sources/Workspace/Workspace.swift

Lines changed: 62 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3509,71 +3509,74 @@ extension Workspace: PackageContainerProvider {
35093509
on queue: DispatchQueue,
35103510
completion: @escaping (Result<PackageContainer, Swift.Error>) -> Void
35113511
) {
3512-
queue.async {
3513-
do {
3514-
switch package.kind {
3515-
// If the container is local, just create and return a local package container.
3516-
case .root, .fileSystem:
3517-
let container = try FileSystemPackageContainer(
3518-
package: package,
3519-
identityResolver: self.identityResolver,
3520-
manifestLoader: self.manifestLoader,
3521-
toolsVersionLoader: self.toolsVersionLoader,
3522-
currentToolsVersion: self.currentToolsVersion,
3523-
fileSystem: self.fileSystem,
3524-
observabilityScope: observabilityScope
3525-
)
3512+
do {
3513+
switch package.kind {
3514+
// If the container is local, just create and return a local package container.
3515+
case .root, .fileSystem:
3516+
let container = try FileSystemPackageContainer(
3517+
package: package,
3518+
identityResolver: self.identityResolver,
3519+
manifestLoader: self.manifestLoader,
3520+
toolsVersionLoader: self.toolsVersionLoader,
3521+
currentToolsVersion: self.currentToolsVersion,
3522+
fileSystem: self.fileSystem,
3523+
observabilityScope: observabilityScope
3524+
)
3525+
queue.async {
35263526
completion(.success(container))
3527-
// Resolve the container using the repository manager.
3528-
case .localSourceControl, .remoteSourceControl:
3529-
let repositorySpecifier = try package.makeRepositorySpecifier()
3530-
self.repositoryManager.lookup(
3531-
package: package.identity,
3532-
repository: repositorySpecifier,
3533-
skipUpdate: skipUpdate,
3534-
observabilityScope: observabilityScope,
3535-
delegateQueue: queue,
3536-
callbackQueue: queue
3537-
) { result in
3538-
// Create the container wrapper.
3539-
let result = result.tryMap { handle -> PackageContainer in
3540-
// Open the repository.
3541-
//
3542-
// FIXME: Do we care about holding this open for the lifetime of the container.
3543-
let repository = try handle.open()
3544-
return try SourceControlPackageContainer(
3545-
package: package,
3546-
identityResolver: self.identityResolver,
3547-
repositorySpecifier: repositorySpecifier,
3548-
repository: repository,
3549-
manifestLoader: self.manifestLoader,
3550-
toolsVersionLoader: self.toolsVersionLoader,
3551-
currentToolsVersion: self.currentToolsVersion,
3552-
fingerprintStorage: self.fingerprints,
3553-
fingerprintCheckingMode: self.configuration.fingerprintCheckingMode,
3554-
observabilityScope: observabilityScope
3555-
)
3556-
}
3557-
completion(result)
3527+
}
3528+
// Resolve the container using the repository manager.
3529+
case .localSourceControl, .remoteSourceControl:
3530+
let repositorySpecifier = try package.makeRepositorySpecifier()
3531+
self.repositoryManager.lookup(
3532+
package: package.identity,
3533+
repository: repositorySpecifier,
3534+
skipUpdate: skipUpdate,
3535+
observabilityScope: observabilityScope,
3536+
delegateQueue: queue,
3537+
callbackQueue: queue
3538+
) { result in
3539+
dispatchPrecondition(condition: .onQueue(queue))
3540+
// Create the container wrapper.
3541+
let result = result.tryMap { handle -> PackageContainer in
3542+
// Open the repository.
3543+
//
3544+
// FIXME: Do we care about holding this open for the lifetime of the container.
3545+
let repository = try handle.open()
3546+
return try SourceControlPackageContainer(
3547+
package: package,
3548+
identityResolver: self.identityResolver,
3549+
repositorySpecifier: repositorySpecifier,
3550+
repository: repository,
3551+
manifestLoader: self.manifestLoader,
3552+
toolsVersionLoader: self.toolsVersionLoader,
3553+
currentToolsVersion: self.currentToolsVersion,
3554+
fingerprintStorage: self.fingerprints,
3555+
fingerprintCheckingMode: self.configuration.fingerprintCheckingMode,
3556+
observabilityScope: observabilityScope
3557+
)
35583558
}
3559-
// Resolve the container using the registry
3560-
case .registry:
3561-
let container = RegistryPackageContainer(
3562-
package: package,
3563-
identityResolver: self.identityResolver,
3564-
registryClient: self.registryClient,
3565-
manifestLoader: self.manifestLoader,
3566-
toolsVersionLoader: self.toolsVersionLoader,
3567-
currentToolsVersion: self.currentToolsVersion,
3568-
observabilityScope: observabilityScope
3569-
)
3570-
completion(.success(container))
3559+
completion(result)
35713560
}
3572-
} catch {
3561+
// Resolve the container using the registry
3562+
case .registry:
3563+
let container = RegistryPackageContainer(
3564+
package: package,
3565+
identityResolver: self.identityResolver,
3566+
registryClient: self.registryClient,
3567+
manifestLoader: self.manifestLoader,
3568+
toolsVersionLoader: self.toolsVersionLoader,
3569+
currentToolsVersion: self.currentToolsVersion,
3570+
observabilityScope: observabilityScope
3571+
)
35733572
queue.async {
3574-
completion(.failure(error))
3573+
completion(.success(container))
35753574
}
35763575
}
3576+
} catch {
3577+
queue.async {
3578+
completion(.failure(error))
3579+
}
35773580
}
35783581
}
35793582

Tests/PackageRegistryTests/RegistryDownloadsManagerTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ class RegistryDownloadsManagerTests: XCTestCase {
298298
}
299299
}
300300

301-
if case .timedOut = group.wait(timeout: .now() + 10) {
301+
if case .timedOut = group.wait(timeout: .now() + 60) {
302302
return XCTFail("timeout")
303303
}
304304

@@ -342,7 +342,7 @@ class RegistryDownloadsManagerTests: XCTestCase {
342342
}
343343
}
344344

345-
if case .timedOut = group.wait(timeout: .now() + 10) {
345+
if case .timedOut = group.wait(timeout: .now() + 60) {
346346
return XCTFail("timeout")
347347
}
348348

0 commit comments

Comments
 (0)