-
Notifications
You must be signed in to change notification settings - Fork 14.3k
[Offload] Move RPC server handling to a dedicated thread #112988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@llvm/pr-subscribers-offload @llvm/pr-subscribers-backend-amdgpu Author: Joseph Huber (jhuber6) ChangesSummary: However, we also don't want to have this thread doing work Patch is 20.67 KiB, truncated to 20.00 KiB below, full version: https://github.com/llvm/llvm-project/pull/112988.diff 7 Files Affected:
diff --git a/offload/plugins-nextgen/amdgpu/src/rtl.cpp b/offload/plugins-nextgen/amdgpu/src/rtl.cpp
index f0cc0c2e4d08e5..bd1d60075bb36a 100644
--- a/offload/plugins-nextgen/amdgpu/src/rtl.cpp
+++ b/offload/plugins-nextgen/amdgpu/src/rtl.cpp
@@ -626,9 +626,9 @@ struct AMDGPUSignalTy {
}
/// Wait until the signal gets a zero value.
- Error wait(const uint64_t ActiveTimeout = 0, RPCServerTy *RPCServer = nullptr,
+ Error wait(const uint64_t ActiveTimeout = 0,
GenericDeviceTy *Device = nullptr) const {
- if (ActiveTimeout && !RPCServer) {
+ if (ActiveTimeout) {
hsa_signal_value_t Got = 1;
Got = hsa_signal_wait_scacquire(HSASignal, HSA_SIGNAL_CONDITION_EQ, 0,
ActiveTimeout, HSA_WAIT_STATE_ACTIVE);
@@ -637,14 +637,11 @@ struct AMDGPUSignalTy {
}
// If there is an RPC device attached to this stream we run it as a server.
- uint64_t Timeout = RPCServer ? 8192 : UINT64_MAX;
- auto WaitState = RPCServer ? HSA_WAIT_STATE_ACTIVE : HSA_WAIT_STATE_BLOCKED;
+ uint64_t Timeout = UINT64_MAX;
+ auto WaitState = HSA_WAIT_STATE_BLOCKED;
while (hsa_signal_wait_scacquire(HSASignal, HSA_SIGNAL_CONDITION_EQ, 0,
- Timeout, WaitState) != 0) {
- if (RPCServer && Device)
- if (auto Err = RPCServer->runServer(*Device))
- return Err;
- }
+ Timeout, WaitState) != 0)
+ ;
return Plugin::success();
}
@@ -927,6 +924,8 @@ struct AMDGPUStreamTy {
AMDGPUSignalManagerTy *SignalManager;
};
+ using AMDGPUStreamCallbackTy = Error(void *Data);
+
/// The stream is composed of N stream's slots. The struct below represents
/// the fields of each slot. Each slot has a signal and an optional action
/// function. When appending an HSA asynchronous operation to the stream, one
@@ -942,65 +941,81 @@ struct AMDGPUStreamTy {
/// operation as input signal.
AMDGPUSignalTy *Signal;
- /// The action that must be performed after the operation's completion. Set
+ /// The actions that must be performed after the operation's completion. Set
/// to nullptr when there is no action to perform.
- Error (*ActionFunction)(void *);
+ llvm::SmallVector<AMDGPUStreamCallbackTy *> Callbacks;
/// Space for the action's arguments. A pointer to these arguments is passed
/// to the action function. Notice the space of arguments is limited.
- union {
+ union ActionArgsTy {
MemcpyArgsTy MemcpyArgs;
ReleaseBufferArgsTy ReleaseBufferArgs;
ReleaseSignalArgsTy ReleaseSignalArgs;
- } ActionArgs;
+ void *CallbackArgs;
+ };
+
+ llvm::SmallVector<ActionArgsTy> ActionArgs;
/// Create an empty slot.
- StreamSlotTy() : Signal(nullptr), ActionFunction(nullptr) {}
+ StreamSlotTy() : Signal(nullptr), Callbacks({}), ActionArgs({}) {}
/// Schedule a host memory copy action on the slot.
Error schedHostMemoryCopy(void *Dst, const void *Src, size_t Size) {
- ActionFunction = memcpyAction;
- ActionArgs.MemcpyArgs = MemcpyArgsTy{Dst, Src, Size};
+ Callbacks.emplace_back(memcpyAction);
+ ActionArgs.emplace_back().MemcpyArgs = MemcpyArgsTy{Dst, Src, Size};
return Plugin::success();
}
/// Schedule a release buffer action on the slot.
Error schedReleaseBuffer(void *Buffer, AMDGPUMemoryManagerTy &Manager) {
- ActionFunction = releaseBufferAction;
- ActionArgs.ReleaseBufferArgs = ReleaseBufferArgsTy{Buffer, &Manager};
+ Callbacks.emplace_back(releaseBufferAction);
+ ActionArgs.emplace_back().ReleaseBufferArgs =
+ ReleaseBufferArgsTy{Buffer, &Manager};
return Plugin::success();
}
/// Schedule a signal release action on the slot.
Error schedReleaseSignal(AMDGPUSignalTy *SignalToRelease,
AMDGPUSignalManagerTy *SignalManager) {
- ActionFunction = releaseSignalAction;
- ActionArgs.ReleaseSignalArgs =
+ Callbacks.emplace_back(releaseSignalAction);
+ ActionArgs.emplace_back().ReleaseSignalArgs =
ReleaseSignalArgsTy{SignalToRelease, SignalManager};
return Plugin::success();
}
+ /// Register a callback to be called on compleition
+ Error schedCallback(AMDGPUStreamCallbackTy *Func, void *Data) {
+ Callbacks.emplace_back(Func);
+ ActionArgs.emplace_back().CallbackArgs = Data;
+
+ return Plugin::success();
+ }
+
// Perform the action if needed.
Error performAction() {
- if (!ActionFunction)
+ if (Callbacks.empty())
return Plugin::success();
- // Perform the action.
- if (ActionFunction == memcpyAction) {
- if (auto Err = memcpyAction(&ActionArgs))
- return Err;
- } else if (ActionFunction == releaseBufferAction) {
- if (auto Err = releaseBufferAction(&ActionArgs))
- return Err;
- } else if (ActionFunction == releaseSignalAction) {
- if (auto Err = releaseSignalAction(&ActionArgs))
- return Err;
- } else {
- return Plugin::error("Unknown action function!");
+ for (auto [Callback, ActionArg] : llvm::zip(Callbacks, ActionArgs)) {
+ // Perform the action.
+ if (Callback == memcpyAction) {
+ if (auto Err = memcpyAction(&ActionArg))
+ return Err;
+ } else if (Callback == releaseBufferAction) {
+ if (auto Err = releaseBufferAction(&ActionArg))
+ return Err;
+ } else if (Callback == releaseSignalAction) {
+ if (auto Err = releaseSignalAction(&ActionArg))
+ return Err;
+ } else {
+ if (auto Err = Callback(ActionArg.CallbackArgs))
+ return Err;
+ }
}
// Invalidate the action.
- ActionFunction = nullptr;
+ Callbacks.clear();
+ ActionArgs.clear();
return Plugin::success();
}
@@ -1034,11 +1049,6 @@ struct AMDGPUStreamTy {
/// operation that was already finalized in a previous stream sycnhronize.
uint32_t SyncCycle;
- /// A pointer associated with an RPC server running on the given device. If
- /// RPC is not being used this will be a null pointer. Otherwise, this
- /// indicates that an RPC server is expected to be run on this stream.
- RPCServerTy *RPCServer;
-
/// Mutex to protect stream's management.
mutable std::mutex Mutex;
@@ -1218,9 +1228,6 @@ struct AMDGPUStreamTy {
/// Deinitialize the stream's signals.
Error deinit() { return Plugin::success(); }
- /// Attach an RPC server to this stream.
- void setRPCServer(RPCServerTy *Server) { RPCServer = Server; }
-
/// Push a asynchronous kernel to the stream. The kernel arguments must be
/// placed in a special allocation for kernel args and must keep alive until
/// the kernel finalizes. Once the kernel is finished, the stream will release
@@ -1248,10 +1255,30 @@ struct AMDGPUStreamTy {
if (auto Err = Slots[Curr].schedReleaseBuffer(KernelArgs, MemoryManager))
return Err;
+ // If we are running an RPC server we want to wake up the server thread
+ // whenever there is a kernel running and let it sleep otherwise.
+ if (Device.getRPCServer())
+ Device.Plugin.getRPCServer().Thread->notify();
+
// Push the kernel with the output signal and an input signal (optional)
- return Queue->pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks,
- GroupSize, StackSize, OutputSignal,
- InputSignal);
+ if (auto Err = Queue->pushKernelLaunch(Kernel, KernelArgs, NumThreads,
+ NumBlocks, GroupSize, StackSize,
+ OutputSignal, InputSignal))
+ return Err;
+
+ // Register a callback to indicate when the kernel is complete.
+ if (Device.getRPCServer()) {
+ if (auto Err = Slots[Curr].schedCallback(
+ [](void *Data) -> llvm::Error {
+ GenericPluginTy &Plugin =
+ *reinterpret_cast<GenericPluginTy *>(Data);
+ Plugin.getRPCServer().Thread->finish();
+ return Error::success();
+ },
+ &Device.Plugin))
+ return Err;
+ }
+ return Plugin::success();
}
/// Push an asynchronous memory copy between pinned memory buffers.
@@ -1461,8 +1488,8 @@ struct AMDGPUStreamTy {
return Plugin::success();
// Wait until all previous operations on the stream have completed.
- if (auto Err = Slots[last()].Signal->wait(StreamBusyWaitMicroseconds,
- RPCServer, &Device))
+ if (auto Err =
+ Slots[last()].Signal->wait(StreamBusyWaitMicroseconds, &Device))
return Err;
// Reset the stream and perform all pending post actions.
@@ -3006,7 +3033,7 @@ AMDGPUStreamTy::AMDGPUStreamTy(AMDGPUDeviceTy &Device)
: Agent(Device.getAgent()), Queue(nullptr),
SignalManager(Device.getSignalManager()), Device(Device),
// Initialize the std::deque with some empty positions.
- Slots(32), NextSlot(0), SyncCycle(0), RPCServer(nullptr),
+ Slots(32), NextSlot(0), SyncCycle(0),
StreamBusyWaitMicroseconds(Device.getStreamBusyWaitMicroseconds()),
UseMultipleSdmaEngines(Device.useMultipleSdmaEngines()) {}
@@ -3359,10 +3386,6 @@ Error AMDGPUKernelTy::launchImpl(GenericDeviceTy &GenericDevice,
if (auto Err = AMDGPUDevice.getStream(AsyncInfoWrapper, Stream))
return Err;
- // If this kernel requires an RPC server we attach its pointer to the stream.
- if (GenericDevice.getRPCServer())
- Stream->setRPCServer(GenericDevice.getRPCServer());
-
// Only COV5 implicitargs needs to be set. COV4 implicitargs are not used.
if (ImplArgs &&
getImplicitArgsSize() == sizeof(hsa_utils::AMDGPUImplicitArgsTy)) {
diff --git a/offload/plugins-nextgen/common/include/RPC.h b/offload/plugins-nextgen/common/include/RPC.h
index 01bf539bcb3f32..9b7ebee4bdb785 100644
--- a/offload/plugins-nextgen/common/include/RPC.h
+++ b/offload/plugins-nextgen/common/include/RPC.h
@@ -19,7 +19,11 @@
#include "llvm/ADT/DenseMap.h"
#include "llvm/Support/Error.h"
+#include <atomic>
+#include <condition_variable>
#include <cstdint>
+#include <mutex>
+#include <thread>
namespace llvm::omp::target {
namespace plugin {
@@ -37,6 +41,9 @@ struct RPCServerTy {
/// Initializes the handles to the number of devices we may need to service.
RPCServerTy(plugin::GenericPluginTy &Plugin);
+ /// Deinitialize the associated memory and resources.
+ llvm::Error shutDown();
+
/// Check if this device image is using an RPC server. This checks for the
/// precense of an externally visible symbol in the device image that will
/// be present whenever RPC code is called.
@@ -51,17 +58,59 @@ struct RPCServerTy {
plugin::GenericGlobalHandlerTy &Handler,
plugin::DeviceImageTy &Image);
- /// Runs the RPC server associated with the \p Device until the pending work
- /// is cleared.
- llvm::Error runServer(plugin::GenericDeviceTy &Device);
-
/// Deinitialize the RPC server for the given device. This will free the
/// memory associated with the k
llvm::Error deinitDevice(plugin::GenericDeviceTy &Device);
private:
/// Array from this device's identifier to its attached devices.
- llvm::SmallVector<uintptr_t> Handles;
+ std::unique_ptr<std::atomic<uintptr_t>[]> Handles;
+
+ /// A helper class for running the user thread that handles
+ struct ServerThread {
+ std::thread Worker;
+
+ /// A boolean indicating whether or not the worker thread should continue.
+ std::atomic<bool> Running;
+
+ /// The number of currently executing kernels across all devices that need
+ /// the server thread to be running.
+ std::atomic<uint32_t> NumUsers;
+
+ /// The condition variable used to suspend the thread if no work is needed.
+ std::condition_variable CV;
+ std::mutex Mutex;
+
+ /// A reference to all the RPC interfaces that the server is handling.
+ llvm::ArrayRef<std::atomic<uintptr_t>> Handles;
+
+ /// Initialize the worker thread to run in the background.
+ ServerThread(std::atomic<uintptr_t> Handles[], size_t Length);
+ ~ServerThread() { assert(!Running && "Thread not shut down explicitly\n"); }
+
+ /// Notify the worker thread that there is a user that needs it.
+ void notify() {
+ NumUsers.fetch_add(1, std::memory_order_seq_cst);
+ CV.notify_all();
+ }
+
+ /// Indicate that one of the dependent users has finished.
+ void finish() {
+ NumUsers.fetch_sub(1, std::memory_order_seq_cst);
+ CV.notify_all();
+ }
+
+ /// Destroy the worker thread and wait.
+ void shutDown();
+
+ /// Run the server thread to continuously check the RPC interface for work
+ /// to be done for the device.
+ void run();
+ };
+
+public:
+ /// Pointer to the server thread instance.
+ std::unique_ptr<ServerThread> Thread;
};
} // namespace llvm::omp::target
diff --git a/offload/plugins-nextgen/common/src/PluginInterface.cpp b/offload/plugins-nextgen/common/src/PluginInterface.cpp
index 25b815b7f96694..2be0fc0a713da5 100644
--- a/offload/plugins-nextgen/common/src/PluginInterface.cpp
+++ b/offload/plugins-nextgen/common/src/PluginInterface.cpp
@@ -1624,8 +1624,11 @@ Error GenericPluginTy::deinit() {
if (GlobalHandler)
delete GlobalHandler;
- if (RPCServer)
+ if (RPCServer) {
+ if (Error Err = RPCServer->shutDown())
+ return Err;
delete RPCServer;
+ }
if (RecordReplay)
delete RecordReplay;
diff --git a/offload/plugins-nextgen/common/src/RPC.cpp b/offload/plugins-nextgen/common/src/RPC.cpp
index faa2cbd4f02fe1..d47a869331a80d 100644
--- a/offload/plugins-nextgen/common/src/RPC.cpp
+++ b/offload/plugins-nextgen/common/src/RPC.cpp
@@ -21,8 +21,60 @@ using namespace llvm;
using namespace omp;
using namespace target;
+void RPCServerTy::ServerThread::shutDown() {
+#ifdef LIBOMPTARGET_RPC_SUPPORT
+ Running.store(false, std::memory_order_release);
+ CV.notify_all();
+ if (Worker.joinable())
+ Worker.join();
+#endif
+}
+
+void RPCServerTy::ServerThread::run() {
+#ifdef LIBOMPTARGET_RPC_SUPPORT
+ while (Running.load(std::memory_order_acquire)) {
+ std::unique_lock<decltype(Mutex)> Lock(Mutex);
+ CV.wait(Lock, [&]() {
+ return NumUsers.load(std::memory_order_relaxed) ||
+ !Running.load(std::memory_order_relaxed);
+ });
+
+ while (NumUsers.load(std::memory_order_relaxed) &&
+ Running.load(std::memory_order_relaxed)) {
+ for (const auto &Handle : Handles) {
+ rpc_device_t RPCDevice{Handle};
+ [[maybe_unused]] rpc_status_t Err = rpc_handle_server(RPCDevice);
+ assert(Err == RPC_STATUS_SUCCESS &&
+ "Checking the RPC server should not fail");
+ }
+ }
+ }
+#endif
+}
+
+#ifdef LIBOMPTARGET_RPC_SUPPORT
+RPCServerTy::ServerThread::ServerThread(std::atomic<uintptr_t> Handles[],
+ size_t Length)
+ : Worker(std::thread([this]() { run(); })), Running(true), NumUsers(0),
+ CV(), Mutex(), Handles(Handles, Length) {}
+#else
+RPCServerTy::ServerThread::ServerThread(std::atomic<uintptr_t> Handles[],
+ size_t Length)
+ : Worker(), Running(true), NumUsers(0), CV(), Mutex(),
+ Handles(Handles, Length) {}
+#endif
+
RPCServerTy::RPCServerTy(plugin::GenericPluginTy &Plugin)
- : Handles(Plugin.getNumDevices()) {}
+ : Handles(
+ std::make_unique<std::atomic<uintptr_t>[]>(Plugin.getNumDevices())),
+ Thread(new ServerThread(Handles.get(), Plugin.getNumDevices())) {}
+
+llvm::Error RPCServerTy::shutDown() {
+#ifdef LIBOMPTARGET_RPC_SUPPORT
+ Thread->shutDown();
+ return Error::success();
+#endif
+}
llvm::Expected<bool>
RPCServerTy::isDeviceUsingRPC(plugin::GenericDeviceTy &Device,
@@ -109,17 +161,6 @@ Error RPCServerTy::initDevice(plugin::GenericDeviceTy &Device,
return Error::success();
}
-Error RPCServerTy::runServer(plugin::GenericDeviceTy &Device) {
-#ifdef LIBOMPTARGET_RPC_SUPPORT
- rpc_device_t RPCDevice{Handles[Device.getDeviceId()]};
- if (rpc_status_t Err = rpc_handle_server(RPCDevice))
- return plugin::Plugin::error(
- "Error while running RPC server on device %d: %d", Device.getDeviceId(),
- Err);
-#endif
- return Error::success();
-}
-
Error RPCServerTy::deinitDevice(plugin::GenericDeviceTy &Device) {
#ifdef LIBOMPTARGET_RPC_SUPPORT
rpc_device_t RPCDevice{Handles[Device.getDeviceId()]};
diff --git a/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.cpp b/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.cpp
index 5ec3adb9e4e3a1..7878499dbfcb7e 100644
--- a/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.cpp
+++ b/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.cpp
@@ -63,6 +63,7 @@ DLWRAP(cuStreamCreate, 2)
DLWRAP(cuStreamDestroy, 1)
DLWRAP(cuStreamSynchronize, 1)
DLWRAP(cuStreamQuery, 1)
+DLWRAP(cuStreamAddCallback, 4)
DLWRAP(cuCtxSetCurrent, 1)
DLWRAP(cuDevicePrimaryCtxRelease, 1)
DLWRAP(cuDevicePrimaryCtxGetState, 3)
diff --git a/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.h b/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.h
index 16c8f7ad46c445..ad874735a25ed9 100644
--- a/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.h
+++ b/offload/plugins-nextgen/cuda/dynamic_cuda/cuda.h
@@ -286,6 +286,8 @@ static inline void *CU_LAUNCH_PARAM_END = (void *)0x00;
static inline void *CU_LAUNCH_PARAM_BUFFER_POINTER = (void *)0x01;
static inline void *CU_LAUNCH_PARAM_BUFFER_SIZE = (void *)0x02;
+typedef void (*CUstreamCallback)(CUstream, CUresult, void *);
+
CUresult cuCtxGetDevice(CUdevice *);
CUresult cuDeviceGet(CUdevice *, int);
CUresult cuDeviceGetAttribute(int *, CUdevice_attribute, CUdevice);
@@ -326,6 +328,7 @@ CUresult cuStreamCreate(CUstream *, unsigned);
CUresult cuStreamDestroy(CUstream);
CUresult cuStreamSynchronize(CUstream);
CUresult cuStreamQuery(CUstream);
+CUresult cuStreamAddCallback(CUstream, CUstreamCallback, void *, unsigned int);
CUresult cuCtxSetCurrent(CUcontext);
CUresult cuDevicePrimaryCtxRelease(CUdevice);
CUresult cuDevicePrimaryCtxGetState(CUdevice, unsigned *, int *);
diff --git a/offload/plugins-nextgen/cuda/src/rtl.cpp b/offload/plugins-nextgen/cuda/src/rtl.cpp
index 015c7775ba3513..7c876c603aa46c 100644
--- a/offload/plugins-nextgen/cuda/src/rtl.cpp
+++ b/offload/plugins-nextgen/cuda/src/rtl.cpp
@@ -632,15 +632,7 @@ struct CUDADeviceTy : public GenericDeviceTy {
CUresult Res;
// If we have an RPC server running on this device we will continuously
// query it for work rather than blocking.
- if (!getRPCServer()) {
- Res = cuStreamSynchronize(Stream);
- } else {
- do {
- Res = cuStreamQuery(Stream);
- if (auto Err = getRPCServer()->runServer(*this))
- return Err;
- } while (Res == CUDA_ERROR_NOT_READY);
- }
+ Res = cuStreamSynchronize(Stream);
// Once the stream is synchronized, return it to stream pool and reset
// AsyncInfo. This is to make sure the synchronization only works for its
@@ -825,17 +817,6 @@ struct CUDADeviceTy : public GenericDeviceTy {
if (auto Err = getStream(AsyncInfoWrapper, Stream))
return Err;
- // If there is already pending work on the stream it could be waiting for
- // someone to check the RPC server.
- if (auto *RPCServer = getRPCServer()) {
- CUresult Res = cuStreamQuery(Stream);
- while (Res == CUDA_ERROR_NOT_READY) {
- if (auto Err = RPCServer->runServer(*this))
- return Err;
- Res = cuStreamQuery(Stream);
- }
- }
-
CUresult Res = cuMemcpyDtoHAsync(HstPtr, (CUdeviceptr)TgtPtr, Size, Stream);
return Plugin::check(Res, "Error in cuMemcpyDtoHAsync: %s");
}
@@ -1294,10 +1275,26 @@ Error CUDAKernelTy::launchImpl(GenericDeviceTy &GenericDevice,
reinterpret_cast<void *>(&LaunchParams.Size),
CU_LAUNCH_PARAM_END};
+ // If we are running an RPC server we want to wake up the server thread
+ // whenever there is a kernel running and let it sleep otherwise.
+ if (GenericDevice.getRPCServer())
+ GenericDevice.Plugin.getRPCServer().Thread->notify();
+
CUresult Res = cuLaunchKernel(Func, NumBlocks, /*gridDimY=*/1,
...
[truncated]
|
402c7dc
to
812357f
Compare
812357f
to
d103317
Compare
I thought we just take advantage of the fact that when we finish launching the kernel, the thread (no matter whether it is helper thread or regular thread), it is waiting there anyway? |
return Err; | ||
} else { | ||
return Plugin::error("Unknown action function!"); | ||
for (auto [Callback, ActionArg] : llvm::zip(Callbacks, ActionArgs)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert same size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in #112785, I'll update this once that's merged.
return Plugin::error("Unknown action function!"); | ||
for (auto [Callback, ActionArg] : llvm::zip(Callbacks, ActionArgs)) { | ||
// Perform the action. | ||
if (Callback == memcpyAction) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd do a switch and unreachable on default. Easier to extend it in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is from #112785, I can modify it there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also you can't do a switch on function pointers, I'd need to convert it to uintptr_t but that's really ugly. The default case can't be unreachable because if it's not one of the known ones it's an unknown function pointer, though I could check for nullI guess?
AFAIK, doing this is always unsound.
With the current implementation, you can use |
70c49e4
to
008550a
Compare
Okay after thinking about it in my brain and running it for over an hour I'm reasonably confident it won't deadlock. |
IIUC we check if |
The CUDA Driver API basically says that you cannot rely on the call to With the current logic, if you do this it will deadlock forever. #include <stdio.h>
int main() {
#pragma omp target
puts("Hello");
} $ clang input.c -fopenmp --offload-arch=sm_89
$ env CUDA_LAUNCH_BLOCKING=1 ./a.out
... With this patch it works fine. If you do an async launch it will work, but only because the actual waiting is done by a helper thread. It's not something we can rely on. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating extra thread can be tricky because there are cases that people don't like extra thread created. For example, QMCPack disables hidden helper thread due to this reason. Some search online shows that CUDA_LAUNCH_BLOCKING
is actually a debug feature. IMHO we can just claim that this is not supported (TBH we don't have to support things prefix with either CUDA
or HIP
), but users can still choose to use at their discretion.
|
||
// Register a callback to indicate when the kernel is complete. | ||
if (GenericDevice.getRPCServer()) | ||
cuStreamAddCallback( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm personally not a fan with this function, because it is being deprecated. I generally don't think it is a good idea to add new code relying on features that become deprecated soon. Not sure what CUDA's next-gen API for this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't know about that, supposedly they have cuLaunchHostFunc
which just pushes the function onto the stream. It appears the only difference is that the function will not be handled on stream error. I wonder what the proper way to handle the errors are then.
It's a debug feature, but AFAIK it's still against the programming model because they expect things like that to be possible. So we're basically relying on implicit behavior that's not guaranteed by their documentation. Even without that, this still doesn't work in an async scenario without helper threads. I'm also not a fan of user threads, which is why I elected to make this one go to sleep when it's not needed so it's at least not wasting resources. I think right now this will always create the thread, since I figured it's fine if it just goes to sleep immediately and is never woken up by a kernel launch that needs an RPC server, but I could make it only do that if we need the RPC server at all, then it's a pay for what you use scenario. |
0434bf3
to
a4df002
Compare
I think that is much more preferable if we really want to use extra thread to do the work. |
Ping |
7296d9c
to
72bf3b0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but I'd defer the initialization of RPC stuff to the first kernel that uses it instead of at initialization stage. In the worst case, it can have a kernel that depends on RPC but doesn't actually launch it.
3db5169
to
6bad32a
Compare
@ronlieb @jplehr Downstream version https://gist.github.com/jhuber6/f0ccbe5357cab5553e873c124d370ff0 |
Bah, causes issues with NVIDIA's malloc somehow. I really need to replace that with |
6bad32a
to
41facc9
Compare
41facc9
to
bb39b6b
Compare
Summary: Handling the RPC server requires running through list of jobs that the device has requested to be done. Currently this is handled by the thread that does the waiting for the kernel to finish. However, this is not sound on NVIDIA architectures and only works for async launches in the OpenMP model that uses helper threads. However, we also don't want to have this thread doing work unnnecessarily. For this reason we track the execution of kernels and cause the thread to sleep via a condition variable (usually backed by some kind of futex or other intelligent sleeping mechanism) so that the thread will be idle while no kernels are running. Use cuLaunchHostFunc Only create thread if used
bb39b6b
to
5156c2d
Compare
lands and reverts 134401d [Offload] Move RPC server handling to a dedicated thread (llvm#112988) bd8a818 [Offload] Add cuLaunchHostFunc to dynamic cuda
Summary: Handling the RPC server requires running through list of jobs that the device has requested to be done. Currently this is handled by the thread that does the waiting for the kernel to finish. However, this is not sound on NVIDIA architectures and only works for async launches in the OpenMP model that uses helper threads. However, we also don't want to have this thread doing work unnnecessarily. For this reason we track the execution of kernels and cause the thread to sleep via a condition variable (usually backed by some kind of futex or other intelligent sleeping mechanism) so that the thread will be idle while no kernels are running.
…erm/restore-move-rpc [Offload] Move RPC server handling to a dedicated thread (llvm#112988)
Summary:
Handling the RPC server requires running through list of jobs that the
device has requested to be done. Currently this is handled by the thread
that does the waiting for the kernel to finish. However, this is not
sound on NVIDIA architectures and only works for async launches in the
OpenMP model that uses helper threads.
However, we also don't want to have this thread doing work
unnnecessarily. For this reason we track the execution of kernels and
cause the thread to sleep via a condition variable (usually backed by
some kind of futex or other intelligent sleeping mechanism) so that the
thread will be idle while no kernels are running.