Skip to content

Commit b512488

Browse files
committed
Add a concurrentForEach method to start a concurrent task for each element in a collection
This is simpler to reason about than creating a `TaskGroup`.
1 parent 1425fee commit b512488

File tree

2 files changed

+19
-27
lines changed

2 files changed

+19
-27
lines changed

Sources/SemanticIndex/SemanticIndexManager.swift

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -328,13 +328,7 @@ package final actor SemanticIndexManager {
328328
}
329329

330330
private func waitForBuildGraphGenerationTasks() async {
331-
await withTaskGroup(of: Void.self) { taskGroup in
332-
for generateBuildGraphTask in scheduleIndexingTasks.values {
333-
taskGroup.addTask {
334-
await generateBuildGraphTask.value
335-
}
336-
}
337-
}
331+
await scheduleIndexingTasks.values.concurrentForEach { await $0.value }
338332
}
339333

340334
/// Wait for all in-progress index tasks to finish.
@@ -344,18 +338,13 @@ package final actor SemanticIndexManager {
344338
// can await the index tasks below.
345339
await waitForBuildGraphGenerationTasks()
346340

347-
await withTaskGroup(of: Void.self) { taskGroup in
348-
for (_, status) in inProgressIndexTasks {
349-
switch status {
350-
case .waitingForPreparation(preparationTaskID: _, indexTask: let indexTask),
351-
.preparing(preparationTaskID: _, indexTask: let indexTask),
352-
.updatingIndexStore(updateIndexStoreTask: _, indexTask: let indexTask):
353-
taskGroup.addTask {
354-
await indexTask.value
355-
}
356-
}
341+
await inProgressIndexTasks.concurrentForEach { (_, status) in
342+
switch status {
343+
case .waitingForPreparation(preparationTaskID: _, indexTask: let indexTask),
344+
.preparing(preparationTaskID: _, indexTask: let indexTask),
345+
.updatingIndexStore(updateIndexStoreTask: _, indexTask: let indexTask):
346+
await indexTask.value
357347
}
358-
await taskGroup.waitForAll()
359348
}
360349
index.pollForUnitChangesAndWait()
361350
logger.debug("Done waiting for up-to-date index")
@@ -757,17 +746,9 @@ package final actor SemanticIndexManager {
757746
}
758747
indexTasksWereScheduled(newIndexTasks)
759748
}
760-
let indexTasksImmutable = indexTasks
761749

762750
return Task(priority: priority) {
763-
await withTaskGroup(of: Void.self) { taskGroup in
764-
for indexTask in indexTasksImmutable {
765-
taskGroup.addTask {
766-
await indexTask.value
767-
}
768-
}
769-
await taskGroup.waitForAll()
770-
}
751+
await indexTasks.concurrentForEach { await $0.value }
771752
}
772753
}
773754
}

Sources/SwiftExtensions/AsyncUtils.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ extension Collection where Element: Sendable {
165165
count = indexedResults.count
166166
}
167167
}
168+
169+
/// Invoke `body` for every element in the collection and wait for all calls of `body` to finish
170+
package func concurrentForEach(_ body: @escaping @Sendable (Element) async -> Void) async {
171+
await withTaskGroup(of: Void.self) { taskGroup in
172+
for element in self {
173+
taskGroup.addTask {
174+
await body(element)
175+
}
176+
}
177+
}
178+
}
168179
}
169180

170181
package struct TimeoutError: Error, CustomStringConvertible {

0 commit comments

Comments
 (0)