Skip to content

Commit 6744fd6

Browse files
committed
Merge pull request #36 from ReactKit/feature/thread-safety
Change behavior of 1st-resume.
2 parents 70527c7 + ea0ba9b commit 6744fd6

File tree

3 files changed

+111
-97
lines changed

3 files changed

+111
-97
lines changed

SwiftTask/SwiftTask.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ public class TaskConfiguration
3232
public var cancel: (Void -> Void)?
3333

3434
/// useful to terminate immediate-infinite-sequence while performing `initClosure`
35-
public private(set) var isFinished: Bool = false
36-
37-
public init()
35+
public var isFinished : Bool
3836
{
39-
37+
return self._isFinished.rawValue
4038
}
4139

40+
private var _isFinished = _Atomic(false)
41+
4242
internal func finish()
4343
{
4444
//
@@ -53,7 +53,7 @@ public class TaskConfiguration
5353
self.pause = nil
5454
self.resume = nil
5555
self.cancel = nil
56-
self.isFinished = true
56+
self._isFinished.rawValue = true
5757
}
5858
}
5959

@@ -220,7 +220,7 @@ public class Task<Progress, Value, Error>: Cancellable, Printable
220220
self._initClosure = _initClosure
221221

222222
// will be invoked on 1st resume (only once)
223-
self._machine.initResumeClosure = { [weak self] in
223+
self._machine.initResumeClosure.rawValue = { [weak self] in
224224

225225
// strongify `self` on 1st resume
226226
if let self_ = self {

SwiftTask/_Atomic.swift

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,38 +10,69 @@ import Darwin
1010

1111
internal final class _Atomic<T>
1212
{
13-
private var spinlock = OS_SPINLOCK_INIT
13+
private var _spinlock = OS_SPINLOCK_INIT
1414
private var _rawValue: T
1515

1616
internal var rawValue: T
1717
{
1818
get {
19-
lock()
19+
self._lock()
2020
let rawValue = self._rawValue
21-
unlock()
21+
self._unlock()
2222

2323
return rawValue
2424
}
2525

2626
set(newValue) {
27-
lock()
27+
self._lock()
2828
self._rawValue = newValue
29-
unlock()
29+
self._unlock()
3030
}
3131
}
3232

33-
init(_ rawValue: T)
33+
internal init(_ rawValue: T)
3434
{
3535
self._rawValue = rawValue
3636
}
3737

38-
private func lock()
38+
internal func update(f: T -> T) -> T
3939
{
40-
withUnsafeMutablePointer(&self.spinlock, OSSpinLockLock)
40+
self._lock()
41+
let oldValue = self._rawValue
42+
self._rawValue = f(oldValue)
43+
self._unlock()
44+
45+
return oldValue
46+
}
47+
48+
internal func tryUpdate(f: T -> (T, Bool)) -> (T, Bool)
49+
{
50+
self._lock()
51+
let oldValue = self._rawValue
52+
let (newValue, shouldUpdate) = f(oldValue)
53+
if shouldUpdate {
54+
self._rawValue = newValue
55+
}
56+
self._unlock()
57+
58+
return (oldValue, shouldUpdate)
4159
}
4260

43-
private func unlock()
61+
private func _lock()
62+
{
63+
withUnsafeMutablePointer(&self._spinlock, OSSpinLockLock)
64+
}
65+
66+
private func _unlock()
67+
{
68+
withUnsafeMutablePointer(&self._spinlock, OSSpinLockUnlock)
69+
}
70+
}
71+
72+
extension _Atomic: Printable
73+
{
74+
internal var description: String
4475
{
45-
withUnsafeMutablePointer(&self.spinlock, OSSpinLockUnlock)
76+
return toString(self.rawValue)
4677
}
4778
}

SwiftTask/_StateMachine.swift

Lines changed: 64 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ internal class _StateMachine<Progress, Value, Error>
3030

3131
/// wrapper closure for `_initClosure` to invoke only once when started `.Running`,
3232
/// and will be set to `nil` afterward
33-
internal var initResumeClosure: (Void -> Void)?
33+
internal var initResumeClosure: _Atomic<(Void -> Void)?> = _Atomic(nil)
3434

3535
private lazy var _progressTupleHandlers = _Handlers<ProgressTupleHandler>()
3636
private lazy var _completionHandlers = _Handlers<Void -> Void>()
3737

38-
private let _recursiveLock = _RecursiveLock()
38+
private var _lock = _RecursiveLock()
3939

4040
internal init(weakified: Bool, paused: Bool)
4141
{
@@ -45,63 +45,63 @@ internal class _StateMachine<Progress, Value, Error>
4545

4646
internal func addProgressTupleHandler(inout token: _HandlerToken?, _ progressTupleHandler: ProgressTupleHandler) -> Bool
4747
{
48-
self._recursiveLock.lock()
48+
self._lock.lock()
4949
if self.state.rawValue == .Running || self.state.rawValue == .Paused {
5050
token = self._progressTupleHandlers.append(progressTupleHandler)
51-
self._recursiveLock.unlock()
51+
self._lock.unlock()
5252
return token != nil
5353
}
5454
else {
55-
self._recursiveLock.unlock()
55+
self._lock.unlock()
5656
return false
5757
}
5858
}
5959

6060
internal func removeProgressTupleHandler(handlerToken: _HandlerToken?) -> Bool
6161
{
62-
self._recursiveLock.lock()
62+
self._lock.lock()
6363
if let handlerToken = handlerToken {
6464
let removedHandler = self._progressTupleHandlers.remove(handlerToken)
65-
self._recursiveLock.unlock()
65+
self._lock.unlock()
6666
return removedHandler != nil
6767
}
6868
else {
69-
self._recursiveLock.unlock()
69+
self._lock.unlock()
7070
return false
7171
}
7272
}
7373

7474
internal func addCompletionHandler(inout token: _HandlerToken?, _ completionHandler: Void -> Void) -> Bool
7575
{
76-
self._recursiveLock.lock()
76+
self._lock.lock()
7777
if self.state.rawValue == .Running || self.state.rawValue == .Paused {
7878
token = self._completionHandlers.append(completionHandler)
79-
self._recursiveLock.unlock()
79+
self._lock.unlock()
8080
return token != nil
8181
}
8282
else {
83-
self._recursiveLock.unlock()
83+
self._lock.unlock()
8484
return false
8585
}
8686
}
8787

8888
internal func removeCompletionHandler(handlerToken: _HandlerToken?) -> Bool
8989
{
90-
self._recursiveLock.lock()
90+
self._lock.lock()
9191
if let handlerToken = handlerToken {
9292
let removedHandler = self._completionHandlers.remove(handlerToken)
93-
self._recursiveLock.unlock()
93+
self._lock.unlock()
9494
return removedHandler != nil
9595
}
9696
else {
97-
self._recursiveLock.unlock()
97+
self._lock.unlock()
9898
return false
9999
}
100100
}
101101

102102
internal func handleProgress(progress: Progress)
103103
{
104-
self._recursiveLock.lock()
104+
self._lock.lock()
105105
if self.state.rawValue == .Running {
106106

107107
let oldProgress = self.progress.rawValue
@@ -114,110 +114,93 @@ internal class _StateMachine<Progress, Value, Error>
114114
for handler in self._progressTupleHandlers {
115115
handler(oldProgress: oldProgress, newProgress: progress)
116116
}
117-
self._recursiveLock.unlock()
117+
self._lock.unlock()
118118
}
119119
else {
120-
self._recursiveLock.unlock()
120+
self._lock.unlock()
121121
}
122122
}
123123

124124
internal func handleFulfill(value: Value)
125125
{
126-
self._recursiveLock.lock()
127-
if self.state.rawValue == .Running {
128-
self.state.rawValue = .Fulfilled
126+
self._lock.lock()
127+
let (_, updated) = self.state.tryUpdate { $0 == .Running ? (.Fulfilled, true) : ($0, false) }
128+
if updated {
129129
self.value.rawValue = value
130130
self._finish()
131-
self._recursiveLock.unlock()
131+
self._lock.unlock()
132132
}
133133
else {
134-
self._recursiveLock.unlock()
134+
self._lock.unlock()
135135
}
136136
}
137137

138138
internal func handleRejectInfo(errorInfo: ErrorInfo)
139139
{
140-
self._recursiveLock.lock()
141-
if self.state.rawValue == .Running || self.state.rawValue == .Paused {
142-
self.state.rawValue = errorInfo.isCancelled ? .Cancelled : .Rejected
140+
self._lock.lock()
141+
let toState = errorInfo.isCancelled ? TaskState.Cancelled : .Rejected
142+
let (_, updated) = self.state.tryUpdate { $0 == .Running || $0 == .Paused ? (toState, true) : ($0, false) }
143+
if updated {
143144
self.errorInfo.rawValue = errorInfo
144145
self._finish()
145-
self._recursiveLock.unlock()
146+
self._lock.unlock()
146147
}
147148
else {
148-
self._recursiveLock.unlock()
149+
self._lock.unlock()
149150
}
150151
}
151152

152153
internal func handlePause() -> Bool
153154
{
154-
self._recursiveLock.lock()
155-
if self.state.rawValue == .Running {
155+
self._lock.lock()
156+
let (_, updated) = self.state.tryUpdate { $0 == .Running ? (.Paused, true) : ($0, false) }
157+
if updated {
156158
self.configuration.pause?()
157-
self.state.rawValue = .Paused
158-
self._recursiveLock.unlock()
159+
self._lock.unlock()
159160
return true
160161
}
161162
else {
162-
self._recursiveLock.unlock()
163+
self._lock.unlock()
163164
return false
164165
}
165166
}
166167

167168
internal func handleResume() -> Bool
168169
{
169-
//
170-
// NOTE:
171-
// `initResumeClosure` should be invoked first before `configure.resume()`
172-
// to let downstream prepare setting upstream's progress/fulfill/reject handlers
173-
// before upstream actually starts sending values, which often happens
174-
// when downstream's `configure.resume()` is configured to call upstream's `task.resume()`
175-
// which eventually calls upstream's `initResumeClosure`
176-
// and thus upstream starts sending values.
177-
//
178-
179-
self._recursiveLock.lock()
180-
181-
self._handleInitResumeIfNeeded()
182-
let resumed = _handleResume()
183-
184-
self._recursiveLock.unlock()
185-
186-
return resumed
187-
}
188-
189-
///
190-
/// Invokes `initResumeClosure` on 1st resume (only once).
191-
///
192-
/// If initial state is `.Paused`, `state` will be temporarily switched to `.Running`
193-
/// during `initResumeClosure` execution, so that Task can call progress/fulfill/reject handlers safely.
194-
///
195-
private func _handleInitResumeIfNeeded()
196-
{
197-
if (self.initResumeClosure != nil) {
170+
self._lock.lock()
171+
if let initResumeClosure = self.initResumeClosure.update({ _ in nil }) {
198172

199-
let isInitPaused = (self.state.rawValue == .Paused)
200-
if isInitPaused {
201-
self.state.rawValue = .Running // switch `.Paused` => `.Resume` temporarily without invoking `configure.resume()`
202-
}
173+
self.state.rawValue = .Running
174+
self._lock.unlock()
203175

204-
// NOTE: performing `initResumeClosure` might change `state` to `.Fulfilled` or `.Rejected` **immediately**
205-
self.initResumeClosure?()
206-
self.initResumeClosure = nil
176+
//
177+
// NOTE:
178+
// Don't use `_lock` here so that dispatch_async'ed `handleProgress` inside `initResumeClosure()`
179+
// will be safely called even when current thread goes into sleep.
180+
//
181+
initResumeClosure()
207182

208-
// switch back to `.Paused` if temporary `.Running` has not changed
209-
// so that consecutive `_handleResume()` can perform `configure.resume()`
210-
if isInitPaused && self.state.rawValue == .Running {
211-
self.state.rawValue = .Paused
212-
}
183+
//
184+
// Comment-Out:
185+
// Don't call `configuration.resume()` when lazy starting.
186+
// This prevents inapropriate starting of upstream in ReactKit.
187+
//
188+
//self.configuration.resume?()
189+
190+
return true
191+
}
192+
else {
193+
let resumed = _handleResume()
194+
self._lock.unlock()
195+
return resumed
213196
}
214197
}
215198

216199
private func _handleResume() -> Bool
217200
{
218-
if self.state.rawValue == .Paused {
201+
let (_, updated) = self.state.tryUpdate { $0 == .Paused ? (.Running, true) : ($0, false) }
202+
if updated {
219203
self.configuration.resume?()
220-
self.state.rawValue = .Running
221204
return true
222205
}
223206
else {
@@ -227,16 +210,16 @@ internal class _StateMachine<Progress, Value, Error>
227210

228211
internal func handleCancel(error: Error? = nil) -> Bool
229212
{
230-
self._recursiveLock.lock()
231-
if self.state.rawValue == .Running || self.state.rawValue == .Paused {
232-
self.state.rawValue = .Cancelled
213+
self._lock.lock()
214+
let (_, updated) = self.state.tryUpdate { $0 == .Running || $0 == .Paused ? (.Cancelled, true) : ($0, false) }
215+
if updated {
233216
self.errorInfo.rawValue = ErrorInfo(error: error, isCancelled: true)
234217
self._finish()
235-
self._recursiveLock.unlock()
218+
self._lock.unlock()
236219
return true
237220
}
238221
else {
239-
self._recursiveLock.unlock()
222+
self._lock.unlock()
240223
return false
241224
}
242225
}
@@ -252,7 +235,7 @@ internal class _StateMachine<Progress, Value, Error>
252235

253236
self.configuration.finish()
254237

255-
self.initResumeClosure = nil
238+
self.initResumeClosure.rawValue = nil
256239
self.progress.rawValue = nil
257240
}
258241
}

0 commit comments

Comments
 (0)