-
Notifications
You must be signed in to change notification settings - Fork 14.3k
[lldb] Add timed callbacks to the MainLoop class #112895
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@llvm/pr-subscribers-lldb Author: Pavel Labath (labath) ChangesThe motivating use case is being able to "time out" certain operations (by adding a timed callback which will force the termination of the loop), but the design is flexible enough to accomodate other use cases as well (e.g. running a periodic task in the background). The implementation builds on the existing "pending callback" mechanism, by associating a time point with each callback -- every time the loop wakes up, it runs all of the callbacks which are past their point, and it also makes sure to sleep only until the next callback is scheduled to run. I've done some renaming as names like "TriggerPendingCallbacks" were no longer accurate -- the function may no longer cause any callbacks to be called (it may just cause the main loop thread to recalculate the time it wants to sleep). Full diff: https://github.com/llvm/llvm-project/pull/112895.diff 7 Files Affected:
diff --git a/lldb/include/lldb/Host/MainLoopBase.h b/lldb/include/lldb/Host/MainLoopBase.h
index 7365ee7a65ee64..0b79c52a358f4b 100644
--- a/lldb/include/lldb/Host/MainLoopBase.h
+++ b/lldb/include/lldb/Host/MainLoopBase.h
@@ -13,8 +13,10 @@
#include "lldb/Utility/Status.h"
#include "llvm/ADT/DenseMap.h"
#include "llvm/Support/ErrorHandling.h"
+#include <chrono>
#include <functional>
#include <mutex>
+#include <queue>
namespace lldb_private {
@@ -38,6 +40,9 @@ class MainLoopBase {
class ReadHandle;
public:
+ using TimePoint = std::chrono::time_point<std::chrono::steady_clock,
+ std::chrono::nanoseconds>;
+
MainLoopBase() : m_terminate_request(false) {}
virtual ~MainLoopBase() = default;
@@ -52,7 +57,19 @@ class MainLoopBase {
// Add a pending callback that will be executed once after all the pending
// events are processed. The callback will be executed even if termination
// was requested.
- void AddPendingCallback(const Callback &callback);
+ void AddPendingCallback(const Callback &callback) {
+ AddCallback(callback, std::chrono::steady_clock::time_point());
+ }
+
+ // Add a callback that will be executed after a certain amount of time has
+ // passed.
+ void AddCallback(const Callback &callback,
+ std::chrono::nanoseconds delay) {
+ AddCallback(callback, std::chrono::steady_clock::now() + delay);
+ }
+
+ // Add a callback that will be executed after a given point in time.
+ void AddCallback(const Callback &callback, TimePoint point);
// Waits for registered events and invoke the proper callbacks. Returns when
// all callbacks deregister themselves or when someone requests termination.
@@ -69,14 +86,18 @@ class MainLoopBase {
virtual void UnregisterReadObject(IOObject::WaitableHandle handle) = 0;
- // Interrupt the loop that is currently waiting for events and execute
- // the current pending callbacks immediately.
- virtual void TriggerPendingCallbacks() = 0;
+ // Interrupt the loop that is currently waiting for events.
+ virtual void Interrupt() = 0;
+
+ void ProcessCallbacks();
- void ProcessPendingCallbacks();
+ std::optional<TimePoint> GetNextWakeupTime();
std::mutex m_callback_mutex;
- std::vector<Callback> m_pending_callbacks;
+ std::priority_queue<std::pair<TimePoint, Callback>,
+ std::vector<std::pair<TimePoint, Callback>>,
+ llvm::on_first<std::greater<TimePoint>>>
+ m_callbacks;
bool m_terminate_request : 1;
private:
diff --git a/lldb/include/lldb/Host/posix/MainLoopPosix.h b/lldb/include/lldb/Host/posix/MainLoopPosix.h
index 07497b7b8c259a..bff96583bf5424 100644
--- a/lldb/include/lldb/Host/posix/MainLoopPosix.h
+++ b/lldb/include/lldb/Host/posix/MainLoopPosix.h
@@ -54,7 +54,7 @@ class MainLoopPosix : public MainLoopBase {
void UnregisterReadObject(IOObject::WaitableHandle handle) override;
void UnregisterSignal(int signo, std::list<Callback>::iterator callback_it);
- void TriggerPendingCallbacks() override;
+ void Interrupt() override;
private:
void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -87,8 +87,8 @@ class MainLoopPosix : public MainLoopBase {
llvm::DenseMap<IOObject::WaitableHandle, Callback> m_read_fds;
llvm::DenseMap<int, SignalInfo> m_signals;
- Pipe m_trigger_pipe;
- std::atomic<bool> m_triggering;
+ Pipe m_interrupt_pipe;
+ std::atomic<bool> m_interrupting = false;
#if HAVE_SYS_EVENT_H
int m_kqueue;
#endif
diff --git a/lldb/include/lldb/Host/windows/MainLoopWindows.h b/lldb/include/lldb/Host/windows/MainLoopWindows.h
index 33e179e6c1286c..3937a24645d955 100644
--- a/lldb/include/lldb/Host/windows/MainLoopWindows.h
+++ b/lldb/include/lldb/Host/windows/MainLoopWindows.h
@@ -34,7 +34,7 @@ class MainLoopWindows : public MainLoopBase {
protected:
void UnregisterReadObject(IOObject::WaitableHandle handle) override;
- void TriggerPendingCallbacks() override;
+ void Interrupt() override;
private:
void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -45,7 +45,7 @@ class MainLoopWindows : public MainLoopBase {
Callback callback;
};
llvm::DenseMap<IOObject::WaitableHandle, FdInfo> m_read_fds;
- void *m_trigger_event;
+ void *m_interrupt_event;
};
} // namespace lldb_private
diff --git a/lldb/source/Host/common/MainLoopBase.cpp b/lldb/source/Host/common/MainLoopBase.cpp
index 030a4f0371681e..64a57e65849e99 100644
--- a/lldb/source/Host/common/MainLoopBase.cpp
+++ b/lldb/source/Host/common/MainLoopBase.cpp
@@ -7,27 +7,43 @@
//===----------------------------------------------------------------------===//
#include "lldb/Host/MainLoopBase.h"
+#include <chrono>
using namespace lldb;
using namespace lldb_private;
-void MainLoopBase::AddPendingCallback(const Callback &callback) {
+void MainLoopBase::AddCallback(const Callback &callback, TimePoint point) {
+ bool interrupt_needed;
{
std::lock_guard<std::mutex> lock{m_callback_mutex};
- m_pending_callbacks.push_back(callback);
+ // We need to interrupt the main thread if this callback is scheduled to
+ // execute at an earlier time than the earliest callback registered so far.
+ interrupt_needed = m_callbacks.empty() || point < m_callbacks.top().first;
+ m_callbacks.emplace(point, callback);
}
- TriggerPendingCallbacks();
+ if (interrupt_needed)
+ Interrupt();
}
-void MainLoopBase::ProcessPendingCallbacks() {
- // Move the callbacks to a local vector to avoid keeping m_pending_callbacks
- // locked throughout the calls.
- std::vector<Callback> pending_callbacks;
- {
- std::lock_guard<std::mutex> lock{m_callback_mutex};
- pending_callbacks = std::move(m_pending_callbacks);
- }
+void MainLoopBase::ProcessCallbacks() {
+ while (true) {
+ Callback callback;
+ {
+ std::lock_guard<std::mutex> lock{m_callback_mutex};
+ if (m_callbacks.empty() ||
+ std::chrono::steady_clock::now() < m_callbacks.top().first)
+ return;
+ callback = std::move(m_callbacks.top().second);
+ m_callbacks.pop();
+ }
- for (const Callback &callback : pending_callbacks)
callback(*this);
+ }
+}
+
+std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() {
+ std::lock_guard<std::mutex> lock(m_callback_mutex);
+ if (m_callbacks.empty())
+ return std::nullopt;
+ return m_callbacks.top().first;
}
diff --git a/lldb/source/Host/posix/MainLoopPosix.cpp b/lldb/source/Host/posix/MainLoopPosix.cpp
index 6f8eaa55cfdf09..788c6c23007c41 100644
--- a/lldb/source/Host/posix/MainLoopPosix.cpp
+++ b/lldb/source/Host/posix/MainLoopPosix.cpp
@@ -15,6 +15,7 @@
#include <algorithm>
#include <cassert>
#include <cerrno>
+#include <chrono>
#include <csignal>
#include <ctime>
#include <vector>
@@ -42,6 +43,31 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
g_signal_flags[signo] = 1;
}
+
+class ToTimeSpec {
+public:
+ explicit ToTimeSpec(std::optional<MainLoopPosix::TimePoint> point) {
+ using namespace std::chrono;
+
+ if (!point) {
+ m_ts_ptr = nullptr;
+ return;
+ }
+ nanoseconds dur = std::max(*point - steady_clock::now(), nanoseconds(0));
+ m_ts_ptr = &m_ts;
+ m_ts.tv_sec = duration_cast<seconds>(dur).count();
+ m_ts.tv_nsec = (dur % seconds(1)).count();
+ }
+ ToTimeSpec(const ToTimeSpec &) = delete;
+ ToTimeSpec &operator=(const ToTimeSpec &) = delete;
+
+ operator struct timespec *() { return m_ts_ptr; }
+
+private:
+ struct timespec m_ts;
+ struct timespec *m_ts_ptr;
+};
+
class MainLoopPosix::RunImpl {
public:
RunImpl(MainLoopPosix &loop);
@@ -80,8 +106,9 @@ Status MainLoopPosix::RunImpl::Poll() {
for (auto &fd : loop.m_read_fds)
EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);
- num_events = kevent(loop.m_kqueue, in_events.data(), in_events.size(),
- out_events, std::size(out_events), nullptr);
+ num_events =
+ kevent(loop.m_kqueue, in_events.data(), in_events.size(), out_events,
+ std::size(out_events), ToTimeSpec(loop.GetNextWakeupTime()));
if (num_events < 0) {
if (errno == EINTR) {
@@ -155,7 +182,9 @@ Status MainLoopPosix::RunImpl::Poll() {
void *sigset_ptr;
size_t sigset_len;
} extra_data = {&kernel_sigset, sizeof(kernel_sigset)};
- if (syscall(__NR_pselect6, nfds, &read_fd_set, nullptr, nullptr, nullptr,
+ if (syscall(__NR_pselect6, nfds, &read_fd_set, /*writefds=*/nullptr,
+ /*exceptfds=*/nullptr,
+ ToTimeSpec(loop.GetNextWakeupTime()).operator struct timespec *(),
&extra_data) == -1) {
if (errno != EINTR)
return Status(errno, eErrorTypePOSIX);
@@ -179,7 +208,8 @@ Status MainLoopPosix::RunImpl::Poll() {
read_fds.push_back(pfd);
}
- if (ppoll(read_fds.data(), read_fds.size(), nullptr, &sigmask) == -1 &&
+ if (ppoll(read_fds.data(), read_fds.size(),
+ ToTimeSpec(loop.GetNextWakeupTime()), &sigmask) == -1 &&
errno != EINTR)
return Status(errno, eErrorTypePOSIX);
@@ -225,14 +255,15 @@ void MainLoopPosix::RunImpl::ProcessEvents() {
}
#endif
-MainLoopPosix::MainLoopPosix() : m_triggering(false) {
- Status error = m_trigger_pipe.CreateNew(/*child_process_inherit=*/false);
+MainLoopPosix::MainLoopPosix() {
+ Status error = m_interrupt_pipe.CreateNew(/*child_process_inherit=*/false);
assert(error.Success());
- const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor();
- m_read_fds.insert({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) {
+ const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor();
+ m_read_fds.insert({interrupt_pipe_fd,
+ [interrupt_pipe_fd](MainLoopBase &loop) {
char c;
ssize_t bytes_read = llvm::sys::RetryAfterSignal(
- -1, ::read, trigger_pipe_fd, &c, 1);
+ -1, ::read, interrupt_pipe_fd, &c, 1);
assert(bytes_read == 1);
UNUSED_IF_ASSERT_DISABLED(bytes_read);
// NB: This implicitly causes another loop iteration
@@ -248,8 +279,8 @@ MainLoopPosix::~MainLoopPosix() {
#if HAVE_SYS_EVENT_H
close(m_kqueue);
#endif
- m_read_fds.erase(m_trigger_pipe.GetReadFileDescriptor());
- m_trigger_pipe.Close();
+ m_read_fds.erase(m_interrupt_pipe.GetReadFileDescriptor());
+ m_interrupt_pipe.Close();
assert(m_read_fds.size() == 0);
assert(m_signals.size() == 0);
}
@@ -372,8 +403,8 @@ Status MainLoopPosix::Run() {
impl.ProcessEvents();
- m_triggering = false;
- ProcessPendingCallbacks();
+ m_interrupting = false;
+ ProcessCallbacks();
}
return Status();
}
@@ -396,13 +427,13 @@ void MainLoopPosix::ProcessSignal(int signo) {
}
}
-void MainLoopPosix::TriggerPendingCallbacks() {
- if (m_triggering.exchange(true))
+void MainLoopPosix::Interrupt() {
+ if (m_interrupting.exchange(true))
return;
char c = '.';
size_t bytes_written;
- Status error = m_trigger_pipe.Write(&c, 1, bytes_written);
+ Status error = m_interrupt_pipe.Write(&c, 1, bytes_written);
assert(error.Success());
UNUSED_IF_ASSERT_DISABLED(error);
assert(bytes_written == 1);
diff --git a/lldb/source/Host/windows/MainLoopWindows.cpp b/lldb/source/Host/windows/MainLoopWindows.cpp
index c9aa6d339d8f48..2f2249b1086add 100644
--- a/lldb/source/Host/windows/MainLoopWindows.cpp
+++ b/lldb/source/Host/windows/MainLoopWindows.cpp
@@ -21,14 +21,24 @@
using namespace lldb;
using namespace lldb_private;
+static DWORD ToTimeout(std::optional<MainLoopWindows::TimePoint> point) {
+ using namespace std::chrono;
+
+ if (!point)
+ return WSA_INFINITE;
+
+ nanoseconds dur = (std::max)(*point - steady_clock::now(), nanoseconds(0));
+ return duration_cast<milliseconds>(dur).count();
+}
+
MainLoopWindows::MainLoopWindows() {
- m_trigger_event = WSACreateEvent();
- assert(m_trigger_event != WSA_INVALID_EVENT);
+ m_interrupt_event = WSACreateEvent();
+ assert(m_interrupt_event != WSA_INVALID_EVENT);
}
MainLoopWindows::~MainLoopWindows() {
assert(m_read_fds.empty());
- BOOL result = WSACloseEvent(m_trigger_event);
+ BOOL result = WSACloseEvent(m_interrupt_event);
assert(result == TRUE);
UNUSED_IF_ASSERT_DISABLED(result);
}
@@ -43,10 +53,11 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
events.push_back(info.event);
}
- events.push_back(m_trigger_event);
+ events.push_back(m_interrupt_event);
- DWORD result = WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
- WSA_INFINITE, FALSE);
+ DWORD result =
+ WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
+ ToTimeout(GetNextWakeupTime()), FALSE);
for (auto &fd : m_read_fds) {
int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0);
@@ -54,9 +65,13 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
UNUSED_IF_ASSERT_DISABLED(result);
}
- if (result >= WSA_WAIT_EVENT_0 && result <= WSA_WAIT_EVENT_0 + events.size())
+ if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size())
return result - WSA_WAIT_EVENT_0;
+ // A timeout is treated as a (premature) signalization of the interrupt event.
+ if (result == WSA_WAIT_TIMEOUT)
+ return events.size() - 1;
+
return llvm::createStringError(llvm::inconvertibleErrorCode(),
"WSAWaitForMultipleEvents failed");
}
@@ -127,13 +142,13 @@ Status MainLoopWindows::Run() {
ProcessReadObject(KV.first);
} else {
assert(*signaled_event == m_read_fds.size());
- WSAResetEvent(m_trigger_event);
+ WSAResetEvent(m_interrupt_event);
}
- ProcessPendingCallbacks();
+ ProcessCallbacks();
}
return Status();
}
-void MainLoopWindows::TriggerPendingCallbacks() {
- WSASetEvent(m_trigger_event);
+void MainLoopWindows::Interrupt() {
+ WSASetEvent(m_interrupt_event);
}
diff --git a/lldb/unittests/Host/MainLoopTest.cpp b/lldb/unittests/Host/MainLoopTest.cpp
index 4688d4fed475b6..70876f694a7acc 100644
--- a/lldb/unittests/Host/MainLoopTest.cpp
+++ b/lldb/unittests/Host/MainLoopTest.cpp
@@ -15,6 +15,7 @@
#include "llvm/Config/llvm-config.h" // for LLVM_ON_UNIX
#include "llvm/Testing/Support/Error.h"
#include "gtest/gtest.h"
+#include <chrono>
#include <future>
#include <thread>
@@ -106,13 +107,9 @@ TEST_F(MainLoopTest, NoSpuriousReads) {
error);
ASSERT_THAT_ERROR(error.ToError(), llvm::Succeeded());
// Terminate the loop after one second.
- std::thread terminate_thread([&loop] {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- loop.AddPendingCallback(
- [](MainLoopBase &loop) { loop.RequestTermination(); });
- });
+ loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+ std::chrono::seconds(1));
ASSERT_THAT_ERROR(loop.Run().ToError(), llvm::Succeeded());
- terminate_thread.join();
// Make sure the callback was called only once.
ASSERT_EQ(1u, callback_count);
@@ -223,6 +220,61 @@ TEST_F(MainLoopTest, ManyPendingCallbacks) {
ASSERT_TRUE(loop.Run().Success());
}
+TEST_F(MainLoopTest, CallbackWithTimeout) {
+ MainLoop loop;
+ loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+ std::chrono::seconds(2));
+ auto start = std::chrono::steady_clock::now();
+ ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded());
+ EXPECT_GE(std::chrono::steady_clock::now() - start, std::chrono::seconds(2));
+}
+
+TEST_F(MainLoopTest, TimedCallbacksRunInOrder) {
+ MainLoop loop;
+ auto start = std::chrono::steady_clock::now();
+ std::chrono::milliseconds epsilon(10);
+ std::vector<int> order;
+ auto add_cb = [&](int id) {
+ loop.AddCallback([&order, id](MainLoopBase &) { order.push_back(id); },
+ start + id * epsilon);
+ };
+ add_cb(3);
+ add_cb(2);
+ add_cb(4);
+ add_cb(1);
+ loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+ start + 5 * epsilon);
+ ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded());
+ EXPECT_GE(std::chrono::steady_clock::now() - start, 5 * epsilon);
+ ASSERT_THAT(order, testing::ElementsAre(1, 2, 3, 4));
+}
+
+TEST_F(MainLoopTest, TimedCallbackShortensSleep) {
+ MainLoop loop;
+ auto start = std::chrono::steady_clock::now();
+ bool long_callback_called = false;
+ loop.AddCallback(
+ [&](MainLoopBase &loop) {
+ long_callback_called = true;
+ loop.RequestTermination();
+ },
+ std::chrono::seconds(30));
+ std::future<Status> async_run =
+ std::async(std::launch::async, &MainLoop::Run, std::ref(loop));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ bool short_callback_called = false;
+ loop.AddCallback(
+ [&](MainLoopBase &loop) {
+ short_callback_called = true;
+ loop.RequestTermination();
+ },
+ std::chrono::seconds(1));
+ ASSERT_THAT_ERROR(async_run.get().takeError(), llvm::Succeeded());
+ EXPECT_LT(std::chrono::steady_clock::now() - start, std::chrono::seconds(10));
+ EXPECT_TRUE(short_callback_called);
+ EXPECT_FALSE(long_callback_called);
+}
+
#ifdef LLVM_ON_UNIX
TEST_F(MainLoopTest, DetectsEOF) {
|
✅ With the latest revision this PR passed the C/C++ code formatter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of this is over my head, but I left a few comments anyway.
@@ -155,7 +181,9 @@ Status MainLoopPosix::RunImpl::Poll() { | |||
void *sigset_ptr; | |||
size_t sigset_len; | |||
} extra_data = {&kernel_sigset, sizeof(kernel_sigset)}; | |||
if (syscall(__NR_pselect6, nfds, &read_fd_set, nullptr, nullptr, nullptr, | |||
if (syscall(__NR_pselect6, nfds, &read_fd_set, /*writefds=*/nullptr, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pselect is sufficiently complex that you might want to add a small blurb what you're doing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I can do something even better ~~> #115197
(I'll rebase this PR on top of that afterwards)
@@ -38,6 +40,9 @@ class MainLoopBase { | |||
class ReadHandle; | |||
|
|||
public: | |||
using TimePoint = std::chrono::time_point<std::chrono::steady_clock, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are Nanoseconds here overly precise? Will we imply that callbacks will have nano-second level precision in timing out? I think because this is used for timeouts milliseconds is sufficient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not likely to be useful, but we do have system APIs which provide nanosecond level resolution, so why not make it available. Users don't have to specify timeouts with a nanosecond level. They can just say std::chrono::seconds/*or whatever*/(X)
, and it will be automatically to nanoseconds.
Using a lower precision would actually be somewhat unergonomic chrono conversions which lose precision are not implicit. So something like steady_clock::now()+seconds(2)
would not be implicitly convertible to milliseconds if the clock resolution was more than milliseconds (which it usually is).
} | ||
TriggerPendingCallbacks(); | ||
if (interrupt_needed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand the logic here, we're checking before emplacement if an interrupt is needed and then once we emplace we Interrupt,
- Why are we setting interrupt needed when callbacks is empty?
- Should we invert this where we trigger the interrupts before a potentially long emplace (this is in response to my nanoseconds fidelity question)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand the logic here, we're checking before emplacement if an interrupt is needed and then once we emplace we Interrupt,
That's correct.
Why are we setting interrupt needed when callbacks is empty?
Empty callbacks mean an infinite timeout (line MainLoopPosix.cpp, line 52). In that case, we definitely need to interrupt the polling thread (so it can go to sleep with a new timeout). In other cases, we only need to interrupt if this operation is scheduled to run before the previous earliest operation.
Should we invert this where we trigger the interrupts before a potentially long emplace (this is in response to my nanoseconds fidelity question)
I think you're misunderstanding something here. The emplace is always fast - it just adds something to the queue and doesn't cause any callbacks to run. The callbacks will always be run on the mainloop thread. The interrupt needs to happen after the insertion to make sure the other thread observes the new callback (via the result of GetNextWakeupTime
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're misunderstanding something here. The emplace is always fast - it just adds > something to the queue and doesn't cause any callbacks to run. The callbacks will always > be run on the mainloop thread. The interrupt needs to happen after the insertion to make > sure the other thread observes the new callback (via the result of GetNextWakeupTime).
My concern was purely if we want nanosecond precision, adding into a priority queue (assuming N log N) seemed like it could be slow.
On a second look, I think my concern is moot
} | ||
|
||
std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() { | ||
std::lock_guard<std::mutex> lock(m_callback_mutex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a recursive mutex? Is there any situation where invoking a callback will end us back at this codepath? I know it's unlikely but my question is if it's impossible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the callbacks run with the mutex unlocked. But even if they were running with mutex held, making this a recursive mutex would not help, as the code would likely not work correctly because the outer function would get surprised by the callback collection mutation under it -- even though it has it "locked". We've had several bugs like that over the years, last one being #96750.
I'm generally a big anti-fan of recursive mutexes for this reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback. I did not forget about this PR, I just wasn't able to find the time to get back to it before now.
@@ -38,6 +40,9 @@ class MainLoopBase { | |||
class ReadHandle; | |||
|
|||
public: | |||
using TimePoint = std::chrono::time_point<std::chrono::steady_clock, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not likely to be useful, but we do have system APIs which provide nanosecond level resolution, so why not make it available. Users don't have to specify timeouts with a nanosecond level. They can just say std::chrono::seconds/*or whatever*/(X)
, and it will be automatically to nanoseconds.
Using a lower precision would actually be somewhat unergonomic chrono conversions which lose precision are not implicit. So something like steady_clock::now()+seconds(2)
would not be implicitly convertible to milliseconds if the clock resolution was more than milliseconds (which it usually is).
} | ||
TriggerPendingCallbacks(); | ||
if (interrupt_needed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand the logic here, we're checking before emplacement if an interrupt is needed and then once we emplace we Interrupt,
That's correct.
Why are we setting interrupt needed when callbacks is empty?
Empty callbacks mean an infinite timeout (line MainLoopPosix.cpp, line 52). In that case, we definitely need to interrupt the polling thread (so it can go to sleep with a new timeout). In other cases, we only need to interrupt if this operation is scheduled to run before the previous earliest operation.
Should we invert this where we trigger the interrupts before a potentially long emplace (this is in response to my nanoseconds fidelity question)
I think you're misunderstanding something here. The emplace is always fast - it just adds something to the queue and doesn't cause any callbacks to run. The callbacks will always be run on the mainloop thread. The interrupt needs to happen after the insertion to make sure the other thread observes the new callback (via the result of GetNextWakeupTime
).
} | ||
|
||
std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() { | ||
std::lock_guard<std::mutex> lock(m_callback_mutex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the callbacks run with the mutex unlocked. But even if they were running with mutex held, making this a recursive mutex would not help, as the code would likely not work correctly because the outer function would get surprised by the callback collection mutation under it -- even though it has it "locked". We've had several bugs like that over the years, last one being #96750.
I'm generally a big anti-fan of recursive mutexes for this reason.
@@ -155,7 +181,9 @@ Status MainLoopPosix::RunImpl::Poll() { | |||
void *sigset_ptr; | |||
size_t sigset_len; | |||
} extra_data = {&kernel_sigset, sizeof(kernel_sigset)}; | |||
if (syscall(__NR_pselect6, nfds, &read_fd_set, nullptr, nullptr, nullptr, | |||
if (syscall(__NR_pselect6, nfds, &read_fd_set, /*writefds=*/nullptr, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I can do something even better ~~> #115197
(I'll rebase this PR on top of that afterwards)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appreciate the explanation to my concerns. I'll try to get to your syscall replacement PR today or tomorrow.
(Closed by mistake, I'm still waiting on #115197 though) |
The motivating use case is being able to "time out" certain operations (by adding a timed callback which will force the termination of the loop), but the design is flexible enough to accomodate other use cases as well (e.g. running a periodic task in the background). The implementation builds on the existing "pending callback" mechanism, by associating a time point with each callback -- every time the loop wakes up, it runs all of the callbacks which are past their point, and it also makes sure to sleep only until the next callback is scheduled to run. I've done some renaming as names like "TriggerPendingCallbacks" were no longer accurate -- the function may no longer cause any callbacks to be called (it may just cause the main loop thread to recalculate the time it wants to sleep).
std::chrono::seconds(2)); | ||
auto start = std::chrono::steady_clock::now(); | ||
ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded()); | ||
EXPECT_GE(std::chrono::steady_clock::now() - start, std::chrono::seconds(2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI this test is flaky on Windows on Arm:
https://lab.llvm.org/buildbot/#/builders/141/builds/4179
******************** TEST 'lldb-unit :: Host/./HostTests.exe/0/9' FAILED ********************
Script(shard):
--
GTEST_OUTPUT=json:C:\Users\tcwg\llvm-worker\lldb-aarch64-windows\build\tools\lldb\unittests\Host\.\HostTests.exe-lldb-unit-16696-0-9.json GTEST_SHUFFLE=0 GTEST_TOTAL_SHARDS=9 GTEST_SHARD_INDEX=0 C:\Users\tcwg\llvm-worker\lldb-aarch64-windows\build\tools\lldb\unittests\Host\.\HostTests.exe
--
Script:
--
C:\Users\tcwg\llvm-worker\lldb-aarch64-windows\build\tools\lldb\unittests\Host\.\HostTests.exe --gtest_filter=MainLoopTest.CallbackWithTimeout
--
C:\Users\tcwg\llvm-worker\lldb-aarch64-windows\llvm-project\lldb\unittests\Host\MainLoopTest.cpp(229): error: Expected: (std::chrono::steady_clock::now() - start) >= (std::chrono::seconds(2)), actual: 8-byte object <DC-4E 35-77 00-00 00-00> vs 8-byte object <02-00 00-00 00-00 00-00>
C:\Users\tcwg\llvm-worker\lldb-aarch64-windows\llvm-project\lldb\unittests\Host\MainLoopTest.cpp:229
Expected: (std::chrono::steady_clock::now() - start) >= (std::chrono::seconds(2)), actual: 8-byte object <DC-4E 35-77 00-00 00-00> vs 8-byte object <02-00 00-00 00-00 00-00>
********************
If anything I would think our bot might take way longer than 2 seconds but this test would pass if that were the case. The expected object is clearly looks like 2 seconds but the one we got I've no idea of the layout.
I don't see it failing in any other build and it could be some bug in MSVC's STL, but letting you know in case you can spot something obvious in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or on Windows the definition of 2 seconds is a little more fuzzy than we'd like it to be :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The representation is "an integer" but to interpret it, you need to know the resolution of the steady_clock on windows (arm). That appears to be nanoseconds, which means this value is 0x77354edc/1e9 = 1.9999823000000001
seconds, which means that the callback runs sooner than it should have. I think I know the reason. Lemme whip up a patch real quick.
The motivating use case is being able to "time out" certain operations (by adding a timed callback which will force the termination of the loop), but the design is flexible enough to accomodate other use cases as well (e.g. running a periodic task in the background).
The implementation builds on the existing "pending callback" mechanism, by associating a time point with each callback -- every time the loop wakes up, it runs all of the callbacks which are past their point, and it also makes sure to sleep only until the next callback is scheduled to run.
I've done some renaming as names like "TriggerPendingCallbacks" were no longer accurate -- the function may no longer cause any callbacks to be called (it may just cause the main loop thread to recalculate the time it wants to sleep).