Skip to content

Commit 8fa23cb

Browse files
committed
improve RepositoryManager::lookup
motivation: improved concurency control and cleaer code changes: * use a seperate queue to schedule the lookup tasks to ensure oncurrency control is done correctly * use Result::catching to help remove boilerplate error handling
1 parent 16e0e9c commit 8fa23cb

File tree

1 file changed

+78
-84
lines changed

1 file changed

+78
-84
lines changed

Sources/SourceControl/RepositoryManager.swift

Lines changed: 78 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ public class RepositoryManager {
3434
private let delegate: Delegate?
3535

3636
/// DispatchSemaphore to restrict concurrent operations on manager.
37-
private let lookupSemaphore: DispatchSemaphore
37+
private let concurrencySemaphore: DispatchSemaphore
38+
/// DispatchQueue to park pending lookups
39+
private let lookupQueue = DispatchQueue(label: "org.swift.swiftpm.repository-manager", attributes: .concurrent)
3840

3941
/// The filesystem to operate on.
4042
private let fileSystem: FileSystem
@@ -71,7 +73,7 @@ public class RepositoryManager {
7173
self.provider = provider
7274
self.delegate = delegate
7375

74-
self.lookupSemaphore = DispatchSemaphore(value: Swift.min(3, Concurrency.maxOperations))
76+
self.concurrencySemaphore = DispatchSemaphore(value: Swift.min(3, Concurrency.maxOperations))
7577
}
7678

7779
/// Get a handle to a repository.
@@ -98,24 +100,30 @@ public class RepositoryManager {
98100
callbackQueue: DispatchQueue,
99101
completion: @escaping (Result<RepositoryHandle, Error>) -> Void
100102
) {
101-
// wrap the callback in the requested queue
103+
// wrap the callback in the requested queue and cleanup operations
102104
let originalCompletion = completion
103105
let completion: (Result<RepositoryHandle, Error>) -> Void = { result in
104-
self.lookupSemaphore.signal()
106+
// free concurrency control semaphore
107+
self.concurrencySemaphore.signal()
108+
// remove any pending lookup
109+
self.pendingLookupsLock.lock()
110+
self.pendingLookups[repository]?.leave()
111+
self.pendingLookups[repository] = nil
112+
self.pendingLookupsLock.unlock()
113+
// call back on the request queue
105114
callbackQueue.async { originalCompletion(result) }
106115
}
107116

108-
self.lookupSemaphore.wait()
109117
let relativePath = repository.storagePath()
110118
let repositoryPath = self.path.appending(relativePath)
111-
let handle = RepositoryManager.RepositoryHandle(manager: self, repository: repository, subpath: relativePath)
119+
let handle = RepositoryHandle(manager: self, repository: repository, subpath: relativePath)
112120

113121
// check if there is a pending lookup
114122
self.pendingLookupsLock.lock()
115123
if let pendingLookup = self.pendingLookups[repository] {
116124
self.pendingLookupsLock.unlock()
117125
// chain onto the pending lookup
118-
return pendingLookup.notify(queue: callbackQueue) {
126+
return pendingLookup.notify(queue: self.lookupQueue) {
119127
// at this point the previous lookup should be complete and we can re-lookup
120128
self.lookup(
121129
package: package,
@@ -127,91 +135,77 @@ public class RepositoryManager {
127135
completion: originalCompletion
128136
)
129137
}
138+
} else {
139+
// record the pending lookup
140+
assert(self.pendingLookups[repository] == nil)
141+
let group = DispatchGroup()
142+
group.enter()
143+
self.pendingLookups[repository] = group
144+
self.pendingLookupsLock.unlock()
130145
}
131146

132-
// record the pending lookup
133-
assert(self.pendingLookups[repository] == nil)
134-
let group = DispatchGroup()
135-
group.enter()
136-
self.pendingLookups[repository] = group
137-
self.pendingLookupsLock.unlock()
138-
139-
// check if a repository already exists
140-
// errors when trying to check if a repository already exists are legitimate
141-
// and recoverable, and as such can be ignored
142-
if (try? self.provider.repositoryExists(at: repositoryPath)) ?? false {
143-
let result = Result<RepositoryHandle, Error>(catching: {
144-
// skip update if not needed
145-
if skipUpdate {
147+
// we must not block the calling thread (for concurrency control) so nesting this in a queue
148+
self.lookupQueue.async {
149+
// park the lookup thread based on the max concurrency allowed
150+
self.concurrencySemaphore.wait()
151+
152+
// check if a repository already exists
153+
// errors when trying to check if a repository already exists are legitimate
154+
// and recoverable, and as such can be ignored
155+
if (try? self.provider.repositoryExists(at: repositoryPath)) ?? false {
156+
// update if necessary and return early
157+
return completion(.init(catching: {
158+
// skip update if not needed
159+
if skipUpdate {
160+
return handle
161+
}
162+
// Update the repository when it is being looked up.
163+
let start = DispatchTime.now()
164+
delegateQueue.async {
165+
self.delegate?.willUpdate(package: package, repository: handle.repository)
166+
}
167+
let repository = try handle.open()
168+
try repository.fetch()
169+
let duration = start.distance(to: .now())
170+
delegateQueue.async {
171+
self.delegate?.didUpdate(package: package, repository: handle.repository, duration: duration)
172+
}
146173
return handle
147-
}
148-
// Update the repository when it is being looked up.
149-
let start = DispatchTime.now()
150-
delegateQueue.async {
151-
self.delegate?.willUpdate(package: package, repository: handle.repository)
152-
}
153-
let repository = try handle.open()
154-
try repository.fetch()
155-
let duration = start.distance(to: .now())
156-
delegateQueue.async {
157-
self.delegate?.didUpdate(package: package, repository: handle.repository, duration: duration)
158-
}
159-
return handle
160-
})
161-
162-
// remove the pending lookup
163-
self.pendingLookupsLock.lock()
164-
self.pendingLookups[repository]?.leave()
165-
self.pendingLookups[repository] = nil
166-
self.pendingLookupsLock.unlock()
167-
// and done
168-
return completion(result)
169-
}
174+
}))
175+
}
170176

171-
// perform the fetch
172-
// inform delegate that we are starting to fetch
173-
// calculate if cached (for delegate call) outside queue as it may change while queue is processing
174-
let isCached = self.cachePath.map{ self.fileSystem.exists($0.appending(handle.subpath)) } ?? false
175-
delegateQueue.async {
176-
let details = FetchDetails(fromCache: isCached, updatedCache: false)
177-
self.delegate?.willFetch(package: package, repository: handle.repository, details: details)
178-
}
177+
// inform delegate that we are starting to fetch
178+
// calculate if cached (for delegate call) outside queue as it may change while queue is processing
179+
let isCached = self.cachePath.map{ self.fileSystem.exists($0.appending(handle.subpath)) } ?? false
180+
delegateQueue.async {
181+
let details = FetchDetails(fromCache: isCached, updatedCache: false)
182+
self.delegate?.willFetch(package: package, repository: handle.repository, details: details)
183+
}
179184

180-
let start = DispatchTime.now()
181-
let lookupResult: Result<RepositoryHandle, Error>
182-
let delegateResult: Result<FetchDetails, Error>
185+
// perform the fetch
186+
let start = DispatchTime.now()
187+
let result = Result<FetchDetails, Error>(catching: {
188+
// make sure destination is free.
189+
try? self.fileSystem.removeFileTree(repositoryPath)
190+
// Fetch the repo.
191+
return try self.fetchAndPopulateCache(
192+
package: package,
193+
handle: handle,
194+
repositoryPath: repositoryPath,
195+
observabilityScope: observabilityScope,
196+
delegateQueue: delegateQueue
197+
)
198+
})
183199

184-
do {
185-
// make sure destination is free.
186-
try? self.fileSystem.removeFileTree(repositoryPath)
187-
// Fetch the repo.
188-
let details = try self.fetchAndPopulateCache(
189-
package: package,
190-
handle: handle,
191-
repositoryPath: repositoryPath,
192-
observabilityScope: observabilityScope,
193-
delegateQueue: delegateQueue
194-
)
195-
lookupResult = .success(handle)
196-
delegateResult = .success(details)
197-
} catch {
198-
lookupResult = .failure(error)
199-
delegateResult = .failure(error)
200-
}
200+
// Inform delegate fetch is done
201+
let duration = start.distance(to: .now())
202+
delegateQueue.async {
203+
self.delegate?.didFetch(package: package, repository: handle.repository, result: result, duration: duration)
204+
}
201205

202-
// Inform delegate.
203-
let duration = start.distance(to: .now())
204-
delegateQueue.async {
205-
self.delegate?.didFetch(package: package, repository: handle.repository, result: delegateResult, duration: duration)
206+
// and done
207+
completion(result.map{ _ in handle })
206208
}
207-
208-
// remove the pending lookup
209-
self.pendingLookupsLock.lock()
210-
self.pendingLookups[repository]?.leave()
211-
self.pendingLookups[repository] = nil
212-
self.pendingLookupsLock.unlock()
213-
// and done
214-
completion(lookupResult)
215209
}
216210

217211
/// Fetches the repository into the cache. If no `cachePath` is set or an error occurred fall back to fetching the repository without populating the cache.

0 commit comments

Comments
 (0)