[lldb] Support non-blocking reads in JSONRPCTransport #144610

Open
wants to merge 1 commit into
base: main

JDevlieghere
Member

@JDevlieghere JDevlieghere commented Jun 17, 2025

Support non-blocking reads for JSONRPCTransport so we can implement a multiplexed reader using the MainLoop. Pavel pointed out in #143628 that the implementation there (which was using blocking reads) can easily result in reading partial JSON RPC packets.

@llvmbot
Member

llvmbot commented Jun 17, 2025

@llvm/pr-subscribers-lldb

Author: Jonas Devlieghere (JDevlieghere)

Changes

Support non-blocking reads for JSONRPCTransport so we can implement a multiplexed reader using the MainLoop. Pavel pointed out in #143628 that the implementation there (which was using blocking reads) can easily result in reading partial JSON RPC packets.


Full diff: https://github.com/llvm/llvm-project/pull/144610.diff

3 Files Affected:

  • (modified) lldb/include/lldb/Host/JSONTransport.h (+15-4)
  • (modified) lldb/source/Host/common/JSONTransport.cpp (+25-17)
  • (modified) lldb/unittests/Host/JSONTransportTest.cpp (+39-4)
diff --git a/lldb/include/lldb/Host/JSONTransport.h b/lldb/include/lldb/Host/JSONTransport.h
index 4087cdf2b42f7..36a67c929a1c6 100644
--- a/lldb/include/lldb/Host/JSONTransport.h
+++ b/lldb/include/lldb/Host/JSONTransport.h
@@ -85,7 +85,8 @@ class JSONTransport {
 
   /// Reads the next message from the input stream.
   template <typename T>
-  llvm::Expected<T> Read(const std::chrono::microseconds &timeout) {
+  llvm::Expected<T>
+  Read(std::optional<std::chrono::microseconds> timeout = std::nullopt) {
     llvm::Expected<std::string> message = ReadImpl(timeout);
     if (!message)
       return message.takeError();
@@ -97,10 +98,20 @@ class JSONTransport {
 
   virtual llvm::Error WriteImpl(const std::string &message) = 0;
   virtual llvm::Expected<std::string>
-  ReadImpl(const std::chrono::microseconds &timeout) = 0;
+  ReadImpl(std::optional<std::chrono::microseconds> timeout) = 0;
+
+  llvm::Expected<std::string>
+  ReadFull(IOObject &descriptor, size_t length,
+           std::optional<std::chrono::microseconds> timeout) const;
+
+  llvm::Expected<std::string>
+  ReadUntil(IOObject &descriptor, llvm::StringRef delimiter,
+            std::optional<std::chrono::microseconds> timeout);
 
   lldb::IOObjectSP m_input;
   lldb::IOObjectSP m_output;
+
+  std::string m_buffer;
 };
 
 /// A transport class for JSON with a HTTP header.
@@ -113,7 +124,7 @@ class HTTPDelimitedJSONTransport : public JSONTransport {
 protected:
   virtual llvm::Error WriteImpl(const std::string &message) override;
   virtual llvm::Expected<std::string>
-  ReadImpl(const std::chrono::microseconds &timeout) override;
+  ReadImpl(std::optional<std::chrono::microseconds> timeout) override;
 
   // FIXME: Support any header.
   static constexpr llvm::StringLiteral kHeaderContentLength =
@@ -131,7 +142,7 @@ class JSONRPCTransport : public JSONTransport {
 protected:
   virtual llvm::Error WriteImpl(const std::string &message) override;
   virtual llvm::Expected<std::string>
-  ReadImpl(const std::chrono::microseconds &timeout) override;
+  ReadImpl(std::optional<std::chrono::microseconds> timeout) override;
 
   static constexpr llvm::StringLiteral kMessageSeparator = "\n";
 };
diff --git a/lldb/source/Host/common/JSONTransport.cpp b/lldb/source/Host/common/JSONTransport.cpp
index 1a0851d5c4365..0fae74fb87b68 100644
--- a/lldb/source/Host/common/JSONTransport.cpp
+++ b/lldb/source/Host/common/JSONTransport.cpp
@@ -27,9 +27,9 @@ using namespace lldb_private;
 
 /// ReadFull attempts to read the specified number of bytes. If EOF is
 /// encountered, an empty string is returned.
-static Expected<std::string>
-ReadFull(IOObject &descriptor, size_t length,
-         std::optional<std::chrono::microseconds> timeout = std::nullopt) {
+Expected<std::string> JSONTransport::ReadFull(
+    IOObject &descriptor, size_t length,
+    std::optional<std::chrono::microseconds> timeout) const {
   if (!descriptor.IsValid())
     return llvm::make_error<TransportInvalidError>();
 
@@ -67,19 +67,22 @@ ReadFull(IOObject &descriptor, size_t length,
   return data.substr(0, length);
 }
 
-static Expected<std::string>
-ReadUntil(IOObject &descriptor, StringRef delimiter,
-          std::optional<std::chrono::microseconds> timeout = std::nullopt) {
-  std::string buffer;
-  buffer.reserve(delimiter.size() + 1);
-  while (!llvm::StringRef(buffer).ends_with(delimiter)) {
+Expected<std::string>
+JSONTransport::ReadUntil(IOObject &descriptor, StringRef delimiter,
+                         std::optional<std::chrono::microseconds> timeout) {
+  if (!timeout || *timeout != std::chrono::microseconds::zero()) {
+    m_buffer.clear();
+    m_buffer.reserve(delimiter.size() + 1);
+  }
+
+  while (!llvm::StringRef(m_buffer).ends_with(delimiter)) {
     Expected<std::string> next =
-        ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout);
+        ReadFull(descriptor, m_buffer.empty() ? delimiter.size() : 1, timeout);
     if (auto Err = next.takeError())
       return std::move(Err);
-    buffer += *next;
+    m_buffer += *next;
   }
-  return buffer.substr(0, buffer.size() - delimiter.size());
+  return m_buffer.substr(0, m_buffer.size() - delimiter.size());
 }
 
 JSONTransport::JSONTransport(IOObjectSP input, IOObjectSP output)
@@ -89,11 +92,15 @@ void JSONTransport::Log(llvm::StringRef message) {
   LLDB_LOG(GetLog(LLDBLog::Host), "{0}", message);
 }
 
-Expected<std::string>
-HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
+Expected<std::string> HTTPDelimitedJSONTransport::ReadImpl(
+    std::optional<std::chrono::microseconds> timeout) {
   if (!m_input || !m_input->IsValid())
     return llvm::make_error<TransportInvalidError>();
 
+  if (timeout && *timeout == std::chrono::microseconds::zero())
+    return llvm::createStringError(
+        "HTTPDelimitedJSONTransport does not support non-blocking reads");
+
   IOObject *input = m_input.get();
   Expected<std::string> message_header =
       ReadFull(*input, kHeaderContentLength.size(), timeout);
@@ -104,7 +111,8 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
                                      kHeaderContentLength, *message_header)
                                  .str());
 
-  Expected<std::string> raw_length = ReadUntil(*input, kHeaderSeparator);
+  Expected<std::string> raw_length =
+      ReadUntil(*input, kHeaderSeparator, timeout);
   if (!raw_length)
     return handleErrors(raw_length.takeError(),
                         [&](const TransportEOFError &E) -> llvm::Error {
@@ -117,7 +125,7 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
     return createStringError(
         formatv("invalid content length {0}", *raw_length).str());
 
-  Expected<std::string> raw_json = ReadFull(*input, length);
+  Expected<std::string> raw_json = ReadFull(*input, length, timeout);
   if (!raw_json)
     return handleErrors(
         raw_json.takeError(), [&](const TransportEOFError &E) -> llvm::Error {
@@ -143,7 +151,7 @@ Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) {
 }
 
 Expected<std::string>
-JSONRPCTransport::ReadImpl(const std::chrono::microseconds &timeout) {
+JSONRPCTransport::ReadImpl(std::optional<std::chrono::microseconds> timeout) {
   if (!m_input || !m_input->IsValid())
     return make_error<TransportInvalidError>();
 
diff --git a/lldb/unittests/Host/JSONTransportTest.cpp b/lldb/unittests/Host/JSONTransportTest.cpp
index 4621869887ac8..cc43d7d851cb1 100644
--- a/lldb/unittests/Host/JSONTransportTest.cpp
+++ b/lldb/unittests/Host/JSONTransportTest.cpp
@@ -16,7 +16,7 @@ using namespace lldb_private;
 namespace {
 template <typename T> class JSONTransportTest : public PipeTest {
 protected:
-  std::unique_ptr<JSONTransport> transport;
+  std::unique_ptr<T> transport;
 
   void SetUp() override {
     PipeTest::SetUp();
@@ -36,7 +36,13 @@ class HTTPDelimitedJSONTransportTest
   using JSONTransportTest::JSONTransportTest;
 };
 
-class JSONRPCTransportTest : public JSONTransportTest<JSONRPCTransport> {
+class TestJSONRPCTransport : public JSONRPCTransport {
+public:
+  using JSONRPCTransport::JSONRPCTransport;
+  using JSONRPCTransport::WriteImpl; // For partial writes.
+};
+
+class JSONRPCTransportTest : public JSONTransportTest<TestJSONRPCTransport> {
 public:
   using JSONTransportTest::JSONTransportTest;
 };
@@ -84,7 +90,6 @@ TEST_F(HTTPDelimitedJSONTransportTest, ReadWithEOF) {
       Failed<TransportEOFError>());
 }
 
-
 TEST_F(HTTPDelimitedJSONTransportTest, InvalidTransport) {
   transport = std::make_unique<HTTPDelimitedJSONTransport>(nullptr, nullptr);
   ASSERT_THAT_EXPECTED(
@@ -142,13 +147,43 @@ TEST_F(JSONRPCTransportTest, Write) {
 }
 
 TEST_F(JSONRPCTransportTest, InvalidTransport) {
-  transport = std::make_unique<JSONRPCTransport>(nullptr, nullptr);
+  transport = std::make_unique<TestJSONRPCTransport>(nullptr, nullptr);
   ASSERT_THAT_EXPECTED(
       transport->Read<JSONTestType>(std::chrono::milliseconds(1)),
       Failed<TransportInvalidError>());
 }
 
 #ifndef _WIN32
+TEST_F(HTTPDelimitedJSONTransportTest, NonBlockingRead) {
+  ASSERT_THAT_EXPECTED(
+      transport->Read<JSONTestType>(std::chrono::microseconds::zero()),
+      llvm::FailedWithMessage(
+          "HTTPDelimitedJSONTransport does not support non-blocking reads"));
+}
+
+TEST_F(JSONRPCTransportTest, NonBlockingRead) {
+  llvm::StringRef head = R"({"str")";
+  llvm::StringRef tail = R"(: "foo"})"
+                         "\n";
+
+  ASSERT_THAT_EXPECTED(input.Write(head.data(), head.size()), Succeeded());
+  ASSERT_THAT_EXPECTED(
+      transport->Read<JSONTestType>(std::chrono::microseconds::zero()),
+      Failed<TransportTimeoutError>());
+
+  ASSERT_THAT_EXPECTED(input.Write(tail.data(), tail.size()), Succeeded());
+  while (true) {
+    llvm::Expected<JSONTestType> result =
+        transport->Read<JSONTestType>(std::chrono::microseconds::zero());
+    if (result.errorIsA<TransportTimeoutError>()) {
+      llvm::consumeError(result.takeError());
+      continue;
+    }
+    ASSERT_THAT_EXPECTED(result, HasValue(testing::FieldsAre(/*str=*/"foo")));
+    break;
+  }
+}
+
 TEST_F(HTTPDelimitedJSONTransportTest, ReadWithTimeout) {
   ASSERT_THAT_EXPECTED(
       transport->Read<JSONTestType>(std::chrono::milliseconds(1)),

Comment on lines +73 to +80
+  if (!timeout || *timeout != std::chrono::microseconds::zero()) {
+    m_buffer.clear();
+    m_buffer.reserve(delimiter.size() + 1);
+  }
+
+  while (!llvm::StringRef(m_buffer).ends_with(delimiter)) {
     Expected<std::string> next =
-        ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout);
+        ReadFull(descriptor, m_buffer.empty() ? delimiter.size() : 1, timeout);
Contributor

If we have a buffer, we could adjust our approach to read in chunks larger than 1 byte when we're reading until a delimiter.

We could read chunks of, say, 1024 bytes and then split the buffer on the delimiter until we run out of data, and then do a new read with the next chunk size.

I don't know if this approach would have issues on Windows or anything though, so maybe someone with more platform-specific knowledge knows how it handles blocking reads if _read is called with no data.

Collaborator

I think I don't understand the question. A blocking read with no data... blocks, sort of by definition.

transport->Read<JSONTestType>(std::chrono::microseconds::zero()),
Failed<TransportTimeoutError>());

ASSERT_THAT_EXPECTED(input.Write(tail.data(), tail.size()), Succeeded());
Contributor

Can we also test a chunk with the delimiter in the middle of the data?

@@ -85,7 +85,8 @@ class JSONTransport {

/// Reads the next message from the input stream.
template <typename T>
-  llvm::Expected<T> Read(const std::chrono::microseconds &timeout) {
+  llvm::Expected<T>
+  Read(std::optional<std::chrono::microseconds> timeout = std::nullopt) {
Contributor

Since this type has moved into lldb_private now, should we use the Timeout helper? https://github.com/llvm/llvm-project/blob/main/lldb/include/lldb/Utility/Timeout.h#L28

Collaborator

@labath labath left a comment

I have a feeling that this generic JSONTransport interface is doing more harm than good. I mean, the idea of being able to send/receive a json-encoded message generically is a good one, but given that:
a) we're not actually supporting non-blocking reads in the http version, meaning the two aren't interchangeable;
b) no one needs to access these transports generically (or do they?);
I don't think it's necessary or helpful.

Blocking and non-blocking I/O tends to look very different (esp. if you don't have coroutines). While you can create something that supports both paradigms, doing that while also supporting the un-selectable, un-nonblockable Windows corner cases is not easy, and would probably result in a lot more code than it would save.

I mean, if all I needed to support is mainloop-based jsonrpc reads from sockets (which I think is all you need for your use case), the entire implementation would roughly be this:

void DataAvailableCallback() { // called from main loop
  char chunk[1000];
  size_t bytes_read = sizeof(chunk);
  // No need for select (or helpers); the main loop did that for us.
  if (Status status = input->Read(chunk, bytes_read); status.Fail()) {
    // do something
    return;
  }
  m_buffer.append(chunk, bytes_read);
  // Hand off every complete newline-terminated message; keep the remainder.
  for (std::string::size_type pos; (pos = m_buffer.find('\n')) != std::string::npos; ) {
    HandleMessage(StringRef(m_buffer.data(), pos));
    m_buffer.erase(m_buffer.begin(), m_buffer.begin()+pos+1);
  }
}
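
For readers who want to try the buffering idea outside LLDB, here is a minimal standalone sketch of the same splitting logic. `LineFramer`, `Feed`, and `Pending` are hypothetical names invented for this illustration; they are not part of LLDB or of this patch. It only shows the framing step (append a chunk, emit every complete message, keep the remainder) and leaves the actual I/O out.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical helper, not part of LLDB: accumulates raw chunks and yields
// every complete delimiter-terminated message seen so far.
class LineFramer {
public:
  explicit LineFramer(std::string delimiter = "\n")
      : m_delimiter(std::move(delimiter)) {
    assert(!m_delimiter.empty() && "delimiter must not be empty");
  }

  // Appends a chunk and returns the complete messages it finishes.
  // Any trailing partial message stays buffered for the next call.
  std::vector<std::string> Feed(std::string_view chunk) {
    m_buffer.append(chunk);
    std::vector<std::string> messages;
    std::string::size_type pos;
    while ((pos = m_buffer.find(m_delimiter)) != std::string::npos) {
      messages.push_back(m_buffer.substr(0, pos));
      m_buffer.erase(0, pos + m_delimiter.size());
    }
    return messages;
  }

  // Bytes received so far that do not yet form a complete message.
  const std::string &Pending() const { return m_buffer; }

private:
  std::string m_delimiter;
  std::string m_buffer;
};
```

Because the buffer outlives each call, a chunk that ends mid-message and a chunk that carries the delimiter in the middle of its data are both handled by the same loop.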

Expected<std::string>
JSONTransport::ReadUntil(IOObject &descriptor, StringRef delimiter,
std::optional<std::chrono::microseconds> timeout) {
if (!timeout || *timeout != std::chrono::microseconds::zero()) {
Collaborator

Why should this depend on the timeout value? If you previously timed out reading something (with a non-zero timeout) and you try to read again, who says the result is going to be better if you discard the data you've read so far?

if (!m_input || !m_input->IsValid())
return llvm::make_error<TransportInvalidError>();

if (timeout && *timeout == std::chrono::microseconds::zero())
Collaborator

Suggested change
-  if (timeout && *timeout == std::chrono::microseconds::zero())
+  if (timeout == std::chrono::microseconds::zero())

This should also work.
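
The suggestion relies on `std::optional`'s comparison with a plain value: an empty optional never compares equal to a value, so the explicit engaged check is redundant. A small standalone sketch of that equivalence follows; `OptMicros` and the three function names are made up for this illustration and do not appear in the patch.

```cpp
#include <cassert>
#include <chrono>
#include <optional>

// Hypothetical alias for the timeout type used in the patch.
using OptMicros = std::optional<std::chrono::microseconds>;

// The form used in the patch: check for a value, then compare it.
bool IsNonBlockingVerbose(OptMicros timeout) {
  return timeout && *timeout == std::chrono::microseconds::zero();
}

// The suggested form: optional == value is false when the optional is empty.
bool IsNonBlockingShort(OptMicros timeout) {
  return timeout == std::chrono::microseconds::zero();
}

// Asserts that both spellings agree for a given timeout.
void CheckEquivalent(OptMicros timeout) {
  assert(IsNonBlockingVerbose(timeout) == IsNonBlockingShort(timeout));
}
```

Both forms treat `std::nullopt` (block indefinitely) and any non-zero timeout as "not non-blocking", and only an engaged zero timeout as a non-blocking read.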

-static Expected<std::string>
-ReadFull(IOObject &descriptor, size_t length,
-         std::optional<std::chrono::microseconds> timeout = std::nullopt) {
+Expected<std::string> JSONTransport::ReadFull(
Collaborator

Could we keep this a static, so that it's clear it does not interact with m_buffer?

llvm::Expected<std::string>
ReadFull(IOObject &descriptor, size_t length,
std::optional<std::chrono::microseconds> timeout) const;

Collaborator

And if these are members, then they probably don't need to take the IOObject arg.

@JDevlieghere
Member Author

> I have a feeling that this generic JSONTransport interface is doing more harm than good. I mean, the idea of being able to send/receive a json-encoded message generically is a good one, but given that:
> a) we're not actually supporting non-blocking reads in the http version, meaning the two aren't interchangeable;
> b) no one needs to access these transports generically (or do they?);
> I don't think it's necessary or helpful.

That's a fair observation. I started out this patch by having a non-blocking variant, but I figured with a zero timeout we could keep the existing interface. My patch doesn't support non-blocking reads for the HTTP version, but that doesn't mean we couldn't support it if we wanted to.

> Blocking and non-blocking I/O tends to look very different (esp. if you don't have coroutines). While you can create something that supports both paradigms, doing that while also supporting the un-selectable, un-nonblockable Windows corner cases is not easy, and would probably result in a lot more code than it would save.

I can't say I'm particularly interested/motivated to support the Windows scenario.

So it seems like our options are a matrix of:

  1. Do we keep the JSONTransport class or not?
  2. Do we implement non-blocking I/O support for JSON RPC?
  3. Do we implement non-blocking I/O support for HTTP JSON?

@labath @ashgti I'd love you guys' input here. I'm just trying to make the MCP use case work so I don't really have a horse in this race, though I don't mind doing a bit more work for what we believe is right.

@ashgti Do you think we would adopt non-blocking I/O in lldb-dap? Since we're using separate threads there, we can afford using long timeouts, but maybe we'd like to use the main loop there too with non-blocking I/O. If not, and if I were to implement the non-blocking support in MCP directly using Pavel's snippet, would you want to move the JSONTransport back into lldb-dap (and drop the JSON RPC variant) or keep it in Host?

@labath
Collaborator

labath commented Jun 18, 2025

> So it seems like our options are a matrix of:
>
> 1. Do we keep the JSONTransport class or not?
> 2. Do we implement non-blocking I/O support for JSON RPC?
> 3. Do we implement non-blocking I/O support for HTTP JSON?
>
> @labath @ashgti I'd love you guys' input here.

My take would be this.

  1. Ditch JSONTransport. Implement something that makes sense for your use case.
  2. Implement non-blocking I/O as the only method. If you need to be able to interrupt/terminate/cancel a connection, you need some sort of a multiplexing mechanism, and that's sort of equivalent to non-blocking.
  3. This becomes Not Your Problem(tm).
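
To make the link between multiplexing and non-blocking reads concrete, here is a small POSIX-only sketch. It is an illustration under stated assumptions, not LLDB's MainLoop: `HasPendingData` is a hypothetical helper built on `poll(2)` with a zero timeout, which reports whether a read on the descriptor would find data without ever waiting.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper (POSIX only): returns true if `fd` has data to read
// right now. A zero timeout makes poll() return immediately.
bool HasPendingData(int fd) {
  assert(fd >= 0 && "expected a valid file descriptor");
  struct pollfd pfd = {};
  pfd.fd = fd;
  pfd.events = POLLIN;
  int ready = poll(&pfd, 1, /*timeout=*/0);
  return ready > 0 && (pfd.revents & POLLIN) != 0;
}
```

A loop that polls several descriptors this way, or that blocks in a single `poll` call over all of them, only reads once data is known to be available, which is why a multiplexed reader and non-blocking reads end up looking much the same.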

Generally, I think HTTP JSON should also switch to something similar to this -- because it also has the same problem that started all this, but it has the problem of communicating over stdout. I'm not sure how it gets away with that right now, maybe it works because stdout is only used in the single-connection scenario. If that is resolved, maybe it could be ported to share some of the JSON RPC infrastructure (but I'm not sure if that's ever really necessary, just because of how simple this stuff is).

With a little bit more work, you could try to make lldb-dap communicate over sockets exclusively (I've been told that the communication mechanism is entirely under our control), and then it may make sense to share more of this stuff, but I wouldn't say this needs to be tied together.

@ashgti
Contributor

ashgti commented Jun 18, 2025

> @ashgti Do you think we would adopt non-blocking I/O in lldb-dap? Since we're using separate threads there, we can afford using long timeouts, but maybe we'd like to use the main loop there too with non-blocking I/O. If not, and if I were to implement the non-blocking support in MCP directly using Pavel's snippet, would you want to move the JSONTransport back into lldb-dap (and drop the JSON RPC variant) or keep it in Host?

lldb-dap today is most often used by communicating over stdin/stdout.

When adding support for lldb-dap to have a server mode, the clients connect over a socket (unix or tcp). I wanted to be able to gracefully disconnect the clients if the server is interrupted. We don't strictly need to have non-blocking IO for lldb-dap to work in both cases, but it helps for the graceful disconnects. Right now, we sort of ignore the fact that the Transport never times out on Windows when running over stdin/stdout.

In an ideal world we'd have uniform support for this, but from my understanding of Win32 and the APIs we're using, I wasn't able to figure out a solution for reading from stdin with a timeout.

There are ways we could work around this, like always connecting over a socket on Windows instead of stdin/stdout but that would be a pretty big change and I'm not sure how many users there are of the lldb-dap binary on Windows that would be affected by the change.

I can try to take another look at the Win32 APIs to see if there is an alternative because it would be nice if we had a uniform API surface for this.

@JDevlieghere
Member Author

Alright, I'll update my MCP patch to use Pavel's suggestion. If we're able to get non-blocking I/O working with the JSONTransport class, I can adopt it there again.

@labath
Collaborator

labath commented Jun 19, 2025

> In an ideal world we'd have uniform support for this, but from my understanding of Win32 and the APIs we're using, I wasn't able to figure out a solution for reading from stdin with a timeout.

I think the question is, what is the actual type of stdin in this case. Windows APIs are more segregated than posix ones, so a fully generic solution for reading from stdin may not be possible. However, it's definitely possible to implement a nonblocking read from a (named) pipe, if that is what vscode uses for stdin communication. I think we should be able to check for that via something like GetFileType(GetStdHandle(STD_OUTPUT_HANDLE)) and then constructing a Pipe object from that handle.

Our Pipe class already supports non-blocking (timeout-based) reads, which is already a step up. However, it's not selectable, so you can't plug it into the main loop. In one of the first patches, you linked to some code (in libdispatch maybe?), which seems to use a zero-length read as a way to poll for the data in the pipe without actually consuming it. I've been meaning to try that out, but never could get around to it. If that works, then it should be relatively easy to plug that into the windows main loop implementation -- just take the hEvent from the zero-length read, and plug that into the WaitForMultipleEvents call.

@ashgti
Contributor

ashgti commented Jun 20, 2025

Okay, I took a look at this and got the following working: https://gist.github.com/ashgti/003ada30ee59aed154bac111581f97d5. This is a select-like helper that only works on Pipe handle types. From my testing, it should work with both anonymous pipes and named pipes. I can look at incorporating this into the IOObject types.

4 participants