@@ -15,6 +15,7 @@ import Dispatch
15
15
import class Foundation. NSLock
16
16
import class Foundation. ProcessInfo
17
17
import struct Foundation. URL
18
+ import struct Foundation. UUID
18
19
import func TSCBasic. tsc_await
19
20
20
21
public enum Concurrency {
@@ -76,3 +77,203 @@ extension DispatchQueue {
76
77
}
77
78
}
78
79
}
80
+
81
+ /// A queue for running async operations with a limit on the number of concurrent tasks.
82
+ public final class AsyncOperationQueue : @unchecked Sendable {
83
+
84
+ // This implementation is identical to the AsyncOperationQueue in swift-build.
85
+ // Any modifications made here should also be made there.
86
+ // https://github.com/swiftlang/swift-build/blob/main/Sources/SWBUtil/AsyncOperationQueue.swift#L13
87
+
88
+ fileprivate typealias ID = UUID
89
+ fileprivate typealias WaitingContinuation = CheckedContinuation < Void , any Error >
90
+
91
+ private let concurrentTasks : Int
92
+ private var waitingTasks : [ WorkTask ] = [ ]
93
+ private let waitingTasksLock = NSLock ( )
94
+
95
+ fileprivate enum WorkTask {
96
+ case creating( ID )
97
+ case waiting( ID , WaitingContinuation )
98
+ case running( ID )
99
+ case cancelled( ID )
100
+
101
+ var id : ID {
102
+ switch self {
103
+ case . creating( let id) , . waiting( let id, _) , . running( let id) , . cancelled( let id) :
104
+ return id
105
+ }
106
+ }
107
+
108
+ var continuation : WaitingContinuation ? {
109
+ guard case . waiting( _, let continuation) = self else {
110
+ return nil
111
+ }
112
+ return continuation
113
+ }
114
+ }
115
+
116
+ /// Creates an `AsyncOperationQueue` with a specified number of concurrent tasks.
117
+ /// - Parameter concurrentTasks: The maximum number of concurrent tasks that can be executed concurrently.
118
+ public init ( concurrentTasks: Int ) {
119
+ self . concurrentTasks = concurrentTasks
120
+ }
121
+
122
+ deinit {
123
+ waitingTasksLock. withLock {
124
+ if !waitingTasks. isEmpty {
125
+ preconditionFailure ( " Deallocated with waiting tasks " )
126
+ }
127
+ }
128
+ }
129
+
130
+ /// Executes an asynchronous operation, ensuring that the number of concurrent tasks
131
+ // does not exceed the specified limit.
132
+ /// - Parameter operation: The asynchronous operation to execute.
133
+ /// - Returns: The result of the operation.
134
+ /// - Throws: An error thrown by the operation, or a `CancellationError` if the operation is cancelled.
135
+ public func withOperation< ReturnValue> (
136
+ _ operation: ( ) async throws -> sending ReturnValue
137
+ ) async throws -> ReturnValue {
138
+ let taskId = try await waitIfNeeded ( )
139
+ defer { signalCompletion ( taskId) }
140
+ return try await operation ( )
141
+ }
142
+
143
+ private func waitIfNeeded( ) async throws -> ID {
144
+ let workTask = waitingTasksLock. withLock ( {
145
+ let shouldWait = waitingTasks. count >= concurrentTasks
146
+ let workTask = shouldWait ? WorkTask . creating ( ID ( ) ) : . running( ID ( ) )
147
+ waitingTasks. append ( workTask)
148
+ return workTask
149
+ } )
150
+
151
+ // If we aren't creating a task that needs to wait, we're under the concurrency limit.
152
+ guard case . creating( let taskId) = workTask else {
153
+ return workTask. id
154
+ }
155
+
156
+ enum TaskAction {
157
+ case start( WaitingContinuation )
158
+ case cancel( WaitingContinuation )
159
+ }
160
+
161
+ try await withTaskCancellationHandler {
162
+ try await withCheckedThrowingContinuation { ( continuation: WaitingContinuation ) -> Void in
163
+ let action : TaskAction ? = waitingTasksLock. withLock {
164
+ guard let index = waitingTasks. firstIndex ( where: { $0. id == taskId } ) else {
165
+ // The task may have been marked as cancelled already and then removed from
166
+ // waitingTasks in `signalCompletion`.
167
+ return . cancel( continuation)
168
+ }
169
+
170
+ switch waitingTasks [ index] {
171
+ case . cancelled:
172
+ // If the task was cancelled in between creating the task cancellation handler and acquiring the lock,
173
+ // we should resume the continuation with a `CancellationError`.
174
+ waitingTasks. remove ( at: index)
175
+ return . cancel( continuation)
176
+ case . creating, . running, . waiting:
177
+ // A task may have completed since we initially checked if we should wait. Check again in this locked
178
+ // section and if we can start it, remove it from the waiting tasks and start it immediately.
179
+ if waitingTasks. count >= concurrentTasks {
180
+ waitingTasks [ index] = . waiting( taskId, continuation)
181
+ return nil
182
+ } else {
183
+ waitingTasks. remove ( at: index)
184
+ return . start( continuation)
185
+ }
186
+ }
187
+ }
188
+
189
+ switch action {
190
+ case . some( . cancel( let continuation) ) :
191
+ continuation. resume ( throwing: _Concurrency. CancellationError ( ) )
192
+ case . some( . start( let continuation) ) :
193
+ continuation. resume ( )
194
+ case . none:
195
+ return
196
+ }
197
+ }
198
+ } onCancel: {
199
+ let continuation : WaitingContinuation ? = self . waitingTasksLock. withLock {
200
+ guard let taskIndex = self . waitingTasks. firstIndex ( where: { $0. id == taskId } ) else {
201
+ return nil
202
+ }
203
+
204
+ switch self . waitingTasks [ taskIndex] {
205
+ case . waiting( _, let continuation) :
206
+ self . waitingTasks. remove ( at: taskIndex)
207
+
208
+ // If the parent task is cancelled then we need to manually handle resuming the
209
+ // continuation for the waiting task with a `CancellationError`. Return the continuation
210
+ // here so it can be resumed once the `waitingTasksLock` is released.
211
+ return continuation
212
+ case . creating, . running:
213
+ // If the task was still being created, mark it as cancelled in `waitingTasks` so that
214
+ // the handler for `withCheckedThrowingContinuation` can immediately cancel it.
215
+ self . waitingTasks [ taskIndex] = . cancelled( taskId)
216
+ return nil
217
+ case . cancelled:
218
+ preconditionFailure ( " Attempting to cancel a task that was already cancelled " )
219
+ }
220
+ }
221
+
222
+ continuation? . resume ( throwing: _Concurrency. CancellationError ( ) )
223
+ }
224
+ return workTask. id
225
+ }
226
+
227
+ private func signalCompletion( _ taskId: ID ) {
228
+ let continuationToResume = waitingTasksLock. withLock { ( ) -> WaitingContinuation ? in
229
+ guard !waitingTasks. isEmpty else {
230
+ return nil
231
+ }
232
+
233
+ // Remove the completed task from the list to decrement the active task count.
234
+ if let taskIndex = self . waitingTasks. firstIndex ( where: { $0. id == taskId } ) {
235
+ waitingTasks. remove ( at: taskIndex)
236
+ }
237
+
238
+ // We cannot remove elements from `waitingTasks` while iterating over it, so we make
239
+ // a pass to collect operations and then apply them after the loop.
240
+ func createTaskListOperations( ) -> ( CollectionDifference < WorkTask > ? , WaitingContinuation ? ) {
241
+ var changes : [ CollectionDifference < WorkTask > . Change ] = [ ]
242
+ for (index, task) in waitingTasks. enumerated ( ) {
243
+ switch task {
244
+ case . running:
245
+ // Skip tasks that are already running, looking for the first one that is waiting or creating.
246
+ continue
247
+ case . creating:
248
+ // If the next task is in the process of being created, let the
249
+ // creation code in the `withCheckedThrowingContinuation` in `waitIfNeeded`
250
+ // handle starting the task.
251
+ break
252
+ case . waiting:
253
+ // Begin the next waiting task
254
+ changes. append ( . remove( offset: index, element: task, associatedWith: nil ) )
255
+ return ( CollectionDifference < WorkTask > ( changes) , task. continuation)
256
+ case . cancelled:
257
+ // If the next task is cancelled, continue removing cancelled
258
+ // tasks until we find one that hasn't run yet, or we exaust the list of waiting tasks.
259
+ changes. append ( . remove( offset: index, element: task, associatedWith: nil ) )
260
+ continue
261
+ }
262
+ }
263
+ return ( CollectionDifference < WorkTask > ( changes) , nil )
264
+ }
265
+
266
+ let ( collectionOperations, continuation) = createTaskListOperations ( )
267
+ if let operations = collectionOperations {
268
+ guard let appliedDiff = waitingTasks. applying ( operations) else {
269
+ preconditionFailure ( " Failed to apply changes to waiting tasks " )
270
+ }
271
+ waitingTasks = appliedDiff
272
+ }
273
+
274
+ return continuation
275
+ }
276
+
277
+ continuationToResume? . resume ( )
278
+ }
279
+ }
0 commit comments