Skip to content

Commit e462d04

Browse files
authored
Make Executors have deterministic shutdown (#5547)
In particular: * All tasks are guaranteed to either run to completion or not run at all; * Executors can now be destroyed from the threads they own (if applicable); and * Many common concepts between the Executor implementations have been harmonized and shared. Note: * This change does not yet implement `Executor::Dispose` or clean up any of the higher level componentry to take advantage of these changes. * The executors implement shutdown by canceling any tasks that haven't started yet. This diverges from the plan (which called for a separate `UserCallbackExecutor` to handle this), but in the end it turns out to be simpler to just cancel everything and make cancel handle tasks-in-progress in a reasonable way. LMK if you have grave objections to this, in which case I can rework this. This is the foundation for addressing firebase/quickstart-unity#638.
1 parent 4dcf659 commit e462d04

24 files changed

+1911
-1002
lines changed

Firestore/Example/Firestore.xcodeproj/project.pbxproj

Lines changed: 28 additions & 0 deletions
Large diffs are not rendered by default.

Firestore/core/src/remote/grpc_stream.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ void GrpcStream::Shutdown() {
193193
if (!completions_.empty() && !is_grpc_call_finished_) {
194194
// Important: during normal operation, the stream always has a pending read
195195
// operation, so `Shutdown` would hang indefinitely if we didn't cancel the
196-
// `context_`. However, if the stream has already failed, avoid canceling
196+
// `context_`. However, if the stream has already failed, avoid cancelling
197197
// the context to avoid overwriting the status captured during the
198198
// `OnOperationFailed`.
199199

@@ -279,7 +279,7 @@ bool GrpcStream::TryLastWrite(grpc::ByteBuffer&& message) {
279279
// (both with and without network connection), and never more than several
280280
// dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if
281281
// there happen to be circumstances under which the write may block
282-
// indefinitely (in that case, rely on the fact that canceling a gRPC call
282+
// indefinitely (in that case, rely on the fact that cancelling a gRPC call
283283
// makes all pending operations come back from the queue quickly).
284284

285285
auto status = completion->WaitUntilOffQueue(std::chrono::milliseconds(500));

Firestore/core/src/util/async_queue.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <utility>
2020

2121
#include "Firestore/core/src/util/hard_assert.h"
22+
#include "Firestore/core/src/util/task.h"
2223
#include "absl/algorithm/container.h"
2324
#include "absl/memory/memory.h"
2425

@@ -119,8 +120,8 @@ DelayedOperation AsyncQueue::EnqueueAfterDelay(Milliseconds delay,
119120
delay = Milliseconds(0);
120121
}
121122

122-
Executor::TaggedOperation tagged{static_cast<int>(timer_id), Wrap(operation)};
123-
return executor_->Schedule(delay, std::move(tagged));
123+
auto tag = static_cast<Executor::Tag>(timer_id);
124+
return executor_->Schedule(delay, tag, Wrap(operation));
124125
}
125126

126127
AsyncQueue::Operation AsyncQueue::Wrap(const Operation& operation) {
@@ -149,7 +150,7 @@ void AsyncQueue::EnqueueBlocking(const Operation& operation) {
149150
}
150151

151152
bool AsyncQueue::IsScheduled(const TimerId timer_id) const {
152-
return executor_->IsScheduled(static_cast<int>(timer_id));
153+
return executor_->IsTagScheduled(static_cast<int>(timer_id));
153154
}
154155

155156
void AsyncQueue::RunScheduledOperationsUntil(const TimerId last_timer_id) {
@@ -162,10 +163,13 @@ void AsyncQueue::RunScheduledOperationsUntil(const TimerId last_timer_id) {
162163
"Attempted to run scheduled operations until missing timer id: %s",
163164
last_timer_id);
164165

165-
for (auto next = executor_->PopFromSchedule(); next.has_value();
166+
for (auto* next = executor_->PopFromSchedule(); next != nullptr;
166167
next = executor_->PopFromSchedule()) {
167-
next->operation();
168-
if (next->tag == static_cast<int>(last_timer_id)) {
168+
// `ExecuteAndRelease` can delete the `Task` so read the tag first.
169+
bool found_tag = next->tag() == static_cast<int>(last_timer_id);
170+
171+
next->ExecuteAndRelease();
172+
if (found_tag) {
169173
break;
170174
}
171175
}

Firestore/core/src/util/async_queue.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class AsyncQueue : public std::enable_shared_from_this<AsyncQueue> {
112112
// Like `Enqueue`, but also starts the shutdown process. Once the shutdown
113113
// process has started, calling any Enqueue* methods becomes a no-op
114114
//
115-
// The exception is `EnqueueEvenAfterShutdown`, operations requsted via
115+
// The exception is `EnqueueEvenAfterShutdown`, operations requested via
116116
// this will still be scheduled.
117117
void EnqueueAndInitiateShutdown(const Operation& operation);
118118

Firestore/core/src/util/config.h.in

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 Google
2+
* Copyright 2018 Google LLC
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,11 @@
3030
# define HAVE_ARC4RANDOM 1
3131
#endif
3232

33+
#cmakedefine HAVE_LIBDISPATCH 1
34+
#if COCOAPODS
35+
# define HAVE_LIBDISPATCH 1
36+
#endif
37+
3338
#cmakedefine HAVE_OPENSSL_RAND_H 1
3439

3540
#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_CONFIG_H_

Firestore/core/src/util/executor.h

Lines changed: 82 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 Google
2+
* Copyright 2018 Google LLC
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,46 +21,13 @@
2121
#include <functional>
2222
#include <memory>
2323
#include <string>
24-
#include <utility>
25-
26-
#include "absl/types/optional.h"
2724

2825
namespace firebase {
2926
namespace firestore {
3027
namespace util {
3128

32-
// A handle to an operation scheduled for future execution. The handle may
33-
// outlive the operation, but it *cannot* outlive the executor that created it.
34-
class DelayedOperation {
35-
public:
36-
// Creates an empty `DelayedOperation` not associated with any actual
37-
// operation. Calling `Cancel` on it is a no-op.
38-
DelayedOperation() {
39-
}
40-
41-
// Returns whether this `DelayedOperation` is associated with an actual
42-
// operation.
43-
explicit operator bool() const {
44-
return static_cast<bool>(cancel_func_);
45-
}
46-
47-
// If the operation has not been run yet, cancels the operation. Otherwise,
48-
// this function is a no-op.
49-
void Cancel() {
50-
if (cancel_func_) {
51-
cancel_func_();
52-
cancel_func_ = {};
53-
}
54-
}
55-
56-
// Internal use only.
57-
explicit DelayedOperation(std::function<void()>&& cancel_func)
58-
: cancel_func_{std::move(cancel_func)} {
59-
}
60-
61-
private:
62-
std::function<void()> cancel_func_;
63-
};
29+
class DelayedOperation;
30+
class Task;
6431

6532
// An interface to a platform-specific executor of asynchronous operations
6633
// (called tasks on other platforms).
@@ -71,25 +38,23 @@ class DelayedOperation {
7138
// The operations are executed sequentially; only a single operation is executed
7239
// at any given time.
7340
//
74-
// Delayed operations may be canceled if they have not already been run.
41+
// Delayed operations may be cancelled if they have not already been run.
7542
class Executor {
7643
public:
44+
// An opaque name for a kind of operation. All operations of the same type
45+
// should share a tag.
7746
using Tag = int;
47+
static constexpr Tag kNoTag = -1;
48+
49+
// An opaque, monotonically increasing identifier for each operation that does
50+
// not depend on its address. Whereas the `Tag` identifies the kind of
51+
// operation, the `Id` identifies the specific instance.
52+
using Id = uint32_t;
7853
using Operation = std::function<void()>;
79-
using Milliseconds = std::chrono::milliseconds;
8054

81-
// Operations scheduled for future execution have an opaque tag. The value of
82-
// the tag is ignored by the executor but can be used to find operations with
83-
// a given tag after they are scheduled.
84-
struct TaggedOperation {
85-
TaggedOperation() {
86-
}
87-
TaggedOperation(const Tag tag, Operation&& operation)
88-
: tag{tag}, operation{std::move(operation)} {
89-
}
90-
Tag tag = 0;
91-
Operation operation;
92-
};
55+
using Milliseconds = std::chrono::milliseconds;
56+
using Clock = std::chrono::steady_clock;
57+
using TimePoint = std::chrono::time_point<Clock, Milliseconds>;
9358

9459
// Creates a new serial Executor of the platform-appropriate type, and gives
9560
// it the given label, if the implementation supports it.
@@ -113,15 +78,20 @@ class Executor {
11378
// Like `Execute`, but blocks until the `operation` finishes, consequently
11479
// draining immediate operations from the executor.
11580
virtual void ExecuteBlocking(Operation&& operation) = 0;
81+
11682
// Scheduled the given `operation` to be executed after `delay` milliseconds
11783
// from now, and returns a handle that allows to cancel the operation
118-
// (provided it hasn't been run already). The operation is tagged to allow
119-
// retrieving it later.
84+
// (provided it hasn't been run already).
85+
//
86+
// Operations scheduled for future execution have an opaque tag. The value of
87+
// the tag is ignored by the executor but can be used to find operations with
88+
// a given tag after they are scheduled.
12089
//
12190
// `delay` must be non-negative; use `Execute` to schedule operations for
12291
// immediate execution.
12392
virtual DelayedOperation Schedule(Milliseconds delay,
124-
TaggedOperation&& operation) = 0;
93+
Tag tag,
94+
Operation&& operation) = 0;
12595

12696
// Checks for the caller whether it is being invoked by this executor.
12797
virtual bool IsCurrentExecutor() const = 0;
@@ -135,13 +105,66 @@ class Executor {
135105

136106
// Checks whether an operation tagged with the given `tag` is currently
137107
// scheduled for future execution.
138-
virtual bool IsScheduled(Tag tag) const = 0;
108+
virtual bool IsTagScheduled(Tag tag) const = 0;
109+
virtual bool IsIdScheduled(Id id) const = 0;
110+
139111
// Removes the nearest due scheduled operation from the schedule and returns
140-
// it to the caller. This function may be used to reschedule operations.
141-
// Immediate operations don't count; only operations scheduled for delayed
142-
// execution may be removed. If no such operations are currently scheduled, an
143-
// empty `optional` is returned.
144-
virtual absl::optional<TaggedOperation> PopFromSchedule() = 0;
112+
// it to the caller.
113+
//
114+
// Only operations scheduled for delayed execution can be removed with this
115+
// method; immediate operations don't count. If no such operations are
116+
// currently scheduled, `nullptr` is returned.
117+
//
118+
// The caller is responsible for either executing or cancelling (and
119+
// releasing) the returned Task.
120+
virtual Task* PopFromSchedule() = 0;
121+
122+
private:
123+
// Mark a task completed, removing it from any internal schedule or tracking.
124+
// Called by Task once it has completed execution.
125+
virtual void OnCompletion(Task* task) = 0;
126+
friend class Task;
127+
128+
// If the operation hasn't yet been run, it will be removed from the queue.
129+
// Otherwise, this function is a no-op.
130+
//
131+
// Called by `DelayedOperation` when its user calls `Cancel`. Implementations
132+
// of `Cancel` should also `Dispose` the underlying `Task` to actually prevent
133+
// execution.
134+
virtual void Cancel(Id operation_id) = 0;
135+
friend class DelayedOperation;
136+
};
137+
138+
// A handle to an operation scheduled for future execution. The handle may
139+
// outlive the operation, but it *cannot* outlive the executor that created it.
140+
class DelayedOperation {
141+
public:
142+
// Creates an empty `DelayedOperation` not associated with any actual
143+
// operation. Calling `Cancel` on it is a no-op.
144+
DelayedOperation() = default;
145+
146+
// Returns whether this `DelayedOperation` is associated with an actual
147+
// operation.
148+
explicit operator bool() const {
149+
return executor_ && executor_->IsIdScheduled(id_);
150+
}
151+
152+
// If the operation has not been run yet, cancels the operation. Otherwise,
153+
// this function is a no-op.
154+
void Cancel() {
155+
if (executor_) {
156+
executor_->Cancel(id_);
157+
}
158+
}
159+
160+
// Internal use only.
161+
explicit DelayedOperation(Executor* executor, Executor::Id id)
162+
: executor_(executor), id_(id) {
163+
}
164+
165+
private:
166+
Executor* executor_ = nullptr;
167+
Executor::Id id_ = 0;
145168
};
146169

147170
} // namespace util

0 commit comments

Comments
 (0)