Skip to content

Commit 1a5d3b6

Browse files
authored
[libc] Scan the ports more fairly in the RPC server (llvm#66680)
Summary: Currently, we use the RPC server to respond to different ports which each contain a request from some client thread wishing to do work on the server. This scan starts at zero and continues until its checked all ports at which point it resets. If we find an active port, we service it and then restart the search. This is bad for two reasons. First, it means that we will always bias the lower ports. If a thread grabs a high port it will be stuck for a very long time until all the other work is done. Second, it means that the `handle_server` function can technically run indefinitely as long as the client is always pushing new work. Because the OpenMP implementation uses the user thread to service the kernel, this means that it could be stalled with another asyncrhonous device's kernels. This patch addresses this by making the server restart at the next port over. This means we will always do a full scan of the ports before quitting.
1 parent 0f88be7 commit 1a5d3b6

File tree

2 files changed

+17
-9
lines changed

2 files changed

+17
-9
lines changed

libc/src/__support/RPC/rpc.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ template <bool T, typename S> struct Port {
318318
return process.packet[index].header.opcode;
319319
}
320320

321+
LIBC_INLINE uint16_t get_index() const { return index; }
322+
321323
LIBC_INLINE void close() {
322324
// The server is passive, if it own the buffer when it closes we need to
323325
// give ownership back to the client.
@@ -367,7 +369,7 @@ template <uint32_t lane_size> struct Server {
367369
: process(port_count, buffer) {}
368370

369371
using Port = rpc::Port<true, Packet<lane_size>>;
370-
LIBC_INLINE cpp::optional<Port> try_open();
372+
LIBC_INLINE cpp::optional<Port> try_open(uint32_t start = 0);
371373
LIBC_INLINE Port open();
372374

373375
LIBC_INLINE static uint64_t allocation_size(uint32_t port_count) {
@@ -547,9 +549,9 @@ template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
547549
template <uint32_t lane_size>
548550
[[clang::convergent]] LIBC_INLINE
549551
cpp::optional<typename Server<lane_size>::Port>
550-
Server<lane_size>::try_open() {
552+
Server<lane_size>::try_open(uint32_t start) {
551553
// Perform a naive linear scan for a port that has a pending request.
552-
for (uint32_t index = 0; index < process.port_count; ++index) {
554+
for (uint32_t index = start; index < process.port_count; ++index) {
553555
uint64_t lane_mask = gpu::get_lane_mask();
554556
uint32_t in = process.load_inbox(lane_mask, index);
555557
uint32_t out = process.load_outbox(lane_mask, index);

libc/utils/gpu/server/rpc_server.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ struct Server {
3636

3737
rpc_status_t handle_server(
3838
const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
39-
const std::unordered_map<rpc_opcode_t, void *> &callback_data) {
39+
const std::unordered_map<rpc_opcode_t, void *> &callback_data,
40+
uint32_t &index) {
4041
rpc_status_t ret = RPC_STATUS_SUCCESS;
4142
std::visit(
4243
[&](auto &server) {
43-
ret = handle_server(*server, callbacks, callback_data);
44+
ret = handle_server(*server, callbacks, callback_data, index);
4445
},
4546
server);
4647
return ret;
@@ -51,8 +52,9 @@ struct Server {
5152
rpc_status_t handle_server(
5253
rpc::Server<lane_size> &server,
5354
const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
54-
const std::unordered_map<rpc_opcode_t, void *> &callback_data) {
55-
auto port = server.try_open();
55+
const std::unordered_map<rpc_opcode_t, void *> &callback_data,
56+
uint32_t &index) {
57+
auto port = server.try_open(index);
5658
if (!port)
5759
return RPC_STATUS_SUCCESS;
5860

@@ -203,6 +205,9 @@ struct Server {
203205
(handler->second)(port_ref, data);
204206
}
205207
}
208+
209+
// Increment the index so we start the scan after this port.
210+
index = port->get_index() + 1;
206211
port->close();
207212
return RPC_STATUS_CONTINUE;
208213
}
@@ -333,10 +338,11 @@ rpc_status_t rpc_handle_server(uint32_t device_id) {
333338
if (!state->devices[device_id])
334339
return RPC_STATUS_ERROR;
335340

341+
uint32_t index = 0;
336342
for (;;) {
337343
auto &device = *state->devices[device_id];
338-
rpc_status_t status =
339-
device.server.handle_server(device.callbacks, device.callback_data);
344+
rpc_status_t status = device.server.handle_server(
345+
device.callbacks, device.callback_data, index);
340346
if (status != RPC_STATUS_CONTINUE)
341347
return status;
342348
}

0 commit comments

Comments
 (0)