Skip to content

Commit 851b47f

Browse files
committed
Concurrency: Factor atomic operations in Task.sleep() into a Sendable wrapper.
When building the Swift standard library, the compiler warns: ``` warning: capture of 'wordPtr' with non-sendable type 'UnsafeMutablePointer<Builtin.Word>' in a `@Sendable` closure ``` This diagnostic will become an error in the Swift 6 language mode, so it needs to be addressed. This PR factors the atomic operations used by the implementation of `Task.sleep()` into an `@unchecked Sendable` wrapper in order to convince the compiler that these operations are thread-safe. As an added benefit, the code is more readable when the atomic operatios are abstracted away. This refactor intentionally translates the existing implementation into a wrapper as faithfully as possible and does not attempt to improve on any other aspects of the implementation, such as the unsafe manual memory allocation and deallocation.
1 parent a59de54 commit 851b47f

File tree

2 files changed

+67
-69
lines changed

2 files changed

+67
-69
lines changed

stdlib/public/Concurrency/TaskSleep.swift

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -105,25 +105,55 @@ extension Task where Success == Never, Failure == Never {
105105
}
106106
}
107107

108+
/// A simple wrapper for a pointer to heap allocated storage of a `SleepState`
109+
/// value. This wrapper is `Sendable` because it facilitates atomic load and
110+
/// exchange operations on the underlying storage. However, this wrapper is also
111+
/// _unsafe_ because the owner must manually deallocate the token once it is no
112+
/// longer needed.
113+
struct UnsafeSleepStateToken: @unchecked Sendable {
114+
let wordPtr: UnsafeMutablePointer<Builtin.Word>
115+
116+
/// Allocates the underlying storage and sets the value to `.notStarted`.
117+
init() {
118+
wordPtr = .allocate(capacity: 1)
119+
Builtin.atomicstore_seqcst_Word(
120+
wordPtr._rawValue, SleepState.notStarted.word._builtinWordValue)
121+
}
122+
123+
/// Atomically loads the current state.
124+
func load() -> SleepState {
125+
return SleepState(word: Builtin.atomicload_seqcst_Word(wordPtr._rawValue))
126+
}
127+
128+
/// Attempts to atomically set the stored value to `desired` if the current
129+
/// value is equal to `expected`. Returns true if the exchange was successful.
130+
func exchange(expected: SleepState, desired: SleepState) -> Bool {
131+
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
132+
wordPtr._rawValue,
133+
expected.word._builtinWordValue,
134+
desired.word._builtinWordValue)
135+
return Bool(_builtinBooleanLiteral: won)
136+
}
137+
138+
/// Deallocates the underlying storage.
139+
func deallocate() {
140+
wordPtr.deallocate()
141+
}
142+
}
143+
108144
/// Called when the sleep(nanoseconds:) operation woke up without being
109145
/// canceled.
110-
static func onSleepWake(
111-
_ wordPtr: UnsafeMutablePointer<Builtin.Word>
112-
) {
146+
static func onSleepWake(_ token: UnsafeSleepStateToken) {
113147
while true {
114-
let state = SleepState(loading: wordPtr)
148+
let state = token.load()
115149
switch state {
116150
case .notStarted:
117151
fatalError("Cannot wake before we even started")
118152

119153
case .activeContinuation(let continuation):
120154
// We have an active continuation, so try to transition to the
121155
// "finished" state.
122-
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
123-
wordPtr._rawValue,
124-
state.word._builtinWordValue,
125-
SleepState.finished.word._builtinWordValue)
126-
if Bool(_builtinBooleanLiteral: won) {
156+
if token.exchange(expected: state, desired: .finished) {
127157
// The sleep finished, so invoke the continuation: we're done.
128158
continuation.resume()
129159
return
@@ -137,9 +167,9 @@ extension Task where Success == Never, Failure == Never {
137167

138168
case .cancelled:
139169
// The task was cancelled, which means the continuation was
140-
// called by the cancellation handler. We need to deallocate the flag
141-
// word, because it was left over for this task to complete.
142-
wordPtr.deallocate()
170+
// called by the cancellation handler. We need to deallocate the token
171+
// because it was left over for this task to complete.
172+
token.deallocate()
143173
return
144174

145175
case .cancelledBeforeStarted:
@@ -151,20 +181,14 @@ extension Task where Success == Never, Failure == Never {
151181

152182
/// Called when the sleep(nanoseconds:) operation has been canceled before
153183
/// the sleep completed.
154-
static func onSleepCancel(
155-
_ wordPtr: UnsafeMutablePointer<Builtin.Word>
156-
) {
184+
static func onSleepCancel(_ token: UnsafeSleepStateToken) {
157185
while true {
158-
let state = SleepState(loading: wordPtr)
186+
let state = token.load()
159187
switch state {
160188
case .notStarted:
161189
// We haven't started yet, so try to transition to the cancelled-before
162190
// started state.
163-
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
164-
wordPtr._rawValue,
165-
state.word._builtinWordValue,
166-
SleepState.cancelledBeforeStarted.word._builtinWordValue)
167-
if Bool(_builtinBooleanLiteral: won) {
191+
if token.exchange(expected: state, desired: .cancelledBeforeStarted) {
168192
return
169193
}
170194

@@ -174,11 +198,7 @@ extension Task where Success == Never, Failure == Never {
174198
case .activeContinuation(let continuation):
175199
// We have an active continuation, so try to transition to the
176200
// "cancelled" state.
177-
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
178-
wordPtr._rawValue,
179-
state.word._builtinWordValue,
180-
SleepState.cancelled.word._builtinWordValue)
181-
if Bool(_builtinBooleanLiteral: won) {
201+
if token.exchange(expected: state, desired: .cancelled) {
182202
// We recorded the task cancellation before the sleep finished, so
183203
// invoke the continuation with the cancellation error.
184204
continuation.resume(throwing: _Concurrency.CancellationError())
@@ -203,33 +223,22 @@ extension Task where Success == Never, Failure == Never {
203223
///
204224
/// This function doesn't block the underlying thread.
205225
public static func sleep(nanoseconds duration: UInt64) async throws {
206-
// Allocate storage for the storage word.
207-
let wordPtr = UnsafeMutablePointer<Builtin.Word>.allocate(capacity: 1)
208-
209-
// Initialize the flag word to "not started", which means the continuation
210-
// has neither been created nor completed.
211-
Builtin.atomicstore_seqcst_Word(
212-
wordPtr._rawValue, SleepState.notStarted.word._builtinWordValue)
226+
// Create a token which will initially have the value "not started", which
227+
// means the continuation has neither been created nor completed.
228+
let token = UnsafeSleepStateToken()
213229

214230
do {
215231
// Install a cancellation handler to resume the continuation by
216232
// throwing CancellationError.
217233
try await withTaskCancellationHandler {
218234
let _: () = try await withUnsafeThrowingContinuation { continuation in
219235
while true {
220-
let state = SleepState(loading: wordPtr)
236+
let state = token.load()
221237
switch state {
222238
case .notStarted:
223-
// The word that describes the active continuation state.
224-
let continuationWord =
225-
SleepState.activeContinuation(continuation).word
226-
227-
// Try to swap in the continuation word.
228-
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
229-
wordPtr._rawValue,
230-
state.word._builtinWordValue,
231-
continuationWord._builtinWordValue)
232-
if !Bool(_builtinBooleanLiteral: won) {
239+
// Try to swap in the continuation state.
240+
let newState = SleepState.activeContinuation(continuation)
241+
if !token.exchange(expected: state, desired: newState) {
233242
// Keep trying!
234243
continue
235244
}
@@ -243,7 +252,7 @@ extension Task where Success == Never, Failure == Never {
243252
addPendingGroupTaskUnconditionally: false,
244253
isDiscardingTask: false)
245254
let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) {
246-
onSleepWake(wordPtr)
255+
onSleepWake(token)
247256
}
248257
_enqueueJobGlobalWithDelay(
249258
duration, Builtin.convertTaskToJob(sleepTask))
@@ -264,12 +273,12 @@ extension Task where Success == Never, Failure == Never {
264273
}
265274
}
266275
} onCancel: {
267-
onSleepCancel(wordPtr)
276+
onSleepCancel(token)
268277
}
269278

270279
// Determine whether we got cancelled before we even started.
271280
let cancelledBeforeStarted: Bool
272-
switch SleepState(loading: wordPtr) {
281+
switch token.load() {
273282
case .notStarted, .activeContinuation, .cancelled:
274283
fatalError("Invalid state for non-cancelled sleep task")
275284

@@ -282,7 +291,7 @@ extension Task where Success == Never, Failure == Never {
282291

283292
// We got here without being cancelled, so deallocate the storage for
284293
// the flag word and continuation.
285-
wordPtr.deallocate()
294+
token.deallocate()
286295

287296
// If we got cancelled before we even started, through the cancellation
288297
// error now.

stdlib/public/Concurrency/TaskSleepDuration.swift

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,33 +21,22 @@ extension Task where Success == Never, Failure == Never {
2121
tolerance: Duration?,
2222
clock: _ClockID
2323
) async throws {
24-
// Allocate storage for the storage word.
25-
let wordPtr = UnsafeMutablePointer<Builtin.Word>.allocate(capacity: 1)
26-
27-
// Initialize the flag word to "not started", which means the continuation
28-
// has neither been created nor completed.
29-
Builtin.atomicstore_seqcst_Word(
30-
wordPtr._rawValue, SleepState.notStarted.word._builtinWordValue)
24+
// Create a token which will initially have the value "not started", which
25+
// means the continuation has neither been created nor completed.
26+
let token = UnsafeSleepStateToken()
3127

3228
do {
3329
// Install a cancellation handler to resume the continuation by
3430
// throwing CancellationError.
3531
try await withTaskCancellationHandler {
3632
let _: () = try await withUnsafeThrowingContinuation { continuation in
3733
while true {
38-
let state = SleepState(loading: wordPtr)
34+
let state = token.load()
3935
switch state {
4036
case .notStarted:
41-
// The word that describes the active continuation state.
42-
let continuationWord =
43-
SleepState.activeContinuation(continuation).word
44-
4537
// Try to swap in the continuation word.
46-
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
47-
wordPtr._rawValue,
48-
state.word._builtinWordValue,
49-
continuationWord._builtinWordValue)
50-
if !Bool(_builtinBooleanLiteral: won) {
38+
let newState = SleepState.activeContinuation(continuation)
39+
if !token.exchange(expected: state, desired: newState) {
5140
// Keep trying!
5241
continue
5342
}
@@ -61,7 +50,7 @@ extension Task where Success == Never, Failure == Never {
6150
addPendingGroupTaskUnconditionally: false,
6251
isDiscardingTask: false)
6352
let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) {
64-
onSleepWake(wordPtr)
53+
onSleepWake(token)
6554
}
6655
let toleranceSeconds: Int64
6756
let toleranceNanoseconds: Int64
@@ -94,12 +83,12 @@ extension Task where Success == Never, Failure == Never {
9483
}
9584
}
9685
} onCancel: {
97-
onSleepCancel(wordPtr)
86+
onSleepCancel(token)
9887
}
9988

10089
// Determine whether we got cancelled before we even started.
10190
let cancelledBeforeStarted: Bool
102-
switch SleepState(loading: wordPtr) {
91+
switch token.load() {
10392
case .notStarted, .activeContinuation, .cancelled:
10493
fatalError("Invalid state for non-cancelled sleep task")
10594

@@ -112,7 +101,7 @@ extension Task where Success == Never, Failure == Never {
112101

113102
// We got here without being cancelled, so deallocate the storage for
114103
// the flag word and continuation.
115-
wordPtr.deallocate()
104+
token.deallocate()
116105

117106
// If we got cancelled before we even started, through the cancellation
118107
// error now.

0 commit comments

Comments
 (0)