Skip to content

Commit 1e9c0dc

Browse files
authored
Add support for Task cancellation (#54)
1 parent fad6bf4 commit 1e9c0dc

File tree

7 files changed

+338
-86
lines changed

7 files changed

+338
-86
lines changed

AsyncQueue.podspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Pod::Spec.new do |s|
22
s.name = 'AsyncQueue'
3-
s.version = '0.7.0'
3+
s.version = '0.7.1'
44
s.license = 'MIT'
55
s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.'
66
s.homepage = 'https://github.com/dfed/swift-async-queue'

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -60,29 +60,12 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
6060
let (taskStream, taskStreamContinuation) = AsyncStream<ActorTask>.makeStream()
6161
self.taskStreamContinuation = taskStreamContinuation
6262

63-
func beginExecuting(
64-
_ operation: sending @escaping (isolated ActorType) async -> Void,
65-
in context: isolated ActorType,
66-
priority: TaskPriority?
67-
) {
68-
// In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
69-
// Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution.
70-
Task(priority: priority) {
71-
await operation(context)
72-
}
73-
}
74-
7563
Task {
7664
// In an ideal world, we would isolate this `for await` loop to the `ActorType`.
7765
// However, there's no good way to do that without retaining the actor and creating a cycle.
7866
for await actorTask in taskStream {
7967
// Await switching to the ActorType context.
80-
await beginExecuting(
81-
actorTask.task,
82-
in: actorTask.executionContext,
83-
priority: actorTask.priority
84-
)
85-
await actorTask.sempahore.signal()
68+
await actorTask.task(actorTask.executionContext)
8669
}
8770
}
8871
}
@@ -120,17 +103,13 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
120103
fileprivate struct ActorTask: Sendable {
121104
init(
122105
executionContext: ActorType,
123-
priority: TaskPriority?,
124106
task: @escaping @Sendable (isolated ActorType) async -> Void
125107
) {
126108
self.executionContext = executionContext
127-
self.priority = priority
128109
self.task = task
129110
}
130-
111+
131112
let executionContext: ActorType
132-
let sempahore = Semaphore()
133-
let priority: TaskPriority?
134113
let task: @Sendable (isolated ActorType) async -> Void
135114
}
136115

@@ -177,17 +156,25 @@ extension Task {
177156
operation: @Sendable @escaping (isolated ActorType) async -> Success
178157
) where Failure == Never {
179158
let delivery = Delivery<Success, Failure>()
159+
let semaphore = Semaphore()
180160
let task = ActorQueue<ActorType>.ActorTask(
181161
executionContext: actorQueue.executionContext,
182-
priority: priority,
183162
task: { executionContext in
184-
await delivery.sendValue(operation(executionContext))
163+
await semaphore.wait()
164+
delivery.execute({ @Sendable executionContext in
165+
await delivery.sendValue(operation(executionContext))
166+
}, in: executionContext, priority: priority)
185167
}
186168
)
187169
actorQueue.taskStreamContinuation.yield(task)
188170
self.init(priority: priority) {
189-
await task.sempahore.wait()
190-
return await delivery.getValue()
171+
await withTaskCancellationHandler(
172+
operation: {
173+
await semaphore.signal()
174+
return await delivery.getValue()
175+
},
176+
onCancel: delivery.cancel
177+
)
191178
}
192179
}
193180

@@ -224,22 +211,29 @@ extension Task {
224211
operation: @escaping @Sendable (isolated ActorType) async throws -> Success
225212
) where Failure == any Error {
226213
let delivery = Delivery<Success, Failure>()
214+
let semaphore = Semaphore()
227215
let task = ActorQueue<ActorType>.ActorTask(
228216
executionContext: actorQueue.executionContext,
229-
priority: priority,
230217
task: { executionContext in
231-
do {
232-
try await delivery.sendValue(operation(executionContext))
233-
} catch {
234-
await delivery.sendFailure(error)
235-
}
218+
await semaphore.wait()
219+
delivery.execute({ @Sendable executionContext in
220+
do {
221+
try await delivery.sendValue(operation(executionContext))
222+
} catch {
223+
await delivery.sendFailure(error)
224+
}
225+
}, in: executionContext, priority: priority)
236226
}
237227
)
238-
239228
actorQueue.taskStreamContinuation.yield(task)
240229
self.init(priority: priority) {
241-
await task.sempahore.wait()
242-
return try await delivery.getValue()
230+
try await withTaskCancellationHandler(
231+
operation: {
232+
await semaphore.signal()
233+
return try await delivery.getValue()
234+
},
235+
onCancel: delivery.cancel
236+
)
243237
}
244238
}
245239

@@ -276,17 +270,25 @@ extension Task {
276270
operation: @MainActor @escaping () async -> Success
277271
) where Failure == Never {
278272
let delivery = Delivery<Success, Failure>()
273+
let semaphore = Semaphore()
279274
let task = ActorQueue<MainActor>.ActorTask(
280275
executionContext: actorQueue.executionContext,
281-
priority: priority,
282276
task: { executionContext in
283-
await delivery.sendValue(operation())
277+
await semaphore.wait()
278+
delivery.execute({ @Sendable executionContext in
279+
await delivery.sendValue(operation())
280+
}, in: executionContext, priority: priority)
284281
}
285282
)
286283
actorQueue.taskStreamContinuation.yield(task)
287284
self.init(priority: priority) {
288-
await task.sempahore.wait()
289-
return await delivery.getValue()
285+
return await withTaskCancellationHandler(
286+
operation: {
287+
await semaphore.signal()
288+
return await delivery.getValue()
289+
},
290+
onCancel: delivery.cancel
291+
)
290292
}
291293
}
292294

@@ -323,22 +325,29 @@ extension Task {
323325
operation: @escaping @MainActor () async throws -> Success
324326
) where Failure == any Error {
325327
let delivery = Delivery<Success, Failure>()
328+
let semaphore = Semaphore()
326329
let task = ActorQueue<MainActor>.ActorTask(
327330
executionContext: actorQueue.executionContext,
328-
priority: priority,
329331
task: { executionContext in
330-
do {
331-
try await delivery.sendValue(operation())
332-
} catch {
333-
await delivery.sendFailure(error)
334-
}
332+
await semaphore.wait()
333+
delivery.execute({ @Sendable executionContext in
334+
do {
335+
try await delivery.sendValue(operation())
336+
} catch {
337+
await delivery.sendFailure(error)
338+
}
339+
}, in: executionContext, priority: priority)
335340
}
336341
)
337-
338342
actorQueue.taskStreamContinuation.yield(task)
339343
self.init(priority: priority) {
340-
await task.sempahore.wait()
341-
return try await delivery.getValue()
344+
try await withTaskCancellationHandler(
345+
operation: {
346+
await semaphore.signal()
347+
return try await delivery.getValue()
348+
},
349+
onCancel: delivery.cancel
350+
)
342351
}
343352
}
344353
}

Sources/AsyncQueue/FIFOQueue.swift

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public final class FIFOQueue: Sendable {
3636
Task.detached(priority: priority) {
3737
for await fifoTask in taskStream {
3838
await fifoTask.task()
39-
await fifoTask.sempahore.signal()
4039
}
4140
}
4241
}
@@ -51,8 +50,7 @@ public final class FIFOQueue: Sendable {
5150
init(task: @escaping @Sendable () async -> Void) {
5251
self.task = task
5352
}
54-
55-
let sempahore = Semaphore()
53+
5654
let task: @Sendable () async -> Void
5755
}
5856

@@ -90,14 +88,23 @@ extension Task {
9088
@_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success
9189
) where Failure == Never {
9290
let delivery = Delivery<Success, Failure>()
91+
let semaphore = Semaphore()
9392
let executeOnce = UnsafeClosureHolder(operation: operation)
9493
let task = FIFOQueue.FIFOTask {
95-
await delivery.sendValue(executeOnce.operation())
94+
await semaphore.wait()
95+
await delivery.execute({ @Sendable delivery in
96+
await delivery.sendValue(executeOnce.operation())
97+
}, in: delivery).value
9698
}
9799
fifoQueue.taskStreamContinuation.yield(task)
98100
self.init {
99-
await task.sempahore.wait()
100-
return await delivery.getValue()
101+
await withTaskCancellationHandler(
102+
operation: {
103+
await semaphore.signal()
104+
return await delivery.getValue()
105+
},
106+
onCancel: delivery.cancel
107+
)
101108
}
102109
}
103110

@@ -127,22 +134,31 @@ extension Task {
127134
/// - operation: The operation to perform.
128135
@discardableResult
129136
public init(
130-
on actorQueue: FIFOQueue,
137+
on fifoQueue: FIFOQueue,
131138
@_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success
132139
) where Failure == any Error {
133140
let delivery = Delivery<Success, Failure>()
141+
let semaphore = Semaphore()
134142
let executeOnce = UnsafeThrowingClosureHolder(operation: operation)
135143
let task = FIFOQueue.FIFOTask {
136-
do {
137-
try await delivery.sendValue(executeOnce.operation())
138-
} catch {
139-
await delivery.sendFailure(error)
140-
}
144+
await semaphore.wait()
145+
await delivery.execute({ @Sendable delivery in
146+
do {
147+
try await delivery.sendValue(executeOnce.operation())
148+
} catch {
149+
delivery.sendFailure(error)
150+
}
151+
}, in: delivery).value
141152
}
142-
actorQueue.taskStreamContinuation.yield(task)
153+
fifoQueue.taskStreamContinuation.yield(task)
143154
self.init {
144-
await task.sempahore.wait()
145-
return try await delivery.getValue()
155+
try await withTaskCancellationHandler(
156+
operation: {
157+
await semaphore.signal()
158+
return try await delivery.getValue()
159+
},
160+
onCancel: delivery.cancel
161+
)
146162
}
147163
}
148164

@@ -179,13 +195,22 @@ extension Task {
179195
operation: @Sendable @escaping (isolated ActorType) async -> Success
180196
) where Failure == Never {
181197
let delivery = Delivery<Success, Failure>()
198+
let semaphore = Semaphore()
182199
let task = FIFOQueue.FIFOTask {
183-
await delivery.sendValue(operation(isolatedActor))
200+
await semaphore.wait()
201+
await delivery.execute({ @Sendable isolatedActor in
202+
await delivery.sendValue(operation(isolatedActor))
203+
}, in: isolatedActor, priority: priority).value
184204
}
185205
fifoQueue.taskStreamContinuation.yield(task)
186206
self.init {
187-
await task.sempahore.wait()
188-
return await delivery.getValue()
207+
await withTaskCancellationHandler(
208+
operation: {
209+
await semaphore.signal()
210+
return await delivery.getValue()
211+
},
212+
onCancel: delivery.cancel
213+
)
189214
}
190215
}
191216

@@ -224,17 +249,26 @@ extension Task {
224249
operation: @Sendable @escaping (isolated ActorType) async throws -> Success
225250
) where Failure == any Error {
226251
let delivery = Delivery<Success, Failure>()
252+
let semaphore = Semaphore()
227253
let task = FIFOQueue.FIFOTask {
228-
do {
229-
try await delivery.sendValue(operation(isolatedActor))
230-
} catch {
231-
await delivery.sendFailure(error)
232-
}
254+
await semaphore.wait()
255+
await delivery.execute({ @Sendable isolatedActor in
256+
do {
257+
try await delivery.sendValue(operation(isolatedActor))
258+
} catch {
259+
await delivery.sendFailure(error)
260+
}
261+
}, in: isolatedActor, priority: priority).value
233262
}
234263
fifoQueue.taskStreamContinuation.yield(task)
235264
self.init(priority: priority) {
236-
await task.sempahore.wait()
237-
return try await delivery.getValue()
265+
try await withTaskCancellationHandler(
266+
operation: {
267+
await semaphore.signal()
268+
return try await delivery.getValue()
269+
},
270+
onCancel: delivery.cancel
271+
)
238272
}
239273
}
240274
}

0 commit comments

Comments
 (0)