Skip to content

Commit 55068dc

Browse files
authored
[lldb] Add timed callbacks to the MainLoop class (#112895)
The motivating use case is being able to "time out" certain operations (by adding a timed callback which will force the termination of the loop), but the design is flexible enough to accomodate other use cases as well (e.g. running a periodic task in the background). The implementation builds on the existing "pending callback" mechanism, by associating a time point with each callback -- every time the loop wakes up, it runs all of the callbacks which are past their point, and it also makes sure to sleep only until the next callback is scheduled to run. I've done some renaming as names like "TriggerPendingCallbacks" were no longer accurate -- the function may no longer cause any callbacks to be called (it may just cause the main loop thread to recalculate the time it wants to sleep).
1 parent 456e609 commit 55068dc

File tree

7 files changed

+194
-68
lines changed

7 files changed

+194
-68
lines changed

lldb/include/lldb/Host/MainLoopBase.h

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
#include "lldb/Utility/Status.h"
1414
#include "llvm/ADT/DenseMap.h"
1515
#include "llvm/Support/ErrorHandling.h"
16+
#include <chrono>
1617
#include <functional>
1718
#include <mutex>
19+
#include <queue>
1820

1921
namespace lldb_private {
2022

@@ -38,6 +40,9 @@ class MainLoopBase {
3840
class ReadHandle;
3941

4042
public:
43+
using TimePoint = std::chrono::time_point<std::chrono::steady_clock,
44+
std::chrono::nanoseconds>;
45+
4146
MainLoopBase() : m_terminate_request(false) {}
4247
virtual ~MainLoopBase() = default;
4348

@@ -52,7 +57,18 @@ class MainLoopBase {
5257
// Add a pending callback that will be executed once after all the pending
5358
// events are processed. The callback will be executed even if termination
5459
// was requested.
55-
void AddPendingCallback(const Callback &callback);
60+
void AddPendingCallback(const Callback &callback) {
61+
AddCallback(callback, std::chrono::steady_clock::time_point());
62+
}
63+
64+
// Add a callback that will be executed after a certain amount of time has
65+
// passed.
66+
void AddCallback(const Callback &callback, std::chrono::nanoseconds delay) {
67+
AddCallback(callback, std::chrono::steady_clock::now() + delay);
68+
}
69+
70+
// Add a callback that will be executed after a given point in time.
71+
void AddCallback(const Callback &callback, TimePoint point);
5672

5773
// Waits for registered events and invoke the proper callbacks. Returns when
5874
// all callbacks deregister themselves or when someone requests termination.
@@ -69,14 +85,18 @@ class MainLoopBase {
6985

7086
virtual void UnregisterReadObject(IOObject::WaitableHandle handle) = 0;
7187

72-
// Interrupt the loop that is currently waiting for events and execute
73-
// the current pending callbacks immediately.
74-
virtual void TriggerPendingCallbacks() = 0;
88+
// Interrupt the loop that is currently waiting for events.
89+
virtual void Interrupt() = 0;
90+
91+
void ProcessCallbacks();
7592

76-
void ProcessPendingCallbacks();
93+
std::optional<TimePoint> GetNextWakeupTime();
7794

7895
std::mutex m_callback_mutex;
79-
std::vector<Callback> m_pending_callbacks;
96+
std::priority_queue<std::pair<TimePoint, Callback>,
97+
std::vector<std::pair<TimePoint, Callback>>,
98+
llvm::on_first<std::greater<TimePoint>>>
99+
m_callbacks;
80100
bool m_terminate_request : 1;
81101

82102
private:

lldb/include/lldb/Host/posix/MainLoopPosix.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class MainLoopPosix : public MainLoopBase {
5454
void UnregisterReadObject(IOObject::WaitableHandle handle) override;
5555
void UnregisterSignal(int signo, std::list<Callback>::iterator callback_it);
5656

57-
void TriggerPendingCallbacks() override;
57+
void Interrupt() override;
5858

5959
private:
6060
void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -88,8 +88,8 @@ class MainLoopPosix : public MainLoopBase {
8888

8989
llvm::DenseMap<IOObject::WaitableHandle, Callback> m_read_fds;
9090
llvm::DenseMap<int, SignalInfo> m_signals;
91-
Pipe m_trigger_pipe;
92-
std::atomic<bool> m_triggering;
91+
Pipe m_interrupt_pipe;
92+
std::atomic<bool> m_interrupting = false;
9393
#if HAVE_SYS_EVENT_H
9494
int m_kqueue;
9595
#endif

lldb/include/lldb/Host/windows/MainLoopWindows.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class MainLoopWindows : public MainLoopBase {
3434
protected:
3535
void UnregisterReadObject(IOObject::WaitableHandle handle) override;
3636

37-
void TriggerPendingCallbacks() override;
37+
void Interrupt() override;
3838

3939
private:
4040
void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -45,7 +45,7 @@ class MainLoopWindows : public MainLoopBase {
4545
Callback callback;
4646
};
4747
llvm::DenseMap<IOObject::WaitableHandle, FdInfo> m_read_fds;
48-
void *m_trigger_event;
48+
void *m_interrupt_event;
4949
};
5050

5151
} // namespace lldb_private

lldb/source/Host/common/MainLoopBase.cpp

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,43 @@
77
//===----------------------------------------------------------------------===//
88

99
#include "lldb/Host/MainLoopBase.h"
10+
#include <chrono>
1011

1112
using namespace lldb;
1213
using namespace lldb_private;
1314

14-
void MainLoopBase::AddPendingCallback(const Callback &callback) {
15+
void MainLoopBase::AddCallback(const Callback &callback, TimePoint point) {
16+
bool interrupt_needed;
1517
{
1618
std::lock_guard<std::mutex> lock{m_callback_mutex};
17-
m_pending_callbacks.push_back(callback);
19+
// We need to interrupt the main thread if this callback is scheduled to
20+
// execute at an earlier time than the earliest callback registered so far.
21+
interrupt_needed = m_callbacks.empty() || point < m_callbacks.top().first;
22+
m_callbacks.emplace(point, callback);
1823
}
19-
TriggerPendingCallbacks();
24+
if (interrupt_needed)
25+
Interrupt();
2026
}
2127

22-
void MainLoopBase::ProcessPendingCallbacks() {
23-
// Move the callbacks to a local vector to avoid keeping m_pending_callbacks
24-
// locked throughout the calls.
25-
std::vector<Callback> pending_callbacks;
26-
{
27-
std::lock_guard<std::mutex> lock{m_callback_mutex};
28-
pending_callbacks = std::move(m_pending_callbacks);
29-
}
28+
void MainLoopBase::ProcessCallbacks() {
29+
while (true) {
30+
Callback callback;
31+
{
32+
std::lock_guard<std::mutex> lock{m_callback_mutex};
33+
if (m_callbacks.empty() ||
34+
std::chrono::steady_clock::now() < m_callbacks.top().first)
35+
return;
36+
callback = std::move(m_callbacks.top().second);
37+
m_callbacks.pop();
38+
}
3039

31-
for (const Callback &callback : pending_callbacks)
3240
callback(*this);
41+
}
42+
}
43+
44+
std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() {
45+
std::lock_guard<std::mutex> lock(m_callback_mutex);
46+
if (m_callbacks.empty())
47+
return std::nullopt;
48+
return m_callbacks.top().first;
3349
}

lldb/source/Host/posix/MainLoopPosix.cpp

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <algorithm>
1616
#include <cassert>
1717
#include <cerrno>
18+
#include <chrono>
1819
#include <csignal>
1920
#include <ctime>
2021
#include <fcntl.h>
@@ -68,6 +69,30 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
6869
(void)bytes_written;
6970
}
7071

72+
class ToTimeSpec {
73+
public:
74+
explicit ToTimeSpec(std::optional<MainLoopPosix::TimePoint> point) {
75+
using namespace std::chrono;
76+
77+
if (!point) {
78+
m_ts_ptr = nullptr;
79+
return;
80+
}
81+
nanoseconds dur = std::max(*point - steady_clock::now(), nanoseconds(0));
82+
m_ts_ptr = &m_ts;
83+
m_ts.tv_sec = duration_cast<seconds>(dur).count();
84+
m_ts.tv_nsec = (dur % seconds(1)).count();
85+
}
86+
ToTimeSpec(const ToTimeSpec &) = delete;
87+
ToTimeSpec &operator=(const ToTimeSpec &) = delete;
88+
89+
operator struct timespec *() { return m_ts_ptr; }
90+
91+
private:
92+
struct timespec m_ts;
93+
struct timespec *m_ts_ptr;
94+
};
95+
7196
class MainLoopPosix::RunImpl {
7297
public:
7398
RunImpl(MainLoopPosix &loop);
@@ -100,8 +125,9 @@ Status MainLoopPosix::RunImpl::Poll() {
100125
for (auto &fd : loop.m_read_fds)
101126
EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);
102127

103-
num_events = kevent(loop.m_kqueue, in_events.data(), in_events.size(),
104-
out_events, std::size(out_events), nullptr);
128+
num_events =
129+
kevent(loop.m_kqueue, in_events.data(), in_events.size(), out_events,
130+
std::size(out_events), ToTimeSpec(loop.GetNextWakeupTime()));
105131

106132
if (num_events < 0) {
107133
if (errno == EINTR) {
@@ -145,7 +171,7 @@ Status MainLoopPosix::RunImpl::Poll() {
145171
}
146172

147173
if (ppoll(read_fds.data(), read_fds.size(),
148-
/*timeout=*/nullptr,
174+
ToTimeSpec(loop.GetNextWakeupTime()),
149175
/*sigmask=*/nullptr) == -1 &&
150176
errno != EINTR)
151177
return Status(errno, eErrorTypePOSIX);
@@ -166,27 +192,28 @@ void MainLoopPosix::RunImpl::ProcessReadEvents() {
166192
}
167193
#endif
168194

169-
MainLoopPosix::MainLoopPosix() : m_triggering(false) {
170-
Status error = m_trigger_pipe.CreateNew(/*child_process_inherit=*/false);
195+
MainLoopPosix::MainLoopPosix() {
196+
Status error = m_interrupt_pipe.CreateNew(/*child_process_inherit=*/false);
171197
assert(error.Success());
172198

173199
// Make the write end of the pipe non-blocking.
174-
int result = fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_SETFL,
175-
fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_GETFL) |
200+
int result = fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_SETFL,
201+
fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_GETFL) |
176202
O_NONBLOCK);
177203
assert(result == 0);
178204
UNUSED_IF_ASSERT_DISABLED(result);
179205

180-
const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor();
181-
m_read_fds.insert({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) {
182-
char c;
183-
ssize_t bytes_read = llvm::sys::RetryAfterSignal(
184-
-1, ::read, trigger_pipe_fd, &c, 1);
185-
assert(bytes_read == 1);
186-
UNUSED_IF_ASSERT_DISABLED(bytes_read);
187-
// NB: This implicitly causes another loop iteration
188-
// and therefore the execution of pending callbacks.
189-
}});
206+
const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor();
207+
m_read_fds.insert(
208+
{interrupt_pipe_fd, [interrupt_pipe_fd](MainLoopBase &loop) {
209+
char c;
210+
ssize_t bytes_read =
211+
llvm::sys::RetryAfterSignal(-1, ::read, interrupt_pipe_fd, &c, 1);
212+
assert(bytes_read == 1);
213+
UNUSED_IF_ASSERT_DISABLED(bytes_read);
214+
// NB: This implicitly causes another loop iteration
215+
// and therefore the execution of pending callbacks.
216+
}});
190217
#if HAVE_SYS_EVENT_H
191218
m_kqueue = kqueue();
192219
assert(m_kqueue >= 0);
@@ -197,8 +224,8 @@ MainLoopPosix::~MainLoopPosix() {
197224
#if HAVE_SYS_EVENT_H
198225
close(m_kqueue);
199226
#endif
200-
m_read_fds.erase(m_trigger_pipe.GetReadFileDescriptor());
201-
m_trigger_pipe.Close();
227+
m_read_fds.erase(m_interrupt_pipe.GetReadFileDescriptor());
228+
m_interrupt_pipe.Close();
202229
assert(m_read_fds.size() == 0);
203230
assert(m_signals.size() == 0);
204231
}
@@ -245,11 +272,9 @@ MainLoopPosix::RegisterSignal(int signo, const Callback &callback,
245272
sigset_t old_set;
246273

247274
// Set signal info before installing the signal handler!
248-
g_signal_info[signo].pipe_fd = m_trigger_pipe.GetWriteFileDescriptor();
275+
g_signal_info[signo].pipe_fd = m_interrupt_pipe.GetWriteFileDescriptor();
249276
g_signal_info[signo].flag = 0;
250277

251-
// Even if using kqueue, the signal handler will still be invoked, so it's
252-
// important to replace it with our "benign" handler.
253278
int ret = sigaction(signo, &new_action, &info.old_action);
254279
UNUSED_IF_ASSERT_DISABLED(ret);
255280
assert(ret == 0 && "sigaction failed");
@@ -308,8 +333,8 @@ Status MainLoopPosix::Run() {
308333

309334
ProcessSignals();
310335

311-
m_triggering = false;
312-
ProcessPendingCallbacks();
336+
m_interrupting = false;
337+
ProcessCallbacks();
313338
}
314339
return Status();
315340
}
@@ -347,13 +372,13 @@ void MainLoopPosix::ProcessSignal(int signo) {
347372
}
348373
}
349374

350-
void MainLoopPosix::TriggerPendingCallbacks() {
351-
if (m_triggering.exchange(true))
375+
void MainLoopPosix::Interrupt() {
376+
if (m_interrupting.exchange(true))
352377
return;
353378

354379
char c = '.';
355380
size_t bytes_written;
356-
Status error = m_trigger_pipe.Write(&c, 1, bytes_written);
381+
Status error = m_interrupt_pipe.Write(&c, 1, bytes_written);
357382
assert(error.Success());
358383
UNUSED_IF_ASSERT_DISABLED(error);
359384
assert(bytes_written == 1);

lldb/source/Host/windows/MainLoopWindows.cpp

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,24 @@
2121
using namespace lldb;
2222
using namespace lldb_private;
2323

24+
static DWORD ToTimeout(std::optional<MainLoopWindows::TimePoint> point) {
25+
using namespace std::chrono;
26+
27+
if (!point)
28+
return WSA_INFINITE;
29+
30+
nanoseconds dur = (std::max)(*point - steady_clock::now(), nanoseconds(0));
31+
return duration_cast<milliseconds>(dur).count();
32+
}
33+
2434
MainLoopWindows::MainLoopWindows() {
25-
m_trigger_event = WSACreateEvent();
26-
assert(m_trigger_event != WSA_INVALID_EVENT);
35+
m_interrupt_event = WSACreateEvent();
36+
assert(m_interrupt_event != WSA_INVALID_EVENT);
2737
}
2838

2939
MainLoopWindows::~MainLoopWindows() {
3040
assert(m_read_fds.empty());
31-
BOOL result = WSACloseEvent(m_trigger_event);
41+
BOOL result = WSACloseEvent(m_interrupt_event);
3242
assert(result == TRUE);
3343
UNUSED_IF_ASSERT_DISABLED(result);
3444
}
@@ -43,20 +53,25 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
4353

4454
events.push_back(info.event);
4555
}
46-
events.push_back(m_trigger_event);
56+
events.push_back(m_interrupt_event);
4757

48-
DWORD result = WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
49-
WSA_INFINITE, FALSE);
58+
DWORD result =
59+
WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
60+
ToTimeout(GetNextWakeupTime()), FALSE);
5061

5162
for (auto &fd : m_read_fds) {
5263
int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0);
5364
assert(result == 0);
5465
UNUSED_IF_ASSERT_DISABLED(result);
5566
}
5667

57-
if (result >= WSA_WAIT_EVENT_0 && result <= WSA_WAIT_EVENT_0 + events.size())
68+
if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size())
5869
return result - WSA_WAIT_EVENT_0;
5970

71+
// A timeout is treated as a (premature) signalization of the interrupt event.
72+
if (result == WSA_WAIT_TIMEOUT)
73+
return events.size() - 1;
74+
6075
return llvm::createStringError(llvm::inconvertibleErrorCode(),
6176
"WSAWaitForMultipleEvents failed");
6277
}
@@ -127,13 +142,11 @@ Status MainLoopWindows::Run() {
127142
ProcessReadObject(KV.first);
128143
} else {
129144
assert(*signaled_event == m_read_fds.size());
130-
WSAResetEvent(m_trigger_event);
145+
WSAResetEvent(m_interrupt_event);
131146
}
132-
ProcessPendingCallbacks();
147+
ProcessCallbacks();
133148
}
134149
return Status();
135150
}
136151

137-
void MainLoopWindows::TriggerPendingCallbacks() {
138-
WSASetEvent(m_trigger_event);
139-
}
152+
void MainLoopWindows::Interrupt() { WSASetEvent(m_interrupt_event); }

0 commit comments

Comments
 (0)