Skip to content

improve repository manager lookup concurrency control #4177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 98 additions & 75 deletions Sources/SourceControl/RepositoryManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public class RepositoryManager {
private let delegate: Delegate?

/// DispatchSemaphore to restrict concurrent operations on manager.
private let lookupSemaphore: DispatchSemaphore
private let concurrencySemaphore: DispatchSemaphore
/// OperationQueue to park pending lookups
private let lookupQueue: OperationQueue

/// The filesystem to operate on.
private let fileSystem: FileSystem
Expand Down Expand Up @@ -71,7 +73,12 @@ public class RepositoryManager {
self.provider = provider
self.delegate = delegate

self.lookupSemaphore = DispatchSemaphore(value: Swift.min(3, Concurrency.maxOperations))
// this queue and semaphore are used to limit the number of concurrent git operations taking place
let maxOperations = min(3, Concurrency.maxOperations)
self.lookupQueue = OperationQueue()
self.lookupQueue.name = "org.swift.swiftpm.repository-manager"
self.lookupQueue.maxConcurrentOperationCount = maxOperations
self.concurrencySemaphore = DispatchSemaphore(value: maxOperations)
}

/// Get a handle to a repository.
Expand All @@ -98,77 +105,103 @@ public class RepositoryManager {
callbackQueue: DispatchQueue,
completion: @escaping (Result<RepositoryHandle, Error>) -> Void
) {
// wrap the callback in the requested queue
let originalCompletion = completion
// wrap the callback in the requested queue and cleanup operations
let completion: (Result<RepositoryHandle, Error>) -> Void = { result in
self.lookupSemaphore.signal()
callbackQueue.async { originalCompletion(result) }
// free concurrency control semaphore
self.concurrencySemaphore.signal()
// remove any pending lookup
self.pendingLookupsLock.lock()
self.pendingLookups[repository]?.leave()
self.pendingLookups[repository] = nil
self.pendingLookupsLock.unlock()
// call back on the request queue
callbackQueue.async {
completion(result)
}
}

self.lookupSemaphore.wait()
let relativePath = repository.storagePath()
let repositoryPath = self.path.appending(relativePath)
let handle = RepositoryManager.RepositoryHandle(manager: self, repository: repository, subpath: relativePath)
// we must not block the calling thread (for concurrency control), so the lookup is nested in an operation queue
self.lookupQueue.addOperation {
// park the lookup thread based on the max concurrency allowed
self.concurrencySemaphore.wait()

// check if there is a pending lookup
self.pendingLookupsLock.lock()
if let pendingLookup = self.pendingLookups[repository] {
self.pendingLookupsLock.unlock()
// chain onto the pending lookup
return pendingLookup.notify(queue: callbackQueue) {
// at this point the previous lookup should be complete and we can re-lookup
self.lookup(
// check if there is a pending lookup
self.pendingLookupsLock.lock()
if let pendingLookup = self.pendingLookups[repository] {
self.pendingLookupsLock.unlock()
// chain onto the pending lookup
return pendingLookup.notify(queue: .sharedConcurrent) {
// at this point the previous lookup should be complete and we can re-lookup
completion(.init(catching: {
try self.lookup(
package: package,
repository: repository,
skipUpdate: skipUpdate,
observabilityScope: observabilityScope,
delegateQueue: delegateQueue
)
}))
}
} else {
// record the pending lookup
assert(self.pendingLookups[repository] == nil)
let group = DispatchGroup()
group.enter()
self.pendingLookups[repository] = group
self.pendingLookupsLock.unlock()
}

completion(.init(catching: {
try self.lookup(
package: package,
repository: repository,
skipUpdate: skipUpdate,
observabilityScope: observabilityScope,
delegateQueue: delegateQueue,
callbackQueue: callbackQueue,
completion: originalCompletion
delegateQueue: delegateQueue
)
}
}))
}
}

// record the pending lookup
assert(self.pendingLookups[repository] == nil)
let group = DispatchGroup()
group.enter()
self.pendingLookups[repository] = group
self.pendingLookupsLock.unlock()
// Synchronous version of the lookup.
// It exists because it simplifies reading and maintaining the logical flow
// while the underlying git client is synchronous.
// Once we move to an async git client, we would need to get rid of this
// sync func and roll the logic into the async version above.
private func lookup(
package: PackageIdentity,
repository: RepositorySpecifier,
skipUpdate: Bool,
observabilityScope: ObservabilityScope,
delegateQueue: DispatchQueue
) throws -> RepositoryHandle {
let relativePath = repository.storagePath()
let repositoryPath = self.path.appending(relativePath)
let handle = RepositoryHandle(manager: self, repository: repository, subpath: relativePath)

// check if a repository already exists
// errors when trying to check if a repository already exists are legitimate
// and recoverable, and as such can be ignored
if (try? self.provider.repositoryExists(at: repositoryPath)) ?? false {
let result = Result<RepositoryHandle, Error>(catching: {
// skip update if not needed
if skipUpdate {
return handle
}
// Update the repository when it is being looked up.
let start = DispatchTime.now()
delegateQueue.async {
self.delegate?.willUpdate(package: package, repository: handle.repository)
}
let repository = try handle.open()
try repository.fetch()
let duration = start.distance(to: .now())
delegateQueue.async {
self.delegate?.didUpdate(package: package, repository: handle.repository, duration: duration)
}
// update if necessary and return early
// skip update if not needed
if skipUpdate {
return handle
})

// remove the pending lookup
self.pendingLookupsLock.lock()
self.pendingLookups[repository]?.leave()
self.pendingLookups[repository] = nil
self.pendingLookupsLock.unlock()
// and done
return completion(result)
}
// Update the repository when it is being looked up.
let start = DispatchTime.now()
delegateQueue.async {
self.delegate?.willUpdate(package: package, repository: handle.repository)
}
let repository = try handle.open()
try repository.fetch()
let duration = start.distance(to: .now())
delegateQueue.async {
self.delegate?.didUpdate(package: package, repository: handle.repository, duration: duration)
}
return handle
}

// perform the fetch
// inform delegate that we are starting to fetch
// calculate if cached (for delegate call) outside queue as it may change while queue is processing
let isCached = self.cachePath.map{ self.fileSystem.exists($0.appending(handle.subpath)) } ?? false
Expand All @@ -177,41 +210,31 @@ public class RepositoryManager {
self.delegate?.willFetch(package: package, repository: handle.repository, details: details)
}

// perform the fetch
let start = DispatchTime.now()
let lookupResult: Result<RepositoryHandle, Error>
let delegateResult: Result<FetchDetails, Error>

do {
let fetchResult = Result<FetchDetails, Error>(catching: {
// make sure destination is free.
try? self.fileSystem.removeFileTree(repositoryPath)
// Fetch the repo.
let details = try self.fetchAndPopulateCache(
// fetch the repo and cache the results
return try self.fetchAndPopulateCache(
package: package,
handle: handle,
repositoryPath: repositoryPath,
observabilityScope: observabilityScope,
delegateQueue: delegateQueue
)
lookupResult = .success(handle)
delegateResult = .success(details)
} catch {
lookupResult = .failure(error)
delegateResult = .failure(error)
}
})

// Inform delegate.
// inform delegate fetch is done
let duration = start.distance(to: .now())
delegateQueue.async {
self.delegate?.didFetch(package: package, repository: handle.repository, result: delegateResult, duration: duration)
self.delegate?.didFetch(package: package, repository: handle.repository, result: fetchResult, duration: duration)
}

// remove the pending lookup
self.pendingLookupsLock.lock()
self.pendingLookups[repository]?.leave()
self.pendingLookups[repository] = nil
self.pendingLookupsLock.unlock()
// and done
completion(lookupResult)
// at this point we can throw, as we already notified the delegate above
_ = try fetchResult.get()

return handle
}

/// Fetches the repository into the cache. If no `cachePath` is set or an error occurs, falls back to fetching the repository without populating the cache.
Expand Down
121 changes: 62 additions & 59 deletions Sources/Workspace/Workspace.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3509,71 +3509,74 @@ extension Workspace: PackageContainerProvider {
on queue: DispatchQueue,
completion: @escaping (Result<PackageContainer, Swift.Error>) -> Void
) {
queue.async {
do {
switch package.kind {
// If the container is local, just create and return a local package container.
case .root, .fileSystem:
let container = try FileSystemPackageContainer(
package: package,
identityResolver: self.identityResolver,
manifestLoader: self.manifestLoader,
toolsVersionLoader: self.toolsVersionLoader,
currentToolsVersion: self.currentToolsVersion,
fileSystem: self.fileSystem,
observabilityScope: observabilityScope
)
do {
switch package.kind {
// If the container is local, just create and return a local package container.
case .root, .fileSystem:
let container = try FileSystemPackageContainer(
package: package,
identityResolver: self.identityResolver,
manifestLoader: self.manifestLoader,
toolsVersionLoader: self.toolsVersionLoader,
currentToolsVersion: self.currentToolsVersion,
fileSystem: self.fileSystem,
observabilityScope: observabilityScope
)
queue.async {
completion(.success(container))
// Resolve the container using the repository manager.
case .localSourceControl, .remoteSourceControl:
let repositorySpecifier = try package.makeRepositorySpecifier()
self.repositoryManager.lookup(
package: package.identity,
repository: repositorySpecifier,
skipUpdate: skipUpdate,
observabilityScope: observabilityScope,
delegateQueue: queue,
callbackQueue: queue
) { result in
// Create the container wrapper.
let result = result.tryMap { handle -> PackageContainer in
// Open the repository.
//
// FIXME: Do we care about holding this open for the lifetime of the container?
let repository = try handle.open()
return try SourceControlPackageContainer(
package: package,
identityResolver: self.identityResolver,
repositorySpecifier: repositorySpecifier,
repository: repository,
manifestLoader: self.manifestLoader,
toolsVersionLoader: self.toolsVersionLoader,
currentToolsVersion: self.currentToolsVersion,
fingerprintStorage: self.fingerprints,
fingerprintCheckingMode: self.configuration.fingerprintCheckingMode,
observabilityScope: observabilityScope
)
}
completion(result)
}
// Resolve the container using the repository manager.
case .localSourceControl, .remoteSourceControl:
let repositorySpecifier = try package.makeRepositorySpecifier()
self.repositoryManager.lookup(
package: package.identity,
repository: repositorySpecifier,
skipUpdate: skipUpdate,
observabilityScope: observabilityScope,
delegateQueue: queue,
callbackQueue: queue
) { result in
dispatchPrecondition(condition: .onQueue(queue))
// Create the container wrapper.
let result = result.tryMap { handle -> PackageContainer in
// Open the repository.
//
// FIXME: Do we care about holding this open for the lifetime of the container?
let repository = try handle.open()
return try SourceControlPackageContainer(
package: package,
identityResolver: self.identityResolver,
repositorySpecifier: repositorySpecifier,
repository: repository,
manifestLoader: self.manifestLoader,
toolsVersionLoader: self.toolsVersionLoader,
currentToolsVersion: self.currentToolsVersion,
fingerprintStorage: self.fingerprints,
fingerprintCheckingMode: self.configuration.fingerprintCheckingMode,
observabilityScope: observabilityScope
)
}
// Resolve the container using the registry
case .registry:
let container = RegistryPackageContainer(
package: package,
identityResolver: self.identityResolver,
registryClient: self.registryClient,
manifestLoader: self.manifestLoader,
toolsVersionLoader: self.toolsVersionLoader,
currentToolsVersion: self.currentToolsVersion,
observabilityScope: observabilityScope
)
completion(.success(container))
completion(result)
}
} catch {
// Resolve the container using the registry
case .registry:
let container = RegistryPackageContainer(
package: package,
identityResolver: self.identityResolver,
registryClient: self.registryClient,
manifestLoader: self.manifestLoader,
toolsVersionLoader: self.toolsVersionLoader,
currentToolsVersion: self.currentToolsVersion,
observabilityScope: observabilityScope
)
queue.async {
completion(.failure(error))
completion(.success(container))
}
}
} catch {
queue.async {
completion(.failure(error))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class RegistryDownloadsManagerTests: XCTestCase {
}
}

if case .timedOut = group.wait(timeout: .now() + 10) {
if case .timedOut = group.wait(timeout: .now() + 60) {
return XCTFail("timeout")
}

Expand Down Expand Up @@ -342,7 +342,7 @@ class RegistryDownloadsManagerTests: XCTestCase {
}
}

if case .timedOut = group.wait(timeout: .now() + 10) {
if case .timedOut = group.wait(timeout: .now() + 60) {
return XCTFail("timeout")
}

Expand Down
Loading