Skip to content

Commit d08ed02

Browse files
dfedbachand
andauthored
Rename AsyncQueue -> FIFOQueue. Create ActorQueue (#3)
* Rename AsyncQueue -> FIFOQueue. Create ActorQueue * Create SemaphoreTests.swift to up coverage * Simplify * Get to 100% coverage * Enable sending synchronous tasks to the ActorQueue * Improve explanation of ActorQueue's ordering guarantee * The queues are the antecedent Co-authored-by: Michael Bachand <[email protected]> * Improve accuracy of ActorQueue description in README Co-authored-by: Michael Bachand <[email protected]> * ActorQueue does not need to be Sendable * Update comments * documentation copy/pasta fix * Disable sending synchronous tasks to the ActorQueue * Better explain how test_async_startsExecutionOfNextTaskAfterSuspension works + simplify test * Do not wait a runloop to test * further test cleanup * Write comments to explain the test * Make wait() return whether it suspended * Improve test_wait_suspendsUntilEqualNumberOfSignalCalls * Better comments * Better README documentation * Remove duplicative label 'queue' from README discussion * Eliminate race in test_wait_suspendsUntilEqualNumberOfSignalCalls() * Move ActorQueue example to a doc comment * Bump version and update README * Simplify ActorExecutor * Do not ship Semaphore. Use fewer 'await' calls in ActorExecutor * Add link to Swift bug report * Since we're renaming we should bump a major version (i.e. a minor version in beta semver) * Delete SemaphoreTests.swift since it is no longer required * Remove unnecessary '@mainactor' decoration on test * Add test_async_retainsReceiverUntilFlushed() to ActorQueueTests to match FIFOQueueTests * Better test name * Make explicit that queue is deallocated in tests * Update test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() in ActorQueueTests to use an ActorQueue * Accurate test method naming * Remove unnecessary await * Add swift tag to multi-line code examples Co-authored-by: Michael Bachand <[email protected]>
1 parent b09b217 commit d08ed02

File tree

8 files changed

+508
-74
lines changed

8 files changed

+508
-74
lines changed

AsyncQueue.podspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Pod::Spec.new do |s|
22
s.name = 'AsyncQueue'
3-
s.version = '0.0.1'
3+
s.version = '0.1.0'
44
s.license = 'MIT'
55
s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.'
66
s.homepage = 'https://github.com/dfed/swift-async-queue'

README.md

Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,89 @@
66
[![License](https://img.shields.io/cocoapods/l/AsyncQueue.svg)](https://cocoapods.org/pods/AsyncQueue)
77
[![Platform](https://img.shields.io/cocoapods/p/AsyncQueue.svg)](https://cocoapods.org/pods/AsyncQueue)
88

9-
A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts.
9+
A library of queues that enable sending ordered tasks from synchronous to asynchronous contexts.
1010

11-
## Usage
11+
## Task Ordering and Swift Concurrency
1212

13-
### Basic Initialization
13+
Tasks sent from a synchronous context to an asynchronous context in Swift Concurrency are inherently unordered. Consider the following test:
1414

1515
```swift
16-
let asyncQueue = AsyncQueue()
16+
@MainActor
17+
func test_mainActor_taskOrdering() async {
18+
var counter = 0
19+
var tasks = [Task<Void, Never>]()
20+
for iteration in 1...100 {
21+
tasks.append(Task {
22+
counter += 1
23+
XCTAssertEqual(counter, iteration) // often fails
24+
})
25+
}
26+
for task in tasks {
27+
_ = await task.value
28+
}
29+
}
1730
```
1831

19-
### Sending events from a synchronous context
32+
Despite the spawned `Task` inheriting the serial `@MainActor` execution context, the ordering of the scheduled asynchronous work is not guaranteed.
33+
34+
While [actors](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID645) are great at serializing tasks, there is no simple way in the standard Swift library to send ordered tasks to them from a synchronous context.
35+
36+
### Executing asynchronous tasks in FIFO order
37+
38+
Use a `FIFOQueue` to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued.
2039

2140
```swift
22-
asyncQueue.async { /* awaitable context that executes after all other enqueued work is completed */ }
41+
let queue = FIFOQueue()
42+
queue.async {
43+
/*
44+
`async` context that executes after all other enqueued work is completed.
45+
Work enqueued after this task will wait for this task to complete.
46+
*/
47+
try? await Task.sleep(nanoseconds: 1_000_000)
48+
}
49+
queue.async {
50+
/*
51+
This task begins execution once the above one-second sleep completes.
52+
*/
53+
}
54+
Task {
55+
await queue.await {
56+
/*
57+
`async` context that can return a value or throw an error.
58+
Executes after all other enqueued work is completed.
59+
Work enqueued after this task will wait for this task to complete.
60+
*/
61+
}
62+
}
2363
```
2464

25-
### Awaiting work from an asynchronous context
65+
### Sending ordered asynchronous tasks to Actors
66+
67+
Use an `ActorQueue` to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code.
2668

2769
```swift
28-
await asyncQueue.await { /* throw-able, return-able, awaitable context that executes after all other enqueued work is completed */ }
70+
let queue = ActorQueue()
71+
queue.async {
72+
/*
73+
`async` context that executes after all other enqueued work has begun executing.
74+
Work enqueued after this task will wait for this task to complete or suspend.
75+
*/
76+
try? await Task.sleep(nanoseconds: 1_000_000)
77+
}
78+
queue.async {
79+
/*
80+
This task begins execution once the above task suspends due to the one-second sleep.
81+
*/
82+
}
83+
Task {
84+
await queue.await {
85+
/*
86+
`async` context that can return a value or throw an error.
87+
Executes after all other enqueued work has begun executing.
88+
Work enqueued after this task will wait for this task to complete or suspend.
89+
*/
90+
}
91+
}
2992
```
3093

3194
## Requirements
@@ -45,7 +108,7 @@ To install swift-async-queue in your iOS project with [Swift Package Manager](ht
45108

46109
```swift
47110
dependencies: [
48-
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.0.1"),
111+
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.1.0"),
49112
]
50113
```
51114

@@ -55,7 +118,7 @@ To install swift-async-queue in your iOS project with [CocoaPods](http://cocoapo
55118

56119
```
57120
platform :ios, '13.0'
58-
pod 'AsyncQueue', '~> 0.1'
121+
pod 'AsyncQueue', '~> 0.1.0'
59122
```
60123

61124
## Contributing

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// MIT License
2+
//
3+
// Copyright (c) 2022 Dan Federman
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
/// A queue that executes asynchronous tasks enqueued from a nonisolated context.
24+
/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow subsequently enqueued tasks to begin executing.
25+
/// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends.
26+
///
27+
/// An `ActorQueue` is used to ensure tasks sent from a nonisolated context to a single `actor`'s isolated context begin execution in order.
28+
/// Here is an example of how an `ActorQueue` should be utilized within an `actor`:
29+
/// ```swift
30+
/// public actor LogStore {
31+
///
32+
/// nonisolated
33+
/// public func log(_ message: String) {
34+
/// queue.async {
35+
/// await self.append(message)
36+
/// }
37+
/// }
38+
///
39+
/// nonisolated
40+
/// public func retrieveLogs() async -> [String] {
41+
/// await queue.await { await self.logs }
42+
/// }
43+
///
44+
/// private func append(_ message: String) {
45+
/// logs.append(message)
46+
/// }
47+
///
48+
/// private let queue = ActorQueue()
49+
/// private var logs = [String]()
50+
/// }
51+
/// ```
52+
///
53+
/// - Warning: Execution order is not guaranteed unless the enqueued tasks interact with a single `actor` instance.
54+
public final class ActorQueue {
55+
56+
// MARK: Initialization
57+
58+
/// Instantiates an actor queue.
59+
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
60+
public init(priority: TaskPriority? = nil) {
61+
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
62+
let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in
63+
capturedTaskStreamContinuation = continuation
64+
}
65+
// Continuation will be captured during stream creation, so it is safe to force unwrap here.
66+
// If this force-unwrap fails, something is fundamentally broken in the Swift runtime.
67+
taskStreamContinuation = capturedTaskStreamContinuation!
68+
69+
Task.detached(priority: priority) {
70+
let executor = ActorExecutor()
71+
for await task in taskStream {
72+
await executor.suspendUntilStarted(task)
73+
}
74+
}
75+
}
76+
77+
deinit {
78+
taskStreamContinuation.finish()
79+
}
80+
81+
// MARK: Public
82+
83+
/// Schedules an asynchronous task for execution and immediately returns.
84+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
85+
/// - Parameter task: The task to enqueue.
86+
public func async(_ task: @escaping @Sendable () async -> Void) {
87+
taskStreamContinuation.yield(task)
88+
}
89+
90+
/// Schedules an asynchronous task and returns after the task is complete.
91+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
92+
/// - Parameter task: The task to enqueue.
93+
/// - Returns: The value returned from the enqueued task.
94+
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
95+
await withUnsafeContinuation { continuation in
96+
taskStreamContinuation.yield {
97+
continuation.resume(returning: await task())
98+
}
99+
}
100+
}
101+
102+
/// Schedules an asynchronous throwing task and returns after the task is complete.
103+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
104+
/// - Parameter task: The task to enqueue.
105+
/// - Returns: The value returned from the enqueued task.
106+
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
107+
try await withUnsafeThrowingContinuation { continuation in
108+
taskStreamContinuation.yield {
109+
do {
110+
continuation.resume(returning: try await task())
111+
} catch {
112+
continuation.resume(throwing: error)
113+
}
114+
}
115+
}
116+
}
117+
118+
// MARK: Private
119+
120+
private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation
121+
122+
// MARK: - ActorExecutor
123+
124+
private actor ActorExecutor {
125+
func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async {
126+
// Suspend the calling code until our enqueued task starts.
127+
await withUnsafeContinuation { continuation in
128+
// Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete.
129+
Task {
130+
// Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance.
131+
// This works around a bug when compiling with Xcode 14.1: https://github.com/apple/swift/issues/62503
132+
_ = void
133+
134+
// Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed.
135+
continuation.resume()
136+
await task()
137+
}
138+
}
139+
}
140+
141+
private let void: Void = ()
142+
}
143+
144+
}

Sources/AsyncQueue/AsyncQueue.swift renamed to Sources/AsyncQueue/FIFOQueue.swift

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121
// SOFTWARE.
2222

23-
/// A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts
24-
public final class AsyncQueue: Sendable {
23+
/// A queue that executes asynchronous tasks enqueued from a nonisolated context in FIFO order.
24+
/// Tasks are guaranteed to begin _and end_ executing in the order in which they are enqueued.
25+
/// Asynchronous tasks sent to this queue work as they would in a `DispatchQueue` type. Attempting to `await` this queue from a task executing on this queue will result in a deadlock.
26+
public final class FIFOQueue: Sendable {
2527

2628
// MARK: Initialization
2729

28-
/// Instantiates an asynchronous queue.
30+
/// Instantiates a FIFO queue.
2931
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
3032
public init(priority: TaskPriority? = nil) {
3133
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
@@ -56,7 +58,7 @@ public final class AsyncQueue: Sendable {
5658
taskStreamContinuation.yield(task)
5759
}
5860

59-
/// Schedules an asynchronous throwing task and returns after the task is complete.
61+
/// Schedules an asynchronous task and returns after the task is complete.
6062
/// The scheduled task will not execute until all prior tasks have completed.
6163
/// - Parameter task: The task to enqueue.
6264
/// - Returns: The value returned from the enqueued task.
@@ -68,7 +70,7 @@ public final class AsyncQueue: Sendable {
6870
}
6971
}
7072

71-
/// Schedules an asynchronous task and returns after the task is complete.
73+
/// Schedules an asynchronous throwing task and returns after the task is complete.
7274
/// The scheduled task will not execute until all prior tasks have completed.
7375
/// - Parameter task: The task to enqueue.
7476
/// - Returns: The value returned from the enqueued task.

0 commit comments

Comments
 (0)