Skip to content

[SYCL] Don't block execution when flushing a stream #2581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sycl/include/CL/sycl/detail/cg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class CGExecKernel : public CG {
vector_class<shared_ptr_class<detail::stream_impl>> getStreams() const {
return MStreams;
}
/// Clears the list of stream objects (MStreams) associated with this
/// kernel command group.
void clearStreams() { MStreams.clear(); }
};

/// "Copy memory" command group class.
Expand Down
9 changes: 8 additions & 1 deletion sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,14 @@ AllocaCommandBase *ExecCGCommand::getAllocaForReq(Requirement *Req) {
}

vector_class<StreamImplPtr> ExecCGCommand::getStreams() const {
  // Only kernel command groups carry stream objects; for any other CG type
  // report an empty list instead of blindly downcasting.
  // (Removed a stale unconditional return that made this check unreachable.)
  if (MCommandGroup->getType() == CG::KERNEL)
    return static_cast<CGExecKernel *>(MCommandGroup.get())->getStreams();
  return {};
}

void ExecCGCommand::clearStreams() {
if (MCommandGroup->getType() == CG::KERNEL)
((CGExecKernel *)MCommandGroup.get())->clearStreams();
}

cl_int UpdateHostRequirementCommand::enqueueImp() {
Expand Down
2 changes: 2 additions & 0 deletions sycl/source/detail/scheduler/commands.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ class ExecCGCommand : public Command {

vector_class<StreamImplPtr> getStreams() const;

void clearStreams();

void printDot(std::ostream &Stream) const final override;
void emitInstrumentationData() final override;

Expand Down
28 changes: 26 additions & 2 deletions sycl/source/detail/scheduler/graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,9 @@ void Scheduler::GraphBuilder::decrementLeafCountersForRecord(
}
}

void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
void Scheduler::GraphBuilder::cleanupCommandsForRecord(
MemObjRecord *Record,
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
std::vector<AllocaCommandBase *> &AllocaCommands = Record->MAllocaCommands;
if (AllocaCommands.empty())
return;
Expand Down Expand Up @@ -872,6 +874,16 @@ void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
if (!markNodeAsVisited(Cmd, MVisitedCmds))
continue;

// Collect stream objects for a visited command.
if (Cmd->getType() == Command::CommandType::RUN_CG) {
auto ExecCmd = static_cast<ExecCGCommand *>(Cmd);
std::vector<std::shared_ptr<stream_impl>> Streams =
std::move(ExecCmd->getStreams());
ExecCmd->clearStreams();
StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
Streams.end());
}

for (Command *UserCmd : Cmd->MUsers)
if (UserCmd->getType() != Command::CommandType::ALLOCA)
MCmdsToVisit.push(UserCmd);
Expand Down Expand Up @@ -909,7 +921,9 @@ void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
handleVisitedNodes(MVisitedCmds);
}

void Scheduler::GraphBuilder::cleanupFinishedCommands(Command *FinishedCmd) {
void Scheduler::GraphBuilder::cleanupFinishedCommands(
Command *FinishedCmd,
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
assert(MCmdsToVisit.empty());
MCmdsToVisit.push(FinishedCmd);
MVisitedCmds.clear();
Expand All @@ -922,6 +936,16 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands(Command *FinishedCmd) {
if (!markNodeAsVisited(Cmd, MVisitedCmds))
continue;

// Collect stream objects for a visited command.
if (Cmd->getType() == Command::CommandType::RUN_CG) {
auto ExecCmd = static_cast<ExecCGCommand *>(Cmd);
std::vector<std::shared_ptr<stream_impl>> Streams =
std::move(ExecCmd->getStreams());
ExecCmd->clearStreams();
StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
Streams.end());
}

for (const DepDesc &Dep : Cmd->MDeps) {
if (Dep.MDepCommand)
MCmdsToVisit.push(Dep.MDepCommand);
Expand Down
111 changes: 79 additions & 32 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <detail/stream_impl.hpp>

#include <chrono>
#include <cstdio>
#include <memory>
#include <mutex>
#include <set>
Expand Down Expand Up @@ -152,48 +153,75 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
GraphProcessor::waitForEvent(std::move(Event));
}

static void deallocateStreams(
    std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
  // Release stream buffers of the finished commands. Walk the vector in
  // reverse because that matches the order in which the commands executed.
  for (auto It = StreamsToDeallocate.rbegin(), End = StreamsToDeallocate.rend();
       It != End; ++It)
    detail::Scheduler::getInstance().deallocateStreamBuffers(It->get());
}

void Scheduler::cleanupFinishedCommands(EventImplPtr FinishedEvent) {
// Avoiding deadlock situation, where one thread is in the process of
// enqueueing (with a locked mutex) a currently blocked task that waits for
// another thread which is stuck at attempting cleanup.
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::try_to_lock);
if (Lock.owns_lock()) {
Command *FinishedCmd = static_cast<Command *>(FinishedEvent->getCommand());
// The command might have been cleaned up (and set to nullptr) by another
// thread
if (FinishedCmd)
MGraphBuilder.cleanupFinishedCommands(FinishedCmd);
// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands if any and deallocate buffers for these stream
// objects, this is needed to guarantee that streamed data is printed and
// resources are released.
std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
{
// Avoiding deadlock situation, where one thread is in the process of
// enqueueing (with a locked mutex) a currently blocked task that waits for
// another thread which is stuck at attempting cleanup.
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock,
std::try_to_lock);
if (Lock.owns_lock()) {
auto FinishedCmd = static_cast<Command *>(FinishedEvent->getCommand());
// The command might have been cleaned up (and set to nullptr) by another
// thread
if (FinishedCmd)
MGraphBuilder.cleanupFinishedCommands(FinishedCmd, StreamsToDeallocate);
}
}
deallocateStreams(StreamsToDeallocate);
}

void Scheduler::removeMemoryObject(detail::SYCLMemObjI *MemObj) {
MemObjRecord *Record = nullptr;
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);

// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands if any and deallocate buffers for these stream
// objects, this is needed to guarantee that streamed data is printed and
// resources are released.
std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
{
lockSharedTimedMutex(Lock);
MemObjRecord *Record = nullptr;
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);

Record = MGraphBuilder.getMemObjRecord(MemObj);
if (!Record)
// No operations were performed on the mem object
return;
{
lockSharedTimedMutex(Lock);

Lock.unlock();
}
Record = MGraphBuilder.getMemObjRecord(MemObj);
if (!Record)
// No operations were performed on the mem object
return;

{
// This only needs a shared mutex as it only involves enqueueing and
// awaiting for events
std::shared_lock<std::shared_timed_mutex> Lock(MGraphLock);
waitForRecordToFinish(Record);
}
Lock.unlock();
}

{
lockSharedTimedMutex(Lock);
MGraphBuilder.decrementLeafCountersForRecord(Record);
MGraphBuilder.cleanupCommandsForRecord(Record);
MGraphBuilder.removeRecordForMemObj(MemObj);
{
// This only needs a shared mutex as it only involves enqueueing and
// awaiting for events
std::shared_lock<std::shared_timed_mutex> Lock(MGraphLock);
waitForRecordToFinish(Record);
}

{
lockSharedTimedMutex(Lock);
MGraphBuilder.decrementLeafCountersForRecord(Record);
MGraphBuilder.cleanupCommandsForRecord(Record, StreamsToDeallocate);
MGraphBuilder.removeRecordForMemObj(MemObj);
}
}
deallocateStreams(StreamsToDeallocate);
}

EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {
Expand Down Expand Up @@ -243,11 +271,12 @@ void Scheduler::allocateStreamBuffers(stream_impl *Impl,
size_t FlushBufferSize) {
std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
StreamBuffersPool.insert(
{Impl, StreamBuffers(StreamBufferSize, FlushBufferSize)});
{Impl, new StreamBuffers(StreamBufferSize, FlushBufferSize)});
}

void Scheduler::deallocateStreamBuffers(stream_impl *Impl) {
  std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
  // operator[] would default-insert a null entry for a stream that was never
  // registered (and then erase it again); use find() so an unknown Impl is a
  // harmless no-op instead.
  auto It = StreamBuffersPool.find(Impl);
  if (It != StreamBuffersPool.end()) {
    delete It->second;
    StreamBuffersPool.erase(It);
  }
}

Expand All @@ -258,6 +287,24 @@ Scheduler::Scheduler() {
/*PropList=*/{}));
}

Scheduler::~Scheduler() {
  // By specification there are several possible sync points: buffer
  // destruction, wait() method of a queue or event. Stream doesn't introduce
  // any synchronization point. It is guaranteed that stream is flushed and
  // resources are released only if one of the listed sync points was used for
  // the kernel. Otherwise resources for the stream are not released; when
  // basic PI tracing is enabled, warn about any leftover stream buffers.
  if (!pi::trace(pi::TraceLevel::PI_TRACE_BASIC))
    return;
  std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
  if (StreamBuffersPool.empty())
    return;
  fprintf(stderr,
          "\nWARNING: Some commands may have not finished the execution and "
          "not all resources were released. Please be sure that all kernels "
          "have synchronization points.\n\n");
}

void Scheduler::lockSharedTimedMutex(
std::unique_lock<std::shared_timed_mutex> &Lock) {
#ifdef _WIN32
Expand Down
44 changes: 35 additions & 9 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <queue>
#include <set>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>
#include <vector>

Expand Down Expand Up @@ -429,11 +430,24 @@ class Scheduler {
/// \return a vector of "immediate" dependencies for the Event given.
std::vector<EventImplPtr> getWaitList(EventImplPtr Event);

/// Allocate buffers in the pool for a provided stream
///
/// \param Pointer to the stream object
/// \param Size of the stream buffer
/// \param Size of the flush buffer for a single work item
void allocateStreamBuffers(stream_impl *, size_t, size_t);

/// Deallocate all stream buffers in the pool
///
/// \param Pointer to the stream object
void deallocateStreamBuffers(stream_impl *);

QueueImplPtr getDefaultHostQueue() { return DefaultHostQueue; }

static MemObjRecord *getMemObjRecord(const Requirement *const Req);

Scheduler();
~Scheduler();

protected:
/// Provides exclusive access to std::shared_timed_mutex object with deadlock
Expand Down Expand Up @@ -489,7 +503,9 @@ class Scheduler {

/// Removes finished non-leaf non-alloca commands from the subgraph
/// (assuming that all its commands have been waited for).
void cleanupFinishedCommands(Command *FinishedCmd);
void cleanupFinishedCommands(
Command *FinishedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

/// Reschedules the command passed using Queue provided.
///
Expand All @@ -512,7 +528,9 @@ class Scheduler {
void decrementLeafCountersForRecord(MemObjRecord *Record);

/// Removes commands that use the given MemObjRecord from the graph.
void cleanupCommandsForRecord(MemObjRecord *Record);
void cleanupCommandsForRecord(
MemObjRecord *Record,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

/// Removes the MemObjRecord for the memory object passed.
void removeRecordForMemObj(SYCLMemObjI *MemObject);
Expand Down Expand Up @@ -727,7 +745,12 @@ class Scheduler {
: Data(StreamBufferSize, 0),
Buf(Data.data(), range<1>(StreamBufferSize),
{property::buffer::use_host_ptr()}),
FlushBuf(range<1>(FlushBufferSize)) {}
FlushBuf(range<1>(FlushBufferSize)) {
// Disable copy back on buffer destruction. Copy is scheduled as a host
// task which fires up as soon as kernel has completed execution.
Buf.set_write_back(false);
FlushBuf.set_write_back(false);
}

// Vector on the host side which is used to initialize the stream
// buffer
Expand All @@ -744,13 +767,16 @@ class Scheduler {

// Protects stream buffers pool
std::mutex StreamBuffersPoolMutex;
std::map<stream_impl *, StreamBuffers> StreamBuffersPool;

/// Allocate buffers in the pool for a provided stream
void allocateStreamBuffers(stream_impl *, size_t, size_t);

/// Deallocate buffers in the pool for a provided stream
void deallocateStreamBuffers(stream_impl *);
// We need to store a pointer to the structure with stream buffers because we
// want to avoid a situation when buffers are destructed during destruction of
// the scheduler. Scheduler is a global object and it can be destructed after
// all device runtimes are unloaded. Destruction of the buffers at this stage
// will lead to a failure. In a correct program there will be sync points
// for all kernels and all allocated resources will be released by the
// scheduler. If program is not correct and doesn't have necessary sync point
// then warning will be issued.
std::unordered_map<stream_impl *, StreamBuffers *> StreamBuffersPool;
};

} // namespace detail
Expand Down
Loading