Skip to content

[lldb] Add timed callbacks to the MainLoop class #112895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions lldb/include/lldb/Host/MainLoopBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
#include "lldb/Utility/Status.h"
#include "llvm/ADT/DenseMap.h"
#include "llvm/Support/ErrorHandling.h"
#include <chrono>
#include <functional>
#include <mutex>
#include <queue>

namespace lldb_private {

Expand All @@ -38,6 +40,9 @@ class MainLoopBase {
class ReadHandle;

public:
using TimePoint = std::chrono::time_point<std::chrono::steady_clock,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are nanoseconds here overly precise? Will we imply that callbacks will have nanosecond-level precision in timing out? I think that, because this is used for timeouts, milliseconds is sufficient.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not likely to be useful, but we do have system APIs which provide nanosecond-level resolution, so why not make it available. Users don't have to specify timeouts at the nanosecond level. They can just say std::chrono::seconds/*or whatever*/(X), and it will be automatically converted to nanoseconds.

Using a lower precision would actually be somewhat unergonomic, because chrono conversions which lose precision are not implicit. So something like steady_clock::now()+seconds(2) would not be implicitly convertible to milliseconds if the clock resolution was finer than milliseconds (which it usually is).

std::chrono::nanoseconds>;

MainLoopBase() : m_terminate_request(false) {}
virtual ~MainLoopBase() = default;

Expand All @@ -52,7 +57,18 @@ class MainLoopBase {
// Add a pending callback that will be executed once after all the pending
// events are processed. The callback will be executed even if termination
// was requested.
void AddPendingCallback(const Callback &callback);
void AddPendingCallback(const Callback &callback) {
AddCallback(callback, std::chrono::steady_clock::time_point());
}

// Add a callback that will be executed after a certain amount of time has
// passed.
void AddCallback(const Callback &callback, std::chrono::nanoseconds delay) {
AddCallback(callback, std::chrono::steady_clock::now() + delay);
}

// Add a callback that will be executed after a given point in time.
void AddCallback(const Callback &callback, TimePoint point);

// Waits for registered events and invokes the proper callbacks. Returns when
// all callbacks deregister themselves or when someone requests termination.
Expand All @@ -69,14 +85,18 @@ class MainLoopBase {

virtual void UnregisterReadObject(IOObject::WaitableHandle handle) = 0;

// Interrupt the loop that is currently waiting for events and execute
// the current pending callbacks immediately.
virtual void TriggerPendingCallbacks() = 0;
// Interrupt the loop that is currently waiting for events.
virtual void Interrupt() = 0;

void ProcessCallbacks();

void ProcessPendingCallbacks();
std::optional<TimePoint> GetNextWakeupTime();

std::mutex m_callback_mutex;
std::vector<Callback> m_pending_callbacks;
std::priority_queue<std::pair<TimePoint, Callback>,
std::vector<std::pair<TimePoint, Callback>>,
llvm::on_first<std::greater<TimePoint>>>
m_callbacks;
bool m_terminate_request : 1;

private:
Expand Down
6 changes: 3 additions & 3 deletions lldb/include/lldb/Host/posix/MainLoopPosix.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MainLoopPosix : public MainLoopBase {
void UnregisterReadObject(IOObject::WaitableHandle handle) override;
void UnregisterSignal(int signo, std::list<Callback>::iterator callback_it);

void TriggerPendingCallbacks() override;
void Interrupt() override;

private:
void ProcessReadObject(IOObject::WaitableHandle handle);
Expand Down Expand Up @@ -88,8 +88,8 @@ class MainLoopPosix : public MainLoopBase {

llvm::DenseMap<IOObject::WaitableHandle, Callback> m_read_fds;
llvm::DenseMap<int, SignalInfo> m_signals;
Pipe m_trigger_pipe;
std::atomic<bool> m_triggering;
Pipe m_interrupt_pipe;
std::atomic<bool> m_interrupting = false;
#if HAVE_SYS_EVENT_H
int m_kqueue;
#endif
Expand Down
4 changes: 2 additions & 2 deletions lldb/include/lldb/Host/windows/MainLoopWindows.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MainLoopWindows : public MainLoopBase {
protected:
void UnregisterReadObject(IOObject::WaitableHandle handle) override;

void TriggerPendingCallbacks() override;
void Interrupt() override;

private:
void ProcessReadObject(IOObject::WaitableHandle handle);
Expand All @@ -45,7 +45,7 @@ class MainLoopWindows : public MainLoopBase {
Callback callback;
};
llvm::DenseMap<IOObject::WaitableHandle, FdInfo> m_read_fds;
void *m_trigger_event;
void *m_interrupt_event;
};

} // namespace lldb_private
Expand Down
40 changes: 28 additions & 12 deletions lldb/source/Host/common/MainLoopBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,43 @@
//===----------------------------------------------------------------------===//

#include "lldb/Host/MainLoopBase.h"
#include <chrono>

using namespace lldb;
using namespace lldb_private;

void MainLoopBase::AddPendingCallback(const Callback &callback) {
void MainLoopBase::AddCallback(const Callback &callback, TimePoint point) {
bool interrupt_needed;
{
std::lock_guard<std::mutex> lock{m_callback_mutex};
m_pending_callbacks.push_back(callback);
// We need to interrupt the main thread if this callback is scheduled to
// execute at an earlier time than the earliest callback registered so far.
interrupt_needed = m_callbacks.empty() || point < m_callbacks.top().first;
m_callbacks.emplace(point, callback);
}
TriggerPendingCallbacks();
if (interrupt_needed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the logic here, we're checking before emplacement if an interrupt is needed and then once we emplace we Interrupt,

  • Why are we setting interrupt needed when callbacks is empty?
  • Should we invert this where we trigger the interrupts before a potentially long emplace (this is in response to my nanoseconds fidelity question)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the logic here, we're checking before emplacement if an interrupt is needed and then once we emplace we Interrupt,

That's correct.

Why are we setting interrupt needed when callbacks is empty?

Empty callbacks mean an infinite timeout (MainLoopPosix.cpp, line 52). In that case, we definitely need to interrupt the polling thread (so it can go to sleep with a new timeout). In other cases, we only need to interrupt if this operation is scheduled to run before the previous earliest operation.

Should we invert this where we trigger the interrupts before a potentially long emplace (this is in response to my nanoseconds fidelity question)

I think you're misunderstanding something here. The emplace is always fast - it just adds something to the queue and doesn't cause any callbacks to run. The callbacks will always be run on the mainloop thread. The interrupt needs to happen after the insertion to make sure the other thread observes the new callback (via the result of GetNextWakeupTime).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> I think you're misunderstanding something here. The emplace is always fast - it just adds something to the queue and doesn't cause any callbacks to run. The callbacks will always be run on the mainloop thread. The interrupt needs to happen after the insertion to make sure the other thread observes the new callback (via the result of GetNextWakeupTime).

My concern was purely if we want nanosecond precision, adding into a priority queue (assuming N log N) seemed like it could be slow.

On a second look, I think my concern is moot

Interrupt();
}

void MainLoopBase::ProcessPendingCallbacks() {
// Move the callbacks to a local vector to avoid keeping m_pending_callbacks
// locked throughout the calls.
std::vector<Callback> pending_callbacks;
{
std::lock_guard<std::mutex> lock{m_callback_mutex};
pending_callbacks = std::move(m_pending_callbacks);
}
void MainLoopBase::ProcessCallbacks() {
while (true) {
Callback callback;
{
std::lock_guard<std::mutex> lock{m_callback_mutex};
if (m_callbacks.empty() ||
std::chrono::steady_clock::now() < m_callbacks.top().first)
return;
callback = std::move(m_callbacks.top().second);
m_callbacks.pop();
}

for (const Callback &callback : pending_callbacks)
callback(*this);
}
}

std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() {
std::lock_guard<std::mutex> lock(m_callback_mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a recursive mutex? Is there any situation where invoking a callback will end us back at this codepath? I know it's unlikely but my question is if it's impossible

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the callbacks run with the mutex unlocked. But even if they were running with mutex held, making this a recursive mutex would not help, as the code would likely not work correctly because the outer function would get surprised by the callback collection mutation under it -- even though it has it "locked". We've had several bugs like that over the years, last one being #96750.
I'm generally a big anti-fan of recursive mutexes for this reason.

if (m_callbacks.empty())
return std::nullopt;
return m_callbacks.top().first;
}
79 changes: 52 additions & 27 deletions lldb/source/Host/posix/MainLoopPosix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <ctime>
#include <fcntl.h>
Expand Down Expand Up @@ -67,6 +68,30 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
assert(bytes_written == 1 || (bytes_written == -1 && errno == EAGAIN));
}

// Adapter that turns an optional absolute wake-up time into the relative
// `struct timespec *` timeout argument passed to kevent()/ppoll().
//
// - No wake-up time: converts to a null pointer (no timeout).
// - Otherwise: converts to a pointer to an internally stored timespec holding
//   the time remaining until `point`, clamped to zero if `point` has already
//   passed.
//
// The pointer refers to storage inside this object, so it is only valid while
// the object is alive; copying is disabled for the same reason.
class ToTimeSpec {
public:
  explicit ToTimeSpec(std::optional<MainLoopPosix::TimePoint> point) {
    using namespace std::chrono;

    if (point) {
      // Never hand a negative interval to the kernel.
      nanoseconds remaining =
          std::max(*point - steady_clock::now(), nanoseconds(0));
      // Split into whole seconds and the sub-second remainder.
      seconds whole_seconds = duration_cast<seconds>(remaining);
      m_ts_ptr = &m_ts;
      m_ts.tv_sec = whole_seconds.count();
      m_ts.tv_nsec = (remaining - whole_seconds).count();
    } else {
      m_ts_ptr = nullptr;
    }
  }

  ToTimeSpec(const ToTimeSpec &) = delete;
  ToTimeSpec &operator=(const ToTimeSpec &) = delete;

  // Implicit conversion so a temporary can be passed directly as the timeout
  // argument of the polling call.
  operator struct timespec *() { return m_ts_ptr; }

private:
  struct timespec m_ts;
  struct timespec *m_ts_ptr;
};

class MainLoopPosix::RunImpl {
public:
RunImpl(MainLoopPosix &loop);
Expand Down Expand Up @@ -99,8 +124,9 @@ Status MainLoopPosix::RunImpl::Poll() {
for (auto &fd : loop.m_read_fds)
EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);

num_events = kevent(loop.m_kqueue, in_events.data(), in_events.size(),
out_events, std::size(out_events), nullptr);
num_events =
kevent(loop.m_kqueue, in_events.data(), in_events.size(), out_events,
std::size(out_events), ToTimeSpec(loop.GetNextWakeupTime()));

if (num_events < 0) {
if (errno == EINTR) {
Expand Down Expand Up @@ -144,7 +170,7 @@ Status MainLoopPosix::RunImpl::Poll() {
}

if (ppoll(read_fds.data(), read_fds.size(),
/*timeout=*/nullptr,
ToTimeSpec(loop.GetNextWakeupTime()),
/*sigmask=*/nullptr) == -1 &&
errno != EINTR)
return Status(errno, eErrorTypePOSIX);
Expand All @@ -165,27 +191,28 @@ void MainLoopPosix::RunImpl::ProcessReadEvents() {
}
#endif

MainLoopPosix::MainLoopPosix() : m_triggering(false) {
Status error = m_trigger_pipe.CreateNew(/*child_process_inherit=*/false);
MainLoopPosix::MainLoopPosix() {
Status error = m_interrupt_pipe.CreateNew(/*child_process_inherit=*/false);
assert(error.Success());

// Make the write end of the pipe non-blocking.
int result = fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_SETFL,
fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_GETFL) |
int result = fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_SETFL,
fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_GETFL) |
O_NONBLOCK);
assert(result == 0);
UNUSED_IF_ASSERT_DISABLED(result);

const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor();
m_read_fds.insert({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) {
char c;
ssize_t bytes_read = llvm::sys::RetryAfterSignal(
-1, ::read, trigger_pipe_fd, &c, 1);
assert(bytes_read == 1);
UNUSED_IF_ASSERT_DISABLED(bytes_read);
// NB: This implicitly causes another loop iteration
// and therefore the execution of pending callbacks.
}});
const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor();
m_read_fds.insert(
{interrupt_pipe_fd, [interrupt_pipe_fd](MainLoopBase &loop) {
char c;
ssize_t bytes_read =
llvm::sys::RetryAfterSignal(-1, ::read, interrupt_pipe_fd, &c, 1);
assert(bytes_read == 1);
UNUSED_IF_ASSERT_DISABLED(bytes_read);
// NB: This implicitly causes another loop iteration
// and therefore the execution of pending callbacks.
}});
#if HAVE_SYS_EVENT_H
m_kqueue = kqueue();
assert(m_kqueue >= 0);
Expand All @@ -196,8 +223,8 @@ MainLoopPosix::~MainLoopPosix() {
#if HAVE_SYS_EVENT_H
close(m_kqueue);
#endif
m_read_fds.erase(m_trigger_pipe.GetReadFileDescriptor());
m_trigger_pipe.Close();
m_read_fds.erase(m_interrupt_pipe.GetReadFileDescriptor());
m_interrupt_pipe.Close();
assert(m_read_fds.size() == 0);
assert(m_signals.size() == 0);
}
Expand Down Expand Up @@ -244,11 +271,9 @@ MainLoopPosix::RegisterSignal(int signo, const Callback &callback,
sigset_t old_set;

// Set signal info before installing the signal handler!
g_signal_info[signo].pipe_fd = m_trigger_pipe.GetWriteFileDescriptor();
g_signal_info[signo].pipe_fd = m_interrupt_pipe.GetWriteFileDescriptor();
g_signal_info[signo].flag = 0;

// Even if using kqueue, the signal handler will still be invoked, so it's
// important to replace it with our "benign" handler.
int ret = sigaction(signo, &new_action, &info.old_action);
UNUSED_IF_ASSERT_DISABLED(ret);
assert(ret == 0 && "sigaction failed");
Expand Down Expand Up @@ -307,8 +332,8 @@ Status MainLoopPosix::Run() {

ProcessSignals();

m_triggering = false;
ProcessPendingCallbacks();
m_interrupting = false;
ProcessCallbacks();
}
return Status();
}
Expand Down Expand Up @@ -346,13 +371,13 @@ void MainLoopPosix::ProcessSignal(int signo) {
}
}

void MainLoopPosix::TriggerPendingCallbacks() {
if (m_triggering.exchange(true))
void MainLoopPosix::Interrupt() {
if (m_interrupting.exchange(true))
return;

char c = '.';
size_t bytes_written;
Status error = m_trigger_pipe.Write(&c, 1, bytes_written);
Status error = m_interrupt_pipe.Write(&c, 1, bytes_written);
assert(error.Success());
UNUSED_IF_ASSERT_DISABLED(error);
assert(bytes_written == 1);
Expand Down
37 changes: 25 additions & 12 deletions lldb/source/Host/windows/MainLoopWindows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,24 @@
using namespace lldb;
using namespace lldb_private;

// Converts an optional absolute wake-up time into the relative timeout, in
// milliseconds, passed to WSAWaitForMultipleEvents.
//
// Returns WSA_INFINITE when there is no wake-up time. Otherwise returns the
// time remaining until `point`, clamped to zero if `point` has already passed.
static DWORD ToTimeout(std::optional<MainLoopWindows::TimePoint> point) {
  using namespace std::chrono;

  if (!point)
    return WSA_INFINITE;

  // (std::max) is parenthesized to avoid the Windows `max` macro.
  nanoseconds dur = (std::max)(*point - steady_clock::now(), nanoseconds(0));

  // Round *up* to whole milliseconds. Truncating would turn any sub-millisecond
  // remainder into a zero timeout, making the loop wake up before the callback
  // is due and spin until the deadline actually passes.
  const unsigned long long ms =
      static_cast<unsigned long long>(ceil<milliseconds>(dur).count());

  // WSA_INFINITE is a reserved sentinel and DWORD is narrower than the
  // millisecond count, so clamp far-future deadlines to the largest finite
  // timeout. An early wake-up is harmless: the loop just recomputes the
  // timeout and waits again.
  const DWORD max_finite_timeout = WSA_INFINITE - 1;
  return ms >= max_finite_timeout ? max_finite_timeout
                                  : static_cast<DWORD>(ms);
}

MainLoopWindows::MainLoopWindows() {
m_trigger_event = WSACreateEvent();
assert(m_trigger_event != WSA_INVALID_EVENT);
m_interrupt_event = WSACreateEvent();
assert(m_interrupt_event != WSA_INVALID_EVENT);
}

MainLoopWindows::~MainLoopWindows() {
assert(m_read_fds.empty());
BOOL result = WSACloseEvent(m_trigger_event);
BOOL result = WSACloseEvent(m_interrupt_event);
assert(result == TRUE);
UNUSED_IF_ASSERT_DISABLED(result);
}
Expand All @@ -43,20 +53,25 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {

events.push_back(info.event);
}
events.push_back(m_trigger_event);
events.push_back(m_interrupt_event);

DWORD result = WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
WSA_INFINITE, FALSE);
DWORD result =
WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
ToTimeout(GetNextWakeupTime()), FALSE);

for (auto &fd : m_read_fds) {
int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0);
assert(result == 0);
UNUSED_IF_ASSERT_DISABLED(result);
}

if (result >= WSA_WAIT_EVENT_0 && result <= WSA_WAIT_EVENT_0 + events.size())
if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size())
return result - WSA_WAIT_EVENT_0;

// A timeout is treated as a (premature) signalization of the interrupt event.
if (result == WSA_WAIT_TIMEOUT)
return events.size() - 1;

return llvm::createStringError(llvm::inconvertibleErrorCode(),
"WSAWaitForMultipleEvents failed");
}
Expand Down Expand Up @@ -127,13 +142,11 @@ Status MainLoopWindows::Run() {
ProcessReadObject(KV.first);
} else {
assert(*signaled_event == m_read_fds.size());
WSAResetEvent(m_trigger_event);
WSAResetEvent(m_interrupt_event);
}
ProcessPendingCallbacks();
ProcessCallbacks();
}
return Status();
}

void MainLoopWindows::TriggerPendingCallbacks() {
WSASetEvent(m_trigger_event);
}
void MainLoopWindows::Interrupt() { WSASetEvent(m_interrupt_event); }
Loading
Loading