Skip to content

[SYCL] Wait for stream service tasks #7130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 3, 2022
4 changes: 4 additions & 0 deletions sycl/include/sycl/memory_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ static constexpr std::memory_order getStdMemoryOrder(sycl::memory_order order) {
case memory_order::seq_cst:
return std::memory_order_seq_cst;
}
// Return a default value here to avoid compiler warnings.
// A default case in the switch doesn't help because some compilers warn about
// having a default case when all values of the enum are already handled.
return std::memory_order_acq_rel;
}
#endif // __SYCL_DEVICE_ONLY__

Expand Down
41 changes: 13 additions & 28 deletions sycl/source/detail/event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,22 @@ event_impl::~event_impl() {

void event_impl::waitInternal() {
if (!MHostEvent && MEvent) {
// Wait for the native event
getPlugin().call<PiApiKind::piEventsWait>(1, &MEvent);
return;
}

if (MState == HES_Discarded)
} else if (MState == HES_Discarded) {
// Waiting for the discarded event is invalid
throw sycl::exception(
make_error_code(errc::invalid),
"waitInternal method cannot be used for a discarded event.");
} else if (MState != HES_Complete) {
// Wait for the host event
std::unique_lock<std::mutex> lock(MMutex);
cv.wait(lock, [this] { return MState == HES_Complete; });
}

if (MState == HES_Complete)
return;

std::unique_lock<std::mutex> lock(MMutex);
cv.wait(lock, [this] { return MState == HES_Complete; });
// Wait for connected events (e.g. stream prints)
for (const EventImplPtr &Event : MPostCompleteEvents)
Event->wait(Event);
}

void event_impl::setComplete() {
Expand Down Expand Up @@ -236,27 +238,10 @@ void event_impl::wait(std::shared_ptr<sycl::detail::event_impl> Self) {

void event_impl::wait_and_throw(
std::shared_ptr<sycl::detail::event_impl> Self) {
Scheduler &Sched = Scheduler::getInstance();

QueueImplPtr submittedQueue = nullptr;
{
Scheduler::ReadLockT Lock(Sched.MGraphLock);
Command *Cmd = static_cast<Command *>(Self->getCommand());
if (Cmd)
submittedQueue = Cmd->getSubmittedQueue();
}
wait(Self);

{
Scheduler::ReadLockT Lock(Sched.MGraphLock);
for (auto &EventImpl : getWaitList()) {
Command *Cmd = (Command *)EventImpl->getCommand();
if (Cmd)
Cmd->getSubmittedQueue()->throw_asynchronous();
}
}
if (submittedQueue)
submittedQueue->throw_asynchronous();
if (QueueImplPtr SubmittedQueue = MSubmittedQueue.lock())
SubmittedQueue->throw_asynchronous();
}

void event_impl::cleanupCommand(
Expand Down
19 changes: 18 additions & 1 deletion sycl/source/detail/event_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ class event_impl {
MWorkerQueue = WorkerQueue;
};

/// Sets original queue used for submission.
///
/// The queue is stored in MSubmittedQueue, which is a std::weak_ptr, so the
/// event does not extend the lifetime of the queue. The stored value is handed
/// back by getSubmittedQueue().
///
/// \param SubmittedQueue is the queue to record as the submission queue.
void setSubmittedQueue(const QueueImplPtr &SubmittedQueue) {
MSubmittedQueue = SubmittedQueue;
};

/// Returns the queue previously recorded by setSubmittedQueue().
///
/// The result is obtained by locking the MSubmittedQueue weak pointer, so it
/// is empty (null) when no queue was recorded or the queue no longer exists.
/// Callers must check the result before dereferencing it.
QueueImplPtr getSubmittedQueue() const { return MSubmittedQueue.lock(); };

/// Checks if an event is in a fully initialized state. Default-constructed
/// events will return true only after having initialized its native event,
/// while other events will assume that they are fully initialized at
Expand All @@ -234,7 +243,12 @@ class event_impl {
/// state.
bool isInitialized() const noexcept { return MIsInitialized; }

private:
/// Registers \p Event as a follow-up event of this one.
///
/// The event is appended to MPostCompleteEvents while MMutex is held.
/// waitInternal() waits for every event in that list after this event itself
/// has completed.
///
/// NOTE(review): waitInternal() appears to iterate MPostCompleteEvents without
/// holding MMutex - confirm that an attach cannot race with a wait.
void attachEventToComplete(const EventImplPtr &Event) {
  const std::lock_guard<std::mutex> Guard(MMutex);
  MPostCompleteEvents.emplace_back(Event);
}

protected:
// When instrumentation is enabled emits trace event for event wait begin and
// returns the telemetry event generated for the wait
void *instrumentationProlog(std::string &Name, int32_t StreamID,
Expand All @@ -257,11 +271,14 @@ class event_impl {
const bool MIsProfilingEnabled = false;

std::weak_ptr<queue_impl> MWorkerQueue;
std::weak_ptr<queue_impl> MSubmittedQueue;

/// Dependency events prepared for waiting by backend.
std::vector<EventImplPtr> MPreparedDepsEvents;
std::vector<EventImplPtr> MPreparedHostDepsEvents;

std::vector<EventImplPtr> MPostCompleteEvents;

/// Indicates that the task associated with this event has been submitted by
/// the queue to the device.
std::atomic<bool> MIsFlushed = false;
Expand Down
9 changes: 9 additions & 0 deletions sycl/source/detail/queue_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,15 @@ void queue_impl::wait(const detail::code_location &CodeLoc) {
for (event &Event : SharedEvents)
Event.wait();
}

std::vector<EventImplPtr> StreamsServiceEvents;
{
std::lock_guard<std::mutex> Lock(MMutex);
StreamsServiceEvents.swap(MStreamsServiceEvents);
}
for (const EventImplPtr &Event : StreamsServiceEvents)
Event->wait(Event);

#ifdef XPTI_ENABLE_INSTRUMENTATION
instrumentationEpilog(TelemetryEvent, Name, StreamID, IId);
#endif
Expand Down
11 changes: 9 additions & 2 deletions sycl/source/detail/queue_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ class queue_impl {
return MAssertHappenedBuffer;
}

/// Records a stream service event on this queue.
///
/// The event is appended to MStreamsServiceEvents while MMutex is held.
/// queue_impl::wait() takes ownership of that list and waits for each event
/// in it.
void registerStreamServiceEvent(const EventImplPtr &Event) {
  const std::lock_guard<std::mutex> Guard(MMutex);
  MStreamsServiceEvents.emplace_back(Event);
}

protected:
// template is needed for proper unit testing
template <typename HandlerType = handler>
Expand Down Expand Up @@ -484,7 +489,7 @@ class queue_impl {
EventRet = Handler.finalize();
}

private:
protected:
/// Helper function for checking whether a device is either a member of a
/// context or a descendant of its member.
/// \return True iff the device or its parent is a member of the context.
Expand Down Expand Up @@ -614,12 +619,14 @@ class queue_impl {

const bool MIsInorder;

std::vector<EventImplPtr> MStreamsServiceEvents;

public:
// Queue constructed with the discard_events property
const bool MDiscardEvents;
const bool MIsProfilingEnabled;

private:
protected:
// This flag says if we can discard events based on a queue "setup" which will
// be common for all operations submitted to the queue. This is a must
// condition for discarding, but even if it's true, in some cases, we won't be
Expand Down
6 changes: 3 additions & 3 deletions sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ Command::Command(CommandType Type, QueueImplPtr Queue)
MPreparedDepsEvents(MEvent->getPreparedDepsEvents()),
MPreparedHostDepsEvents(MEvent->getPreparedHostDepsEvents()),
MType(Type) {
MSubmittedQueue = MQueue;
MWorkerQueue = MQueue;
MEvent->setWorkerQueue(MWorkerQueue);
MEvent->setSubmittedQueue(MWorkerQueue);
MEvent->setCommand(this);
MEvent->setContextImpl(MQueue->getContextImplPtr());
MEvent->setStateIncomplete();
Expand Down Expand Up @@ -1712,8 +1712,8 @@ ExecCGCommand::ExecCGCommand(std::unique_ptr<detail::CG> CommandGroup,
: Command(CommandType::RUN_CG, std::move(Queue)),
MCommandGroup(std::move(CommandGroup)) {
if (MCommandGroup->getType() == detail::CG::CodeplayHostTask) {
MSubmittedQueue =
static_cast<detail::CGHostTask *>(MCommandGroup.get())->MQueue;
MEvent->setSubmittedQueue(
static_cast<detail::CGHostTask *>(MCommandGroup.get())->MQueue);
MEvent->setNeedsCleanupAfterWait(true);
} else if (MCommandGroup->getType() == CG::CGTYPE::Kernel &&
(static_cast<CGExecKernel *>(MCommandGroup.get())->hasStreams() ||
Expand Down
3 changes: 0 additions & 3 deletions sycl/source/detail/scheduler/commands.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ class Command {

/// Returns a reference to the queue stored in MQueue.
const QueueImplPtr &getQueue() const { return MQueue; }

const QueueImplPtr &getSubmittedQueue() const { return MSubmittedQueue; }

/// Returns a reference to the event stored in MEvent.
const EventImplPtr &getEvent() const { return MEvent; }

// Methods needed to support SYCL instrumentation
Expand Down Expand Up @@ -216,7 +214,6 @@ class Command {

protected:
QueueImplPtr MQueue;
QueueImplPtr MSubmittedQueue;
EventImplPtr MEvent;
QueueImplPtr MWorkerQueue;

Expand Down
2 changes: 1 addition & 1 deletion sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
cleanupCommands(ToCleanUp);

for (auto StreamImplPtr : Streams) {
StreamImplPtr->flush();
StreamImplPtr->flush(NewEvent);
}

return NewEvent;
Expand Down
12 changes: 10 additions & 2 deletions sycl/source/detail/stream_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//
//===----------------------------------------------------------------------===//

#include <detail/queue_impl.hpp>
#include <detail/scheduler/scheduler.hpp>
#include <detail/stream_impl.hpp>
#include <sycl/queue.hpp>
Expand Down Expand Up @@ -68,13 +69,13 @@ size_t stream_impl::get_size() const { return BufferSize_; }

size_t stream_impl::get_max_statement_size() const { return MaxStatementSize_; }

void stream_impl::flush() {
void stream_impl::flush(const EventImplPtr &LeadEvent) {
// We don't want stream flushing to be a blocking operation, which is why we
// submit a host task to print the stream buffer. It will fire up as soon as
// the kernel finishes execution.
auto Q = detail::createSyclObjFromImpl<queue>(
sycl::detail::Scheduler::getInstance().getDefaultHostQueue());
Q.submit([&](handler &cgh) {
event Event = Q.submit([&](handler &cgh) {
auto BufHostAcc =
detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
Expand All @@ -96,7 +97,14 @@ void stream_impl::flush() {
fflush(stdout);
});
});
if (LeadEvent) {
  // Register the print task's event with the lead event so that waiting on
  // the lead event also waits for the stream output.
  const EventImplPtr FlushEvent = detail::getSyclObjImpl(Event);
  LeadEvent->attachEventToComplete(FlushEvent);
  // getSubmittedQueue() locks a weak pointer and returns null when the queue
  // is unset or already gone; only register with a queue that still exists.
  if (const auto SubmittedQueue = LeadEvent->getSubmittedQueue())
    SubmittedQueue->registerStreamServiceEvent(FlushEvent);
}
}

void stream_impl::flush() { flush(nullptr); }
} // namespace detail
} // __SYCL_INLINE_VER_NAMESPACE(_V1)
} // namespace sycl
6 changes: 6 additions & 0 deletions sycl/source/detail/stream_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class __SYCL_EXPORT stream_impl {
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH);

// Enqueue task to copy stream buffer to the host and print the contents
// The host task event is then registered for post-processing in the
// LeadEvent as well as in the queue that LeadEvent is associated with.
void flush(const EventImplPtr &LeadEvent);

// Enqueue task to copy stream buffer to the host and print the contents
// Remove during next ABI breaking window
void flush();

size_t get_size() const;
Expand Down
1 change: 1 addition & 0 deletions sycl/test/abi/sycl_symbols_linux.dump
Original file line number Diff line number Diff line change
Expand Up @@ -3756,6 +3756,7 @@ _ZN4sycl3_V16detail11make_kernelEmRKNS0_7contextENS0_7backendE
_ZN4sycl3_V16detail11stream_impl15accessGlobalBufERNS0_7handlerE
_ZN4sycl3_V16detail11stream_impl18accessGlobalOffsetERNS0_7handlerE
_ZN4sycl3_V16detail11stream_impl20accessGlobalFlushBufERNS0_7handlerE
_ZN4sycl3_V16detail11stream_impl5flushERKSt10shared_ptrINS1_10event_implEE
_ZN4sycl3_V16detail11stream_impl5flushEv
_ZN4sycl3_V16detail11stream_implC1EmmRKNS0_13property_listE
_ZN4sycl3_V16detail11stream_implC1EmmRNS0_7handlerE
Expand Down
Loading