Skip to content

Commit bf20e31

Browse files
committed
[lldb] Add timed callbacks to the MainLoop class
The motivating use case is being able to "time out" certain operations (by adding a timed callback which will force the termination of the loop), but the design is flexible enough to accomodate other use cases as well (e.g. running a periodic task in the background). The implementation builds on the existing "pending callback" mechanism, by associating a time point with each callback -- every time the loop wakes up, it runs all of the callbacks which are past their point, and it also makes sure to sleep only until the next callback is scheduled to run. I've done some renaming as names like "TriggerPendingCallbacks" were no longer accurate -- the function may no longer cause any callbacks to be called (it may just cause the main loop thread to recalculate the time it wants to sleep).
1 parent 222f6af commit bf20e31

File tree

7 files changed

+194
-68
lines changed

7 files changed

+194
-68
lines changed

lldb/include/lldb/Host/MainLoopBase.h

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
#include "lldb/Utility/Status.h"
1414
#include "llvm/ADT/DenseMap.h"
1515
#include "llvm/Support/ErrorHandling.h"
16+
#include <chrono>
1617
#include <functional>
1718
#include <mutex>
19+
#include <queue>
1820

1921
namespace lldb_private {
2022

@@ -38,6 +40,9 @@ class MainLoopBase {
3840
class ReadHandle;
3941

4042
public:
43+
using TimePoint = std::chrono::time_point<std::chrono::steady_clock,
44+
std::chrono::nanoseconds>;
45+
4146
MainLoopBase() : m_terminate_request(false) {}
4247
virtual ~MainLoopBase() = default;
4348

@@ -52,7 +57,18 @@ class MainLoopBase {
5257
// Add a pending callback that will be executed once after all the pending
5358
// events are processed. The callback will be executed even if termination
5459
// was requested.
55-
void AddPendingCallback(const Callback &callback);
60+
void AddPendingCallback(const Callback &callback) {
61+
AddCallback(callback, std::chrono::steady_clock::time_point());
62+
}
63+
64+
// Add a callback that will be executed after a certain amount of time has
65+
// passed.
66+
void AddCallback(const Callback &callback, std::chrono::nanoseconds delay) {
67+
AddCallback(callback, std::chrono::steady_clock::now() + delay);
68+
}
69+
70+
// Add a callback that will be executed after a given point in time.
71+
void AddCallback(const Callback &callback, TimePoint point);
5672

5773
// Waits for registered events and invoke the proper callbacks. Returns when
5874
// all callbacks deregister themselves or when someone requests termination.
@@ -69,14 +85,18 @@ class MainLoopBase {
6985

7086
virtual void UnregisterReadObject(IOObject::WaitableHandle handle) = 0;
7187

72-
// Interrupt the loop that is currently waiting for events and execute
73-
// the current pending callbacks immediately.
74-
virtual void TriggerPendingCallbacks() = 0;
88+
// Interrupt the loop that is currently waiting for events.
89+
virtual void Interrupt() = 0;
90+
91+
void ProcessCallbacks();
7592

76-
void ProcessPendingCallbacks();
93+
std::optional<TimePoint> GetNextWakeupTime();
7794

7895
std::mutex m_callback_mutex;
79-
std::vector<Callback> m_pending_callbacks;
96+
std::priority_queue<std::pair<TimePoint, Callback>,
97+
std::vector<std::pair<TimePoint, Callback>>,
98+
llvm::on_first<std::greater<TimePoint>>>
99+
m_callbacks;
80100
bool m_terminate_request : 1;
81101

82102
private:

lldb/include/lldb/Host/posix/MainLoopPosix.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class MainLoopPosix : public MainLoopBase {
5454
void UnregisterReadObject(IOObject::WaitableHandle handle) override;
5555
void UnregisterSignal(int signo, std::list<Callback>::iterator callback_it);
5656

57-
void TriggerPendingCallbacks() override;
57+
void Interrupt() override;
5858

5959
private:
6060
void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -88,8 +88,8 @@ class MainLoopPosix : public MainLoopBase {
8888

8989
llvm::DenseMap<IOObject::WaitableHandle, Callback> m_read_fds;
9090
llvm::DenseMap<int, SignalInfo> m_signals;
91-
Pipe m_trigger_pipe;
92-
std::atomic<bool> m_triggering;
91+
Pipe m_interrupt_pipe;
92+
std::atomic<bool> m_interrupting = false;
9393
#if HAVE_SYS_EVENT_H
9494
int m_kqueue;
9595
#endif

lldb/include/lldb/Host/windows/MainLoopWindows.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class MainLoopWindows : public MainLoopBase {
3434
protected:
3535
void UnregisterReadObject(IOObject::WaitableHandle handle) override;
3636

37-
void TriggerPendingCallbacks() override;
37+
void Interrupt() override;
3838

3939
private:
4040
void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -45,7 +45,7 @@ class MainLoopWindows : public MainLoopBase {
4545
Callback callback;
4646
};
4747
llvm::DenseMap<IOObject::WaitableHandle, FdInfo> m_read_fds;
48-
void *m_trigger_event;
48+
void *m_interrupt_event;
4949
};
5050

5151
} // namespace lldb_private

lldb/source/Host/common/MainLoopBase.cpp

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,43 @@
77
//===----------------------------------------------------------------------===//
88

99
#include "lldb/Host/MainLoopBase.h"
10+
#include <chrono>
1011

1112
using namespace lldb;
1213
using namespace lldb_private;
1314

14-
void MainLoopBase::AddPendingCallback(const Callback &callback) {
15+
void MainLoopBase::AddCallback(const Callback &callback, TimePoint point) {
16+
bool interrupt_needed;
1517
{
1618
std::lock_guard<std::mutex> lock{m_callback_mutex};
17-
m_pending_callbacks.push_back(callback);
19+
// We need to interrupt the main thread if this callback is scheduled to
20+
// execute at an earlier time than the earliest callback registered so far.
21+
interrupt_needed = m_callbacks.empty() || point < m_callbacks.top().first;
22+
m_callbacks.emplace(point, callback);
1823
}
19-
TriggerPendingCallbacks();
24+
if (interrupt_needed)
25+
Interrupt();
2026
}
2127

22-
void MainLoopBase::ProcessPendingCallbacks() {
23-
// Move the callbacks to a local vector to avoid keeping m_pending_callbacks
24-
// locked throughout the calls.
25-
std::vector<Callback> pending_callbacks;
26-
{
27-
std::lock_guard<std::mutex> lock{m_callback_mutex};
28-
pending_callbacks = std::move(m_pending_callbacks);
29-
}
28+
void MainLoopBase::ProcessCallbacks() {
29+
while (true) {
30+
Callback callback;
31+
{
32+
std::lock_guard<std::mutex> lock{m_callback_mutex};
33+
if (m_callbacks.empty() ||
34+
std::chrono::steady_clock::now() < m_callbacks.top().first)
35+
return;
36+
callback = std::move(m_callbacks.top().second);
37+
m_callbacks.pop();
38+
}
3039

31-
for (const Callback &callback : pending_callbacks)
3240
callback(*this);
41+
}
42+
}
43+
44+
std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() {
45+
std::lock_guard<std::mutex> lock(m_callback_mutex);
46+
if (m_callbacks.empty())
47+
return std::nullopt;
48+
return m_callbacks.top().first;
3349
}

lldb/source/Host/posix/MainLoopPosix.cpp

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <algorithm>
1616
#include <cassert>
1717
#include <cerrno>
18+
#include <chrono>
1819
#include <csignal>
1920
#include <ctime>
2021
#include <fcntl.h>
@@ -67,6 +68,30 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
6768
assert(bytes_written == 1 || (bytes_written == -1 && errno == EAGAIN));
6869
}
6970

71+
class ToTimeSpec {
72+
public:
73+
explicit ToTimeSpec(std::optional<MainLoopPosix::TimePoint> point) {
74+
using namespace std::chrono;
75+
76+
if (!point) {
77+
m_ts_ptr = nullptr;
78+
return;
79+
}
80+
nanoseconds dur = std::max(*point - steady_clock::now(), nanoseconds(0));
81+
m_ts_ptr = &m_ts;
82+
m_ts.tv_sec = duration_cast<seconds>(dur).count();
83+
m_ts.tv_nsec = (dur % seconds(1)).count();
84+
}
85+
ToTimeSpec(const ToTimeSpec &) = delete;
86+
ToTimeSpec &operator=(const ToTimeSpec &) = delete;
87+
88+
operator struct timespec *() { return m_ts_ptr; }
89+
90+
private:
91+
struct timespec m_ts;
92+
struct timespec *m_ts_ptr;
93+
};
94+
7095
class MainLoopPosix::RunImpl {
7196
public:
7297
RunImpl(MainLoopPosix &loop);
@@ -99,8 +124,9 @@ Status MainLoopPosix::RunImpl::Poll() {
99124
for (auto &fd : loop.m_read_fds)
100125
EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);
101126

102-
num_events = kevent(loop.m_kqueue, in_events.data(), in_events.size(),
103-
out_events, std::size(out_events), nullptr);
127+
num_events =
128+
kevent(loop.m_kqueue, in_events.data(), in_events.size(), out_events,
129+
std::size(out_events), ToTimeSpec(loop.GetNextWakeupTime()));
104130

105131
if (num_events < 0) {
106132
if (errno == EINTR) {
@@ -144,7 +170,7 @@ Status MainLoopPosix::RunImpl::Poll() {
144170
}
145171

146172
if (ppoll(read_fds.data(), read_fds.size(),
147-
/*timeout=*/nullptr,
173+
ToTimeSpec(loop.GetNextWakeupTime()),
148174
/*sigmask=*/nullptr) == -1 &&
149175
errno != EINTR)
150176
return Status(errno, eErrorTypePOSIX);
@@ -165,27 +191,28 @@ void MainLoopPosix::RunImpl::ProcessReadEvents() {
165191
}
166192
#endif
167193

168-
MainLoopPosix::MainLoopPosix() : m_triggering(false) {
169-
Status error = m_trigger_pipe.CreateNew(/*child_process_inherit=*/false);
194+
MainLoopPosix::MainLoopPosix() {
195+
Status error = m_interrupt_pipe.CreateNew(/*child_process_inherit=*/false);
170196
assert(error.Success());
171197

172198
// Make the write end of the pipe non-blocking.
173-
int result = fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_SETFL,
174-
fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_GETFL) |
199+
int result = fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_SETFL,
200+
fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_GETFL) |
175201
O_NONBLOCK);
176202
assert(result == 0);
177203
UNUSED_IF_ASSERT_DISABLED(result);
178204

179-
const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor();
180-
m_read_fds.insert({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) {
181-
char c;
182-
ssize_t bytes_read = llvm::sys::RetryAfterSignal(
183-
-1, ::read, trigger_pipe_fd, &c, 1);
184-
assert(bytes_read == 1);
185-
UNUSED_IF_ASSERT_DISABLED(bytes_read);
186-
// NB: This implicitly causes another loop iteration
187-
// and therefore the execution of pending callbacks.
188-
}});
205+
const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor();
206+
m_read_fds.insert(
207+
{interrupt_pipe_fd, [interrupt_pipe_fd](MainLoopBase &loop) {
208+
char c;
209+
ssize_t bytes_read =
210+
llvm::sys::RetryAfterSignal(-1, ::read, interrupt_pipe_fd, &c, 1);
211+
assert(bytes_read == 1);
212+
UNUSED_IF_ASSERT_DISABLED(bytes_read);
213+
// NB: This implicitly causes another loop iteration
214+
// and therefore the execution of pending callbacks.
215+
}});
189216
#if HAVE_SYS_EVENT_H
190217
m_kqueue = kqueue();
191218
assert(m_kqueue >= 0);
@@ -196,8 +223,8 @@ MainLoopPosix::~MainLoopPosix() {
196223
#if HAVE_SYS_EVENT_H
197224
close(m_kqueue);
198225
#endif
199-
m_read_fds.erase(m_trigger_pipe.GetReadFileDescriptor());
200-
m_trigger_pipe.Close();
226+
m_read_fds.erase(m_interrupt_pipe.GetReadFileDescriptor());
227+
m_interrupt_pipe.Close();
201228
assert(m_read_fds.size() == 0);
202229
assert(m_signals.size() == 0);
203230
}
@@ -244,11 +271,9 @@ MainLoopPosix::RegisterSignal(int signo, const Callback &callback,
244271
sigset_t old_set;
245272

246273
// Set signal info before installing the signal handler!
247-
g_signal_info[signo].pipe_fd = m_trigger_pipe.GetWriteFileDescriptor();
274+
g_signal_info[signo].pipe_fd = m_interrupt_pipe.GetWriteFileDescriptor();
248275
g_signal_info[signo].flag = 0;
249276

250-
// Even if using kqueue, the signal handler will still be invoked, so it's
251-
// important to replace it with our "benign" handler.
252277
int ret = sigaction(signo, &new_action, &info.old_action);
253278
UNUSED_IF_ASSERT_DISABLED(ret);
254279
assert(ret == 0 && "sigaction failed");
@@ -307,8 +332,8 @@ Status MainLoopPosix::Run() {
307332

308333
ProcessSignals();
309334

310-
m_triggering = false;
311-
ProcessPendingCallbacks();
335+
m_interrupting = false;
336+
ProcessCallbacks();
312337
}
313338
return Status();
314339
}
@@ -346,13 +371,13 @@ void MainLoopPosix::ProcessSignal(int signo) {
346371
}
347372
}
348373

349-
void MainLoopPosix::TriggerPendingCallbacks() {
350-
if (m_triggering.exchange(true))
374+
void MainLoopPosix::Interrupt() {
375+
if (m_interrupting.exchange(true))
351376
return;
352377

353378
char c = '.';
354379
size_t bytes_written;
355-
Status error = m_trigger_pipe.Write(&c, 1, bytes_written);
380+
Status error = m_interrupt_pipe.Write(&c, 1, bytes_written);
356381
assert(error.Success());
357382
UNUSED_IF_ASSERT_DISABLED(error);
358383
assert(bytes_written == 1);

lldb/source/Host/windows/MainLoopWindows.cpp

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,24 @@
2121
using namespace lldb;
2222
using namespace lldb_private;
2323

24+
static DWORD ToTimeout(std::optional<MainLoopWindows::TimePoint> point) {
25+
using namespace std::chrono;
26+
27+
if (!point)
28+
return WSA_INFINITE;
29+
30+
nanoseconds dur = (std::max)(*point - steady_clock::now(), nanoseconds(0));
31+
return duration_cast<milliseconds>(dur).count();
32+
}
33+
2434
MainLoopWindows::MainLoopWindows() {
25-
m_trigger_event = WSACreateEvent();
26-
assert(m_trigger_event != WSA_INVALID_EVENT);
35+
m_interrupt_event = WSACreateEvent();
36+
assert(m_interrupt_event != WSA_INVALID_EVENT);
2737
}
2838

2939
MainLoopWindows::~MainLoopWindows() {
3040
assert(m_read_fds.empty());
31-
BOOL result = WSACloseEvent(m_trigger_event);
41+
BOOL result = WSACloseEvent(m_interrupt_event);
3242
assert(result == TRUE);
3343
UNUSED_IF_ASSERT_DISABLED(result);
3444
}
@@ -43,20 +53,25 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
4353

4454
events.push_back(info.event);
4555
}
46-
events.push_back(m_trigger_event);
56+
events.push_back(m_interrupt_event);
4757

48-
DWORD result = WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
49-
WSA_INFINITE, FALSE);
58+
DWORD result =
59+
WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
60+
ToTimeout(GetNextWakeupTime()), FALSE);
5061

5162
for (auto &fd : m_read_fds) {
5263
int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0);
5364
assert(result == 0);
5465
UNUSED_IF_ASSERT_DISABLED(result);
5566
}
5667

57-
if (result >= WSA_WAIT_EVENT_0 && result <= WSA_WAIT_EVENT_0 + events.size())
68+
if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size())
5869
return result - WSA_WAIT_EVENT_0;
5970

71+
// A timeout is treated as a (premature) signalization of the interrupt event.
72+
if (result == WSA_WAIT_TIMEOUT)
73+
return events.size() - 1;
74+
6075
return llvm::createStringError(llvm::inconvertibleErrorCode(),
6176
"WSAWaitForMultipleEvents failed");
6277
}
@@ -127,13 +142,11 @@ Status MainLoopWindows::Run() {
127142
ProcessReadObject(KV.first);
128143
} else {
129144
assert(*signaled_event == m_read_fds.size());
130-
WSAResetEvent(m_trigger_event);
145+
WSAResetEvent(m_interrupt_event);
131146
}
132-
ProcessPendingCallbacks();
147+
ProcessCallbacks();
133148
}
134149
return Status();
135150
}
136151

137-
void MainLoopWindows::TriggerPendingCallbacks() {
138-
WSASetEvent(m_trigger_event);
139-
}
152+
void MainLoopWindows::Interrupt() { WSASetEvent(m_interrupt_event); }

0 commit comments

Comments
 (0)