@@ -34,7 +34,9 @@ public class RepositoryManager {
34
34
private let delegate : Delegate ?
35
35
36
36
/// DispatchSemaphore to restrict concurrent operations on manager.
37
- private let lookupSemaphore : DispatchSemaphore
37
+ private let concurrencySemaphore : DispatchSemaphore
38
+ /// OperationQueue to park pending lookups
39
+ private let lookupQueue : OperationQueue
38
40
39
41
/// The filesystem to operate on.
40
42
private let fileSystem : FileSystem
@@ -71,7 +73,12 @@ public class RepositoryManager {
71
73
self . provider = provider
72
74
self . delegate = delegate
73
75
74
- self . lookupSemaphore = DispatchSemaphore ( value: Swift . min ( 3 , Concurrency . maxOperations) )
76
+ // this queue and semaphore is used to limit the amount of concurrent git operations taking place
77
+ let maxOperations = min ( 3 , Concurrency . maxOperations)
78
+ self . lookupQueue = OperationQueue ( )
79
+ self . lookupQueue. name = " org.swift.swiftpm.repository-manager "
80
+ self . lookupQueue. maxConcurrentOperationCount = maxOperations
81
+ self . concurrencySemaphore = DispatchSemaphore ( value: maxOperations)
75
82
}
76
83
77
84
/// Get a handle to a repository.
@@ -98,77 +105,103 @@ public class RepositoryManager {
98
105
callbackQueue: DispatchQueue ,
99
106
completion: @escaping ( Result < RepositoryHandle , Error > ) -> Void
100
107
) {
101
- // wrap the callback in the requested queue
102
- let originalCompletion = completion
108
+ // wrap the callback in the requested queue and cleanup operations
103
109
let completion : ( Result < RepositoryHandle , Error > ) -> Void = { result in
104
- self . lookupSemaphore. signal ( )
105
- callbackQueue. async { originalCompletion ( result) }
110
+ // free concurrency control semaphore
111
+ self . concurrencySemaphore. signal ( )
112
+ // remove any pending lookup
113
+ self . pendingLookupsLock. lock ( )
114
+ self . pendingLookups [ repository] ? . leave ( )
115
+ self . pendingLookups [ repository] = nil
116
+ self . pendingLookupsLock. unlock ( )
117
+ // call back on the request queue
118
+ callbackQueue. async {
119
+ completion ( result)
120
+ }
106
121
}
107
122
108
- self . lookupSemaphore . wait ( )
109
- let relativePath = repository . storagePath ( )
110
- let repositoryPath = self . path . appending ( relativePath )
111
- let handle = RepositoryManager . RepositoryHandle ( manager : self , repository : repository , subpath : relativePath )
123
+ // we must not block the calling thread (for concurrency control) so nesting this in a queue
124
+ self . lookupQueue . addOperation {
125
+ // park the lookup thread based on the max concurrency allowed
126
+ self . concurrencySemaphore . wait ( )
112
127
113
- // check if there is a pending lookup
114
- self . pendingLookupsLock. lock ( )
115
- if let pendingLookup = self . pendingLookups [ repository] {
116
- self . pendingLookupsLock. unlock ( )
117
- // chain onto the pending lookup
118
- return pendingLookup. notify ( queue: callbackQueue) {
119
- // at this point the previous lookup should be complete and we can re-lookup
120
- self . lookup (
128
+ // check if there is a pending lookup
129
+ self . pendingLookupsLock. lock ( )
130
+ if let pendingLookup = self . pendingLookups [ repository] {
131
+ self . pendingLookupsLock. unlock ( )
132
+ // chain onto the pending lookup
133
+ return pendingLookup. notify ( queue: . sharedConcurrent) {
134
+ // at this point the previous lookup should be complete and we can re-lookup
135
+ completion ( . init( catching: {
136
+ try self . lookup (
137
+ package : package ,
138
+ repository: repository,
139
+ skipUpdate: skipUpdate,
140
+ observabilityScope: observabilityScope,
141
+ delegateQueue: delegateQueue
142
+ )
143
+ } ) )
144
+ }
145
+ } else {
146
+ // record the pending lookup
147
+ assert ( self . pendingLookups [ repository] == nil )
148
+ let group = DispatchGroup ( )
149
+ group. enter ( )
150
+ self . pendingLookups [ repository] = group
151
+ self . pendingLookupsLock. unlock ( )
152
+ }
153
+
154
+ completion ( . init( catching: {
155
+ try self . lookup (
121
156
package : package ,
122
157
repository: repository,
123
158
skipUpdate: skipUpdate,
124
159
observabilityScope: observabilityScope,
125
- delegateQueue: delegateQueue,
126
- callbackQueue: callbackQueue,
127
- completion: originalCompletion
160
+ delegateQueue: delegateQueue
128
161
)
129
- }
162
+ } ) )
130
163
}
164
+ }
131
165
132
- // record the pending lookup
133
- assert ( self . pendingLookups [ repository] == nil )
134
- let group = DispatchGroup ( )
135
- group. enter ( )
136
- self . pendingLookups [ repository] = group
137
- self . pendingLookupsLock. unlock ( )
166
+ // sync version of the lookup,
167
+ // this is here because it simplifies reading & maintaining the logical flow
168
+ // while the underlying git client is sync
169
+ // once we move to an async git client we would need to get rid of this
170
+ // sync func and roll the logic into the async version above
171
+ private func lookup(
172
+ package : PackageIdentity ,
173
+ repository: RepositorySpecifier ,
174
+ skipUpdate: Bool ,
175
+ observabilityScope: ObservabilityScope ,
176
+ delegateQueue: DispatchQueue
177
+ ) throws -> RepositoryHandle {
178
+ let relativePath = repository. storagePath ( )
179
+ let repositoryPath = self . path. appending ( relativePath)
180
+ let handle = RepositoryHandle ( manager: self , repository: repository, subpath: relativePath)
138
181
139
182
// check if a repository already exists
140
183
// errors when trying to check if a repository already exists are legitimate
141
184
// and recoverable, and as such can be ignored
142
185
if ( try ? self . provider. repositoryExists ( at: repositoryPath) ) ?? false {
143
- let result = Result < RepositoryHandle , Error > ( catching: {
144
- // skip update if not needed
145
- if skipUpdate {
146
- return handle
147
- }
148
- // Update the repository when it is being looked up.
149
- let start = DispatchTime . now ( )
150
- delegateQueue. async {
151
- self . delegate? . willUpdate ( package : package , repository: handle. repository)
152
- }
153
- let repository = try handle. open ( )
154
- try repository. fetch ( )
155
- let duration = start. distance ( to: . now( ) )
156
- delegateQueue. async {
157
- self . delegate? . didUpdate ( package : package , repository: handle. repository, duration: duration)
158
- }
186
+ // update if necessary and return early
187
+ // skip update if not needed
188
+ if skipUpdate {
159
189
return handle
160
- } )
161
-
162
- // remove the pending lookup
163
- self . pendingLookupsLock. lock ( )
164
- self . pendingLookups [ repository] ? . leave ( )
165
- self . pendingLookups [ repository] = nil
166
- self . pendingLookupsLock. unlock ( )
167
- // and done
168
- return completion ( result)
190
+ }
191
+ // Update the repository when it is being looked up.
192
+ let start = DispatchTime . now ( )
193
+ delegateQueue. async {
194
+ self . delegate? . willUpdate ( package : package , repository: handle. repository)
195
+ }
196
+ let repository = try handle. open ( )
197
+ try repository. fetch ( )
198
+ let duration = start. distance ( to: . now( ) )
199
+ delegateQueue. async {
200
+ self . delegate? . didUpdate ( package : package , repository: handle. repository, duration: duration)
201
+ }
202
+ return handle
169
203
}
170
204
171
- // perform the fetch
172
205
// inform delegate that we are starting to fetch
173
206
// calculate if cached (for delegate call) outside queue as it may change while queue is processing
174
207
let isCached = self . cachePath. map { self . fileSystem. exists ( $0. appending ( handle. subpath) ) } ?? false
@@ -177,41 +210,31 @@ public class RepositoryManager {
177
210
self . delegate? . willFetch ( package : package , repository: handle. repository, details: details)
178
211
}
179
212
213
+ // perform the fetch
180
214
let start = DispatchTime . now ( )
181
- let lookupResult : Result < RepositoryHandle , Error >
182
- let delegateResult : Result < FetchDetails , Error >
183
-
184
- do {
215
+ let fetchResult = Result < FetchDetails , Error > ( catching: {
185
216
// make sure destination is free.
186
217
try ? self . fileSystem. removeFileTree ( repositoryPath)
187
- // Fetch the repo.
188
- let details = try self . fetchAndPopulateCache (
218
+ // fetch the repo and cache the results
219
+ return try self . fetchAndPopulateCache (
189
220
package : package ,
190
221
handle: handle,
191
222
repositoryPath: repositoryPath,
192
223
observabilityScope: observabilityScope,
193
224
delegateQueue: delegateQueue
194
225
)
195
- lookupResult = . success( handle)
196
- delegateResult = . success( details)
197
- } catch {
198
- lookupResult = . failure( error)
199
- delegateResult = . failure( error)
200
- }
226
+ } )
201
227
202
- // Inform delegate.
228
+ // inform delegate fetch is done
203
229
let duration = start. distance ( to: . now( ) )
204
230
delegateQueue. async {
205
- self . delegate? . didFetch ( package : package , repository: handle. repository, result: delegateResult , duration: duration)
231
+ self . delegate? . didFetch ( package : package , repository: handle. repository, result: fetchResult , duration: duration)
206
232
}
207
233
208
- // remove the pending lookup
209
- self . pendingLookupsLock. lock ( )
210
- self . pendingLookups [ repository] ? . leave ( )
211
- self . pendingLookups [ repository] = nil
212
- self . pendingLookupsLock. unlock ( )
213
- // and done
214
- completion ( lookupResult)
234
+ // at this point we can throw, as we already notified the delegate above
235
+ _ = try fetchResult. get ( )
236
+
237
+ return handle
215
238
}
216
239
217
240
/// Fetches the repository into the cache. If no `cachePath` is set or an error occurred fall back to fetching the repository without populating the cache.
0 commit comments