Skip to content

Commit 4aaf471

Browse files
authored
Merge pull request #75878 from jamieQ/stream-termination
2 parents 4376692 + 3bea96a commit 4aaf471

File tree

2 files changed

+47
-17
lines changed

2 files changed

+47
-17
lines changed

stdlib/public/Concurrency/AsyncStreamBuffer.swift

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -212,25 +212,21 @@ extension AsyncStream {
212212
state.onTermination = nil
213213
state.terminal = true
214214

215-
if let continuation = state.continuations.first {
216-
if state.pending.count > 0 {
217-
state.continuations.removeFirst()
218-
let toSend = state.pending.removeFirst()
219-
unlock()
220-
handler?(.finished)
221-
continuation.resume(returning: toSend)
222-
} else if state.terminal {
223-
state.continuations.removeFirst()
224-
unlock()
225-
handler?(.finished)
226-
continuation.resume(returning: nil)
227-
} else {
228-
unlock()
229-
handler?(.finished)
230-
}
231-
} else {
215+
guard !state.continuations.isEmpty else {
232216
unlock()
233217
handler?(.finished)
218+
return
219+
}
220+
221+
// Hold on to the continuations to resume outside the lock.
222+
let continuations = state.continuations
223+
state.continuations.removeAll()
224+
225+
unlock()
226+
handler?(.finished)
227+
228+
for continuation in continuations {
229+
continuation.resume(returning: nil)
234230
}
235231
}
236232

test/Concurrency/Runtime/async_stream.swift

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,40 @@ class NotSendable {}
435435
expectTrue(expectation.fulfilled)
436436
}
437437

438+
// MARK: - Multiple consumers
439+
440+
tests.test("finish behavior with multiple consumers") {
441+
let (stream, continuation) = AsyncStream<Int>.makeStream()
442+
let (controlStream, controlContinuation) = AsyncStream<Int>.makeStream()
443+
var controlIterator = controlStream.makeAsyncIterator()
444+
445+
func makeConsumingTaskWithIndex(_ index: Int) -> Task<Void, Never> {
446+
Task { @MainActor in
447+
controlContinuation.yield(index)
448+
for await i in stream {
449+
controlContinuation.yield(i)
450+
}
451+
}
452+
}
453+
454+
// Set up multiple consumers
455+
let consumer1 = makeConsumingTaskWithIndex(1)
456+
expectEqual(await controlIterator.next(isolation: #isolation), 1)
457+
458+
let consumer2 = makeConsumingTaskWithIndex(2)
459+
expectEqual(await controlIterator.next(isolation: #isolation), 2)
460+
461+
// Ensure the iterators are suspended
462+
await MainActor.run {}
463+
464+
// Terminate the stream
465+
continuation.finish()
466+
467+
// Ensure the consuming Tasks both complete
468+
_ = await consumer1.value
469+
_ = await consumer2.value
470+
}
471+
438472
await runAllTestsAsync()
439473
}
440474
}

0 commit comments

Comments
 (0)