
Commit e7492fb

[SYCL] Don't block execution when flushing a stream (#2581)
Currently, stream flush is a blocking operation. This is incorrect: there are cases when kernels must execute in parallel, for example when two kernels are connected by an FPGA pipe. This commit implements stream flush as a non-blocking operation by means of host tasks.
1 parent 2e9542a commit e7492fb
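
To illustrate the failure mode the commit message describes, here is a rough sketch of two kernels connected by an FPGA pipe, each also writing to a sycl::stream. The intel::pipe extension header path, kernel names, and buffer sizes are illustrative assumptions, not taken from this commit:

#include <CL/sycl.hpp>
#include <CL/sycl/intel/pipes.hpp> // assumed header for the Intel pipe extension
using namespace cl::sycl;

// Pipe connecting the producer and consumer kernels (illustrative).
using DataPipe = intel::pipe<class PipeId, int>;

int main() {
  queue Q;

  // Producer: the blocking pipe write cannot complete until the consumer reads.
  Q.submit([&](handler &CGH) {
    stream Out(1024, 256, CGH);
    CGH.single_task<class Producer>([=] {
      DataPipe::write(42);
      Out << "producer done" << endl;
    });
  });

  // Consumer: must run concurrently with the producer.
  Q.submit([&](handler &CGH) {
    stream Out(1024, 256, CGH);
    CGH.single_task<class Consumer>([=] {
      Out << "consumer got " << DataPipe::read() << endl;
    });
  });

  Q.wait(); // sync point: flushes both streams and releases their buffers
}

If flushing the producer's stream blocked until that kernel finished, the consumer could never be enqueued, yet the producer cannot finish until the consumer reads from the pipe. Scheduling the flush as a host task breaks this cycle: the copy-out becomes a dependent task instead of a synchronous wait.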

13 files changed, 277 insertions(+), 65 deletions(-)

sycl/include/CL/sycl/detail/cg.hpp

Lines changed: 1 addition & 0 deletions
@@ -155,6 +155,7 @@ class CGExecKernel : public CG {
   vector_class<shared_ptr_class<detail::stream_impl>> getStreams() const {
     return MStreams;
   }
+  void clearStreams() { MStreams.clear(); }
 };

 /// "Copy memory" command group class.

sycl/source/detail/scheduler/commands.cpp

Lines changed: 8 additions & 1 deletion
@@ -1202,7 +1202,14 @@ AllocaCommandBase *ExecCGCommand::getAllocaForReq(Requirement *Req) {
 }

 vector_class<StreamImplPtr> ExecCGCommand::getStreams() const {
-  return ((CGExecKernel *)MCommandGroup.get())->getStreams();
+  if (MCommandGroup->getType() == CG::KERNEL)
+    return ((CGExecKernel *)MCommandGroup.get())->getStreams();
+  return {};
+}
+
+void ExecCGCommand::clearStreams() {
+  if (MCommandGroup->getType() == CG::KERNEL)
+    ((CGExecKernel *)MCommandGroup.get())->clearStreams();
 }

 cl_int UpdateHostRequirementCommand::enqueueImp() {

sycl/source/detail/scheduler/commands.hpp

Lines changed: 2 additions & 0 deletions
@@ -492,6 +492,8 @@ class ExecCGCommand : public Command {

   vector_class<StreamImplPtr> getStreams() const;

+  void clearStreams();
+
   void printDot(std::ostream &Stream) const final override;
   void emitInstrumentationData() final override;


sycl/source/detail/scheduler/graph_builder.cpp

Lines changed: 26 additions & 2 deletions
@@ -823,7 +823,9 @@ void Scheduler::GraphBuilder::decrementLeafCountersForRecord(
   }
 }

-void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
+void Scheduler::GraphBuilder::cleanupCommandsForRecord(
+    MemObjRecord *Record,
+    std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
   std::vector<AllocaCommandBase *> &AllocaCommands = Record->MAllocaCommands;
   if (AllocaCommands.empty())
     return;
@@ -872,6 +874,16 @@ void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
     if (!markNodeAsVisited(Cmd, MVisitedCmds))
       continue;

+    // Collect stream objects for a visited command.
+    if (Cmd->getType() == Command::CommandType::RUN_CG) {
+      auto ExecCmd = static_cast<ExecCGCommand *>(Cmd);
+      std::vector<std::shared_ptr<stream_impl>> Streams =
+          std::move(ExecCmd->getStreams());
+      ExecCmd->clearStreams();
+      StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
+                                 Streams.end());
+    }
+
     for (Command *UserCmd : Cmd->MUsers)
       if (UserCmd->getType() != Command::CommandType::ALLOCA)
         MCmdsToVisit.push(UserCmd);
@@ -909,7 +921,9 @@ void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
   handleVisitedNodes(MVisitedCmds);
 }

-void Scheduler::GraphBuilder::cleanupFinishedCommands(Command *FinishedCmd) {
+void Scheduler::GraphBuilder::cleanupFinishedCommands(
+    Command *FinishedCmd,
+    std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
   assert(MCmdsToVisit.empty());
   MCmdsToVisit.push(FinishedCmd);
   MVisitedCmds.clear();
@@ -922,6 +936,16 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands(Command *FinishedCmd) {
     if (!markNodeAsVisited(Cmd, MVisitedCmds))
       continue;

+    // Collect stream objects for a visited command.
+    if (Cmd->getType() == Command::CommandType::RUN_CG) {
+      auto ExecCmd = static_cast<ExecCGCommand *>(Cmd);
+      std::vector<std::shared_ptr<stream_impl>> Streams =
+          std::move(ExecCmd->getStreams());
+      ExecCmd->clearStreams();
+      StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
+                                 Streams.end());
+    }
+
     for (const DepDesc &Dep : Cmd->MDeps) {
       if (Dep.MDepCommand)
         MCmdsToVisit.push(Dep.MDepCommand);

sycl/source/detail/scheduler/scheduler.cpp

Lines changed: 79 additions & 32 deletions
@@ -14,6 +14,7 @@
 #include <detail/stream_impl.hpp>

 #include <chrono>
+#include <cstdio>
 #include <memory>
 #include <mutex>
 #include <set>
@@ -152,48 +153,75 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
   GraphProcessor::waitForEvent(std::move(Event));
 }

+static void deallocateStreams(
+    std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
+  // Deallocate buffers for stream objects of the finished commands. Iterate in
+  // reverse order because it is the order of commands execution.
+  for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
+       StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
+    detail::Scheduler::getInstance().deallocateStreamBuffers(
+        StreamImplPtr->get());
+}
+
 void Scheduler::cleanupFinishedCommands(EventImplPtr FinishedEvent) {
-  // Avoiding deadlock situation, where one thread is in the process of
-  // enqueueing (with a locked mutex) a currently blocked task that waits for
-  // another thread which is stuck at attempting cleanup.
-  std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::try_to_lock);
-  if (Lock.owns_lock()) {
-    Command *FinishedCmd = static_cast<Command *>(FinishedEvent->getCommand());
-    // The command might have been cleaned up (and set to nullptr) by another
-    // thread
-    if (FinishedCmd)
-      MGraphBuilder.cleanupFinishedCommands(FinishedCmd);
+  // We are going to traverse a graph of finished commands. Gather stream
+  // objects from these commands if any and deallocate buffers for these stream
+  // objects, this is needed to guarantee that streamed data is printed and
+  // resources are released.
+  std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
+  {
+    // Avoiding deadlock situation, where one thread is in the process of
+    // enqueueing (with a locked mutex) a currently blocked task that waits for
+    // another thread which is stuck at attempting cleanup.
+    std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock,
+                                                   std::try_to_lock);
+    if (Lock.owns_lock()) {
+      auto FinishedCmd = static_cast<Command *>(FinishedEvent->getCommand());
+      // The command might have been cleaned up (and set to nullptr) by another
+      // thread
+      if (FinishedCmd)
+        MGraphBuilder.cleanupFinishedCommands(FinishedCmd, StreamsToDeallocate);
+    }
   }
+  deallocateStreams(StreamsToDeallocate);
 }

 void Scheduler::removeMemoryObject(detail::SYCLMemObjI *MemObj) {
-  MemObjRecord *Record = nullptr;
-  std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);
-
+  // We are going to traverse a graph of finished commands. Gather stream
+  // objects from these commands if any and deallocate buffers for these stream
+  // objects, this is needed to guarantee that streamed data is printed and
+  // resources are released.
+  std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
   {
-    lockSharedTimedMutex(Lock);
+    MemObjRecord *Record = nullptr;
+    std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);

-    Record = MGraphBuilder.getMemObjRecord(MemObj);
-    if (!Record)
-      // No operations were performed on the mem object
-      return;
+    {
+      lockSharedTimedMutex(Lock);

-    Lock.unlock();
-  }
+      Record = MGraphBuilder.getMemObjRecord(MemObj);
+      if (!Record)
+        // No operations were performed on the mem object
+        return;

-  {
-    // This only needs a shared mutex as it only involves enqueueing and
-    // awaiting for events
-    std::shared_lock<std::shared_timed_mutex> Lock(MGraphLock);
-    waitForRecordToFinish(Record);
-  }
+      Lock.unlock();
+    }

-  {
-    lockSharedTimedMutex(Lock);
-    MGraphBuilder.decrementLeafCountersForRecord(Record);
-    MGraphBuilder.cleanupCommandsForRecord(Record);
-    MGraphBuilder.removeRecordForMemObj(MemObj);
+    {
+      // This only needs a shared mutex as it only involves enqueueing and
+      // awaiting for events
+      std::shared_lock<std::shared_timed_mutex> Lock(MGraphLock);
+      waitForRecordToFinish(Record);
+    }
+
+    {
+      lockSharedTimedMutex(Lock);
+      MGraphBuilder.decrementLeafCountersForRecord(Record);
+      MGraphBuilder.cleanupCommandsForRecord(Record, StreamsToDeallocate);
+      MGraphBuilder.removeRecordForMemObj(MemObj);
+    }
   }
+  deallocateStreams(StreamsToDeallocate);
 }

 EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {
@@ -243,11 +271,12 @@ void Scheduler::allocateStreamBuffers(stream_impl *Impl,
                                       size_t FlushBufferSize) {
   std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
   StreamBuffersPool.insert(
-      {Impl, StreamBuffers(StreamBufferSize, FlushBufferSize)});
+      {Impl, new StreamBuffers(StreamBufferSize, FlushBufferSize)});
 }

 void Scheduler::deallocateStreamBuffers(stream_impl *Impl) {
   std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
+  delete StreamBuffersPool[Impl];
   StreamBuffersPool.erase(Impl);
 }

@@ -258,6 +287,24 @@ Scheduler::Scheduler() {
                                             /*PropList=*/{}));
 }

+Scheduler::~Scheduler() {
+  // By specification there are several possible sync points: buffer
+  // destruction, wait() method of a queue or event. Stream doesn't introduce
+  // any synchronization point. It is guaranteed that stream is flushed and
+  // resources are released only if one of the listed sync points was used for
+  // the kernel. Otherwise resources for stream will not be released, issue a
+  // warning in this case.
+  if (pi::trace(pi::TraceLevel::PI_TRACE_BASIC)) {
+    std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
+    if (!StreamBuffersPool.empty())
+      fprintf(
+          stderr,
+          "\nWARNING: Some commands may have not finished the execution and "
+          "not all resources were released. Please be sure that all kernels "
+          "have synchronization points.\n\n");
+  }
+}
+
 void Scheduler::lockSharedTimedMutex(
     std::unique_lock<std::shared_timed_mutex> &Lock) {
 #ifdef _WIN32
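
The new destructor only warns; it cannot flush on the program's behalf. Below is a minimal sketch of the contract it checks, showing all three sync points listed in the comment (event wait, queue wait, buffer destruction); kernel names and sizes are illustrative assumptions:

#include <CL/sycl.hpp>
using namespace cl::sycl;

int main() {
  queue Q;
  {
    buffer<int, 1> B(range<1>(16));
    event E = Q.submit([&](handler &CGH) {
      auto Acc = B.get_access<access::mode::write>(CGH);
      stream Out(1024, 256, CGH);
      CGH.parallel_for<class Fill>(range<1>(16), [=](id<1> I) {
        Acc[I] = static_cast<int>(I[0]);
        Out << "item " << I[0] << endl;
      });
    });
    E.wait(); // sync point: waiting on the kernel's event
    Q.wait(); // sync point: waiting on the whole queue
  }           // sync point: destruction of buffer B
  // Without at least one of these, StreamBuffersPool still holds entries when
  // the scheduler is destroyed, and with basic PI tracing enabled (e.g.
  // SYCL_PI_TRACE=1) the warning above is printed to stderr.
}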

sycl/source/detail/scheduler/scheduler.hpp

Lines changed: 35 additions & 9 deletions
@@ -18,6 +18,7 @@
 #include <queue>
 #include <set>
 #include <shared_mutex>
+#include <unordered_map>
 #include <unordered_set>
 #include <vector>

@@ -429,11 +430,24 @@ class Scheduler {
   /// \return a vector of "immediate" dependencies for the Event given.
   std::vector<EventImplPtr> getWaitList(EventImplPtr Event);

+  /// Allocate buffers in the pool for a provided stream
+  ///
+  /// \param Pointer to the stream object
+  /// \param Size of the stream buffer
+  /// \param Size of the flush buffer for a single work item
+  void allocateStreamBuffers(stream_impl *, size_t, size_t);
+
+  /// Deallocate all stream buffers in the pool
+  ///
+  /// \param Pointer to the stream object
+  void deallocateStreamBuffers(stream_impl *);
+
   QueueImplPtr getDefaultHostQueue() { return DefaultHostQueue; }

   static MemObjRecord *getMemObjRecord(const Requirement *const Req);

   Scheduler();
+  ~Scheduler();

 protected:
   /// Provides exclusive access to std::shared_timed_mutex object with deadlock
@@ -489,7 +503,9 @@ class Scheduler {

   /// Removes finished non-leaf non-alloca commands from the subgraph
   /// (assuming that all its commands have been waited for).
-  void cleanupFinishedCommands(Command *FinishedCmd);
+  void cleanupFinishedCommands(
+      Command *FinishedCmd,
+      std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

   /// Reschedules the command passed using Queue provided.
   ///
@@ -512,7 +528,9 @@ class Scheduler {
   void decrementLeafCountersForRecord(MemObjRecord *Record);

   /// Removes commands that use the given MemObjRecord from the graph.
-  void cleanupCommandsForRecord(MemObjRecord *Record);
+  void cleanupCommandsForRecord(
+      MemObjRecord *Record,
+      std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

   /// Removes the MemObjRecord for the memory object passed.
   void removeRecordForMemObj(SYCLMemObjI *MemObject);
@@ -727,7 +745,12 @@ class Scheduler {
         : Data(StreamBufferSize, 0),
           Buf(Data.data(), range<1>(StreamBufferSize),
               {property::buffer::use_host_ptr()}),
-          FlushBuf(range<1>(FlushBufferSize)) {}
+          FlushBuf(range<1>(FlushBufferSize)) {
+      // Disable copy back on buffer destruction. Copy is scheduled as a host
+      // task which fires up as soon as the kernel has completed execution.
+      Buf.set_write_back(false);
+      FlushBuf.set_write_back(false);
+    }

     // Vector on the host side which is used to initialize the stream
     // buffer
@@ -744,13 +767,16 @@ class Scheduler {

   // Protects stream buffers pool
   std::mutex StreamBuffersPoolMutex;
-  std::map<stream_impl *, StreamBuffers> StreamBuffersPool;

-  /// Allocate buffers in the pool for a provided stream
-  void allocateStreamBuffers(stream_impl *, size_t, size_t);
-
-  /// Deallocate buffers in the pool for a provided stream
-  void deallocateStreamBuffers(stream_impl *);
+  // We need to store a pointer to the structure with stream buffers because we
+  // want to avoid a situation when buffers are destructed during destruction of
+  // the scheduler. Scheduler is a global object and it can be destructed after
+  // all device runtimes are unloaded. Destruction of the buffers at this stage
+  // will lead to a failure. In a correct program there will be sync points for
+  // all kernels and all allocated resources will be released by the scheduler.
+  // If the program is not correct and doesn't have the necessary sync points,
+  // then a warning will be issued.
+  std::unordered_map<stream_impl *, StreamBuffers *> StreamBuffersPool;
 };

 } // namespace detail
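
The StreamBuffers constructor change leans on buffer::set_write_back(), a standard SYCL buffer member. A minimal sketch of its effect in isolation (names and sizes here are arbitrary, not from the commit):

#include <CL/sycl.hpp>
#include <vector>
using namespace cl::sycl;

int main() {
  std::vector<char> Host(16, 0);
  {
    buffer<char, 1> Buf(Host.data(), range<1>(16));
    Buf.set_write_back(false); // destruction will no longer copy device data back
    queue Q;
    Q.submit([&](handler &CGH) {
      auto Acc = Buf.get_access<access::mode::write>(CGH);
      CGH.single_task<class Touch>([=] { Acc[0] = 'x'; });
    });
    Q.wait();
  } // Buf destroyed here: no copy back is scheduled, because write-back is disabled
}

With write-back disabled on Buf and FlushBuf, the only path for streamed data back to the host is the copy the runtime schedules as a host task after the kernel completes, which is what allows the flush itself to be non-blocking.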
