Skip to content

Commit 1db0e81

Browse files
authored
[SYCL] Wait for stream service tasks (#7130)
Before this patch, explicitly waiting for a task that is expected to print a message using sycl::stream did not guarantee that the message had actually been printed (the printing could happen later). The patch adds such a guarantee by registering the events of stream service tasks both in the user event produced for the original user task and in the queue to which this task is submitted. Then, on explicit calls to queue::wait and event::wait, we make sure that these additional events are complete as well.
1 parent c6091df commit 1db0e81

File tree

12 files changed

+205
-40
lines changed

12 files changed

+205
-40
lines changed

sycl/include/sycl/memory_enums.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ static constexpr std::memory_order getStdMemoryOrder(sycl::memory_order order) {
9595
case memory_order::seq_cst:
9696
return std::memory_order_seq_cst;
9797
}
98+
// Return default value here to avoid compiler warnings.
99+
// A default case in the switch doesn't help because some compilers warn about
100+
// having a default case while all values of enum are handled.
101+
return std::memory_order_acq_rel;
98102
}
99103
#endif // __SYCL_DEVICE_ONLY__
100104

sycl/source/detail/event_impl.cpp

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,22 @@ event_impl::~event_impl() {
6060

6161
void event_impl::waitInternal() {
6262
if (!MHostEvent && MEvent) {
63+
// Wait for the native event
6364
getPlugin().call<PiApiKind::piEventsWait>(1, &MEvent);
64-
return;
65-
}
66-
67-
if (MState == HES_Discarded)
65+
} else if (MState == HES_Discarded) {
66+
// Waiting for the discarded event is invalid
6867
throw sycl::exception(
6968
make_error_code(errc::invalid),
7069
"waitInternal method cannot be used for a discarded event.");
70+
} else if (MState != HES_Complete) {
71+
// Wait for the host event
72+
std::unique_lock<std::mutex> lock(MMutex);
73+
cv.wait(lock, [this] { return MState == HES_Complete; });
74+
}
7175

72-
if (MState == HES_Complete)
73-
return;
74-
75-
std::unique_lock<std::mutex> lock(MMutex);
76-
cv.wait(lock, [this] { return MState == HES_Complete; });
76+
// Wait for connected events (e.g. stream prints)
77+
for (const EventImplPtr &Event : MPostCompleteEvents)
78+
Event->wait(Event);
7779
}
7880

7981
void event_impl::setComplete() {
@@ -236,27 +238,10 @@ void event_impl::wait(std::shared_ptr<sycl::detail::event_impl> Self) {
236238

237239
void event_impl::wait_and_throw(
238240
std::shared_ptr<sycl::detail::event_impl> Self) {
239-
Scheduler &Sched = Scheduler::getInstance();
240-
241-
QueueImplPtr submittedQueue = nullptr;
242-
{
243-
Scheduler::ReadLockT Lock(Sched.MGraphLock);
244-
Command *Cmd = static_cast<Command *>(Self->getCommand());
245-
if (Cmd)
246-
submittedQueue = Cmd->getSubmittedQueue();
247-
}
248241
wait(Self);
249242

250-
{
251-
Scheduler::ReadLockT Lock(Sched.MGraphLock);
252-
for (auto &EventImpl : getWaitList()) {
253-
Command *Cmd = (Command *)EventImpl->getCommand();
254-
if (Cmd)
255-
Cmd->getSubmittedQueue()->throw_asynchronous();
256-
}
257-
}
258-
if (submittedQueue)
259-
submittedQueue->throw_asynchronous();
243+
if (QueueImplPtr SubmittedQueue = MSubmittedQueue.lock())
244+
SubmittedQueue->throw_asynchronous();
260245
}
261246

262247
void event_impl::cleanupCommand(

sycl/source/detail/event_impl.hpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,15 @@ class event_impl {
225225
MWorkerQueue = WorkerQueue;
226226
};
227227

228+
/// Sets original queue used for submission.
229+
///
230+
/// @param SubmittedQueue the queue to record as the one used for submission.
231+
void setSubmittedQueue(const QueueImplPtr &SubmittedQueue) {
232+
MSubmittedQueue = SubmittedQueue;
233+
};
234+
235+
QueueImplPtr getSubmittedQueue() const { return MSubmittedQueue.lock(); };
236+
228237
/// Checks if an event is in a fully initialized state. Default-constructed
229238
/// events will return true only after having initialized its native event,
230239
/// while other events will assume that they are fully initialized at
@@ -234,7 +243,12 @@ class event_impl {
234243
/// state.
235244
bool isInitialized() const noexcept { return MIsInitialized; }
236245

237-
private:
246+
void attachEventToComplete(const EventImplPtr &Event) {
247+
std::lock_guard<std::mutex> Lock(MMutex);
248+
MPostCompleteEvents.push_back(Event);
249+
}
250+
251+
protected:
238252
// When instrumentation is enabled emits trace event for event wait begin and
239253
// returns the telemetry event generated for the wait
240254
void *instrumentationProlog(std::string &Name, int32_t StreamID,
@@ -257,11 +271,14 @@ class event_impl {
257271
const bool MIsProfilingEnabled = false;
258272

259273
std::weak_ptr<queue_impl> MWorkerQueue;
274+
std::weak_ptr<queue_impl> MSubmittedQueue;
260275

261276
/// Dependency events prepared for waiting by backend.
262277
std::vector<EventImplPtr> MPreparedDepsEvents;
263278
std::vector<EventImplPtr> MPreparedHostDepsEvents;
264279

280+
std::vector<EventImplPtr> MPostCompleteEvents;
281+
265282
/// Indicates that the task associated with this event has been submitted by
266283
/// the queue to the device.
267284
std::atomic<bool> MIsFlushed = false;

sycl/source/detail/queue_impl.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,15 @@ void queue_impl::wait(const detail::code_location &CodeLoc) {
377377
for (event &Event : SharedEvents)
378378
Event.wait();
379379
}
380+
381+
std::vector<EventImplPtr> StreamsServiceEvents;
382+
{
383+
std::lock_guard<std::mutex> Lock(MMutex);
384+
StreamsServiceEvents.swap(MStreamsServiceEvents);
385+
}
386+
for (const EventImplPtr &Event : StreamsServiceEvents)
387+
Event->wait(Event);
388+
380389
#ifdef XPTI_ENABLE_INSTRUMENTATION
381390
instrumentationEpilog(TelemetryEvent, Name, StreamID, IId);
382391
#endif

sycl/source/detail/queue_impl.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,11 @@ class queue_impl {
446446
return MAssertHappenedBuffer;
447447
}
448448

449+
void registerStreamServiceEvent(const EventImplPtr &Event) {
450+
std::lock_guard<std::mutex> Lock(MMutex);
451+
MStreamsServiceEvents.push_back(Event);
452+
}
453+
449454
protected:
450455
// template is needed for proper unit testing
451456
template <typename HandlerType = handler>
@@ -480,7 +485,7 @@ class queue_impl {
480485
EventRet = Handler.finalize();
481486
}
482487

483-
private:
488+
protected:
484489
/// Helper function for checking whether a device is either a member of a
485490
/// context or a descendant of its member.
486491
/// \return True iff the device or its parent is a member of the context.
@@ -610,12 +615,14 @@ class queue_impl {
610615

611616
const bool MIsInorder;
612617

618+
std::vector<EventImplPtr> MStreamsServiceEvents;
619+
613620
public:
614621
// Queue constructed with the discard_events property
615622
const bool MDiscardEvents;
616623
const bool MIsProfilingEnabled;
617624

618-
private:
625+
protected:
619626
// This flag says if we can discard events based on a queue "setup" which will
620627
// be common for all operations submitted to the queue. This is a must
621628
// condition for discarding, but even if it's true, in some cases, we won't be

sycl/source/detail/scheduler/commands.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,9 +394,9 @@ Command::Command(CommandType Type, QueueImplPtr Queue)
394394
MPreparedDepsEvents(MEvent->getPreparedDepsEvents()),
395395
MPreparedHostDepsEvents(MEvent->getPreparedHostDepsEvents()),
396396
MType(Type) {
397-
MSubmittedQueue = MQueue;
398397
MWorkerQueue = MQueue;
399398
MEvent->setWorkerQueue(MWorkerQueue);
399+
MEvent->setSubmittedQueue(MWorkerQueue);
400400
MEvent->setCommand(this);
401401
MEvent->setContextImpl(MQueue->getContextImplPtr());
402402
MEvent->setStateIncomplete();
@@ -1712,8 +1712,8 @@ ExecCGCommand::ExecCGCommand(std::unique_ptr<detail::CG> CommandGroup,
17121712
: Command(CommandType::RUN_CG, std::move(Queue)),
17131713
MCommandGroup(std::move(CommandGroup)) {
17141714
if (MCommandGroup->getType() == detail::CG::CodeplayHostTask) {
1715-
MSubmittedQueue =
1716-
static_cast<detail::CGHostTask *>(MCommandGroup.get())->MQueue;
1715+
MEvent->setSubmittedQueue(
1716+
static_cast<detail::CGHostTask *>(MCommandGroup.get())->MQueue);
17171717
MEvent->setNeedsCleanupAfterWait(true);
17181718
} else if (MCommandGroup->getType() == CG::CGTYPE::Kernel &&
17191719
(static_cast<CGExecKernel *>(MCommandGroup.get())->hasStreams() ||

sycl/source/detail/scheduler/commands.hpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ class Command {
148148

149149
const QueueImplPtr &getQueue() const { return MQueue; }
150150

151-
const QueueImplPtr &getSubmittedQueue() const { return MSubmittedQueue; }
152-
153151
const EventImplPtr &getEvent() const { return MEvent; }
154152

155153
// Methods needed to support SYCL instrumentation
@@ -216,7 +214,6 @@ class Command {
216214

217215
protected:
218216
QueueImplPtr MQueue;
219-
QueueImplPtr MSubmittedQueue;
220217
EventImplPtr MEvent;
221218
QueueImplPtr MWorkerQueue;
222219

sycl/source/detail/scheduler/scheduler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
161161
cleanupCommands(ToCleanUp);
162162

163163
for (auto StreamImplPtr : Streams) {
164-
StreamImplPtr->flush();
164+
StreamImplPtr->flush(NewEvent);
165165
}
166166

167167
return NewEvent;

sycl/source/detail/stream_impl.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//
77
//===----------------------------------------------------------------------===//
88

9+
#include <detail/queue_impl.hpp>
910
#include <detail/scheduler/scheduler.hpp>
1011
#include <detail/stream_impl.hpp>
1112
#include <sycl/queue.hpp>
@@ -68,13 +69,13 @@ size_t stream_impl::get_size() const { return BufferSize_; }
6869

6970
size_t stream_impl::get_max_statement_size() const { return MaxStatementSize_; }
7071

71-
void stream_impl::flush() {
72+
void stream_impl::flush(const EventImplPtr &LeadEvent) {
7273
// We don't want stream flushing to be blocking operation that is why submit a
7374
// host task to print stream buffer. It will fire up as soon as the kernel
7475
// finishes execution.
7576
auto Q = detail::createSyclObjFromImpl<queue>(
7677
sycl::detail::Scheduler::getInstance().getDefaultHostQueue());
77-
Q.submit([&](handler &cgh) {
78+
event Event = Q.submit([&](handler &cgh) {
7879
auto BufHostAcc =
7980
detail::Scheduler::getInstance()
8081
.StreamBuffersPool.find(this)
@@ -96,7 +97,14 @@ void stream_impl::flush() {
9697
fflush(stdout);
9798
});
9899
});
100+
if (LeadEvent) {
101+
LeadEvent->attachEventToComplete(detail::getSyclObjImpl(Event));
102+
LeadEvent->getSubmittedQueue()->registerStreamServiceEvent(
103+
detail::getSyclObjImpl(Event));
104+
}
99105
}
106+
107+
void stream_impl::flush() { flush(nullptr); }
100108
} // namespace detail
101109
} // __SYCL_INLINE_VER_NAMESPACE(_V1)
102110
} // namespace sycl

sycl/source/detail/stream_impl.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ class __SYCL_EXPORT stream_impl {
4242
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH);
4343

4444
// Enqueue task to copy stream buffer to the host and print the contents
45+
// The host task event is then registered for post processing in the
46+
// LeadEvent as well as in the queue that LeadEvent is associated with.
47+
void flush(const EventImplPtr &LeadEvent);
48+
49+
// Enqueue task to copy stream buffer to the host and print the contents
50+
// Remove during next ABI breaking window
4551
void flush();
4652

4753
size_t get_size() const;

sycl/test/abi/sycl_symbols_linux.dump

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3756,6 +3756,7 @@ _ZN4sycl3_V16detail11make_kernelEmRKNS0_7contextENS0_7backendE
37563756
_ZN4sycl3_V16detail11stream_impl15accessGlobalBufERNS0_7handlerE
37573757
_ZN4sycl3_V16detail11stream_impl18accessGlobalOffsetERNS0_7handlerE
37583758
_ZN4sycl3_V16detail11stream_impl20accessGlobalFlushBufERNS0_7handlerE
3759+
_ZN4sycl3_V16detail11stream_impl5flushERKSt10shared_ptrINS1_10event_implEE
37593760
_ZN4sycl3_V16detail11stream_impl5flushEv
37603761
_ZN4sycl3_V16detail11stream_implC1EmmRKNS0_13property_listE
37613762
_ZN4sycl3_V16detail11stream_implC1EmmRNS0_7handlerE

0 commit comments

Comments
 (0)