Skip to content

[SYCL] Add support for blocking enqueue #733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions sycl/include/CL/sycl/detail/scheduler/commands.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ class AllocaCommand;
class AllocaCommandBase;
class ReleaseCommand;

enum BlockingT { NON_BLOCKING = 0, BLOCKING };

// The struct represents the result of command enqueueing
struct EnqueueResultT {
enum ResultT { SUCCESS, BLOCKED, FAILED };
EnqueueResultT(ResultT Result = SUCCESS, Command *Cmd = nullptr,
cl_int ErrCode = CL_SUCCESS)
: MResult(Result), MCmd(Cmd), MErrCode(ErrCode) {}
// Indicates result of enqueueing
ResultT MResult;
// Pointer to the command failed to enqueue
Command *MCmd;
// Error code which is set when enqueueing fails
cl_int MErrCode;
};


// DepDesc represents dependency between two commands
struct DepDesc {
DepDesc(Command *DepCommand, Requirement *Req, AllocaCommandBase *AllocaCmd)
Expand Down Expand Up @@ -85,9 +102,11 @@ class Command {
// Return type of the command, e.g. Allocate, MemoryCopy.
CommandType getType() const { return MType; }

// The method checks if the command is enqueued, call enqueueImp if not and
// returns CL_SUCCESS on success.
cl_int enqueue();
// The method checks if the command is enqueued, waits for it to be unblocked
// if "Blocking" argument is true, then calls enqueueImp.
// Returns true if the command is enqueued. Sets EnqueueResult to the specific
// status otherwise.
bool enqueue(EnqueueResultT &EnqueueResult, BlockingT Blocking);

bool isFinished();

Expand Down Expand Up @@ -116,12 +135,20 @@ class Command {
virtual cl_int enqueueImp() = 0;

public:
std::vector<DepDesc> MDeps;
std::vector<Command *> MUsers;

private:
// The type of the command
CommandType MType;
// Indicates whether the command is enqueued or not
std::atomic<bool> MEnqueued;
// Contains list of dependencies(edges)
std::vector<DepDesc> MDeps;
// Contains list of commands that depend on the command
std::vector<Command *> MUsers;
// Mutex used to protect enqueueing from race conditions
std::mutex MEnqueueMtx;
// Indicates whether the command can be blocked from enqueueing
bool MIsBlockable = false;
// Indicates whether the command is blocked from enqueueing
std::atomic<bool> MCanEnqueue;
};

// The command does nothing during enqueue. The task can be used to implement
Expand Down
11 changes: 6 additions & 5 deletions sycl/include/CL/sycl/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Scheduler {

QueueImplPtr getDefaultHostQueue() { return DefaultHostQueue; }

private:
protected:
Scheduler();
static Scheduler instance;

Expand Down Expand Up @@ -194,10 +194,11 @@ class Scheduler {
// Wait for the command, associated with Event passed, is completed.
static void waitForEvent(EventImplPtr Event);

// Enqueue the command passed to the underlying device.
// Returns pointer to command which failed to enqueue, so this command
// with all commands that depend on it can be rescheduled.
static Command *enqueueCommand(Command *Cmd);
// Enqueue the command passed and all it's dependencies to the underlying
// device. Returns true is the command is successfully enqueued. Sets
// EnqueueResult to the specific status otherwise.
static bool enqueueCommand(Command *Cmd, EnqueueResultT &EnqueueResult,
BlockingT Blocking = NON_BLOCKING);
};

void waitForRecordToFinish(MemObjRecord *Record);
Expand Down
4 changes: 2 additions & 2 deletions sycl/source/detail/event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ event_impl::event_impl(std::shared_ptr<cl::sycl::detail::queue_impl> Queue) {
void event_impl::wait(
std::shared_ptr<cl::sycl::detail::event_impl> Self) const {

if (m_Event || m_HostEvent)
if (m_Event)
// presence of m_Event means the command has been enqueued, so no need to
// go via the slow path event waiting in the scheduler
waitInternal();
else
else if (m_Command)
detail::Scheduler::getInstance().waitForEvent(std::move(Self));
}

Expand Down
38 changes: 33 additions & 5 deletions sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,39 @@ Command::Command(CommandType Type, QueueImplPtr Queue, bool UseExclusiveQueue)
MEvent->setContextImpl(detail::getSyclObjImpl(MQueue->get_context()));
}

cl_int Command::enqueue() {
bool Expected = false;
if (MEnqueued.compare_exchange_strong(Expected, true))
return enqueueImp();
return CL_SUCCESS;
bool Command::enqueue(EnqueueResultT &EnqueueResult, BlockingT Blocking) {
// Exit if already enqueued
if (MEnqueued)
return true;

// If the command is blocked from enqueueing
if (MIsBlockable && !MCanEnqueue) {
// Exit if enqueue type is not blocking
if (!Blocking) {
EnqueueResult = EnqueueResultT(EnqueueResultT::BLOCKED, this);
return false;
}
// Wait if blocking
while (!MCanEnqueue)
;
}

std::lock_guard<std::mutex> Lock(MEnqueueMtx);

// Exit if the command is already enqueued
if (MEnqueued)
return true;

cl_int Res = enqueueImp();

if (CL_SUCCESS != Res)
EnqueueResult = EnqueueResultT(EnqueueResultT::FAILED, this, Res);
else
// Consider the command is successfully enqueued if return code is
// CL_SUCCESS
MEnqueued = true;

return static_cast<bool>(MEnqueued);
}

cl_int AllocaCommand::enqueueImp() {
Expand Down
41 changes: 32 additions & 9 deletions sycl/source/detail/scheduler/graph_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ Scheduler::GraphProcessor::getWaitList(EventImplPtr Event) {
void Scheduler::GraphProcessor::waitForEvent(EventImplPtr Event) {
Command *Cmd = getCommand(Event);
assert(Cmd && "Event has no associated command?");
Command *FailedCommand = enqueueCommand(Cmd);
if (FailedCommand)
EnqueueResultT Res;
bool Enqueued = enqueueCommand(Cmd, Res, BLOCKING);
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
// TODO: Reschedule commands.
throw runtime_error("Enqueue process failed.");

Expand All @@ -49,18 +50,40 @@ void Scheduler::GraphProcessor::waitForEvent(EventImplPtr Event) {
PI_CALL(RT::piEventsWait(1, &CLEvent));
}

Command *Scheduler::GraphProcessor::enqueueCommand(Command *Cmd) {
bool Scheduler::GraphProcessor::enqueueCommand(Command *Cmd,
EnqueueResultT &EnqueueResult,
BlockingT Blocking) {
if (!Cmd || Cmd->isEnqueued())
return nullptr;
return true;

// Indicates whether dependency cannot be enqueued
bool BlockedByDep = false;

for (DepDesc &Dep : Cmd->MDeps) {
Command *FailedCommand = enqueueCommand(Dep.MDepCommand);
if (FailedCommand)
return FailedCommand;
const bool Enqueued =
enqueueCommand(Dep.MDepCommand, EnqueueResult, Blocking);
if (!Enqueued)
switch (EnqueueResult.MResult) {
case EnqueueResultT::FAILED:
default:
// Exit immediately if a command fails to avoid enqueueing commands
// result of which will be discarded.
return false;
case EnqueueResultT::BLOCKED:
// If some dependency is blocked from enqueueing remember that, but
// try to enqueue other dependencies(that can be ready for
// enqueueing).
BlockedByDep = true;
break;
}
}

cl_int Result = Cmd->enqueue();
return CL_SUCCESS == Result ? nullptr : Cmd;
// Exit if some command is blocked from enqueueing, the EnqueueResult is set
// by the latest dependency which was blocked.
if (BlockedByDep)
return false;

return Cmd->enqueue(EnqueueResult, Blocking);
}

} // namespace detail
Expand Down
40 changes: 18 additions & 22 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,25 @@ namespace detail {

void Scheduler::waitForRecordToFinish(MemObjRecord *Record) {
for (Command *Cmd : Record->MReadLeafs) {
Command *FailedCommand = GraphProcessor::enqueueCommand(Cmd);
if (FailedCommand) {
assert(!FailedCommand && "Command failed to enqueue");
EnqueueResultT Res;
bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res);
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
throw runtime_error("Enqueue process failed.");
}
GraphProcessor::waitForEvent(Cmd->getEvent());
}
for (Command *Cmd : Record->MWriteLeafs) {
Command *FailedCommand = GraphProcessor::enqueueCommand(Cmd);
if (FailedCommand) {
assert(!FailedCommand && "Command failed to enqueue");
EnqueueResultT Res;
bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res);
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
throw runtime_error("Enqueue process failed.");
}
GraphProcessor::waitForEvent(Cmd->getEvent());
}
for (AllocaCommandBase *AllocaCmd : Record->MAllocaCommands) {
Command *ReleaseCmd = AllocaCmd->getReleaseCmd();
Command *FailedCommand = GraphProcessor::enqueueCommand(ReleaseCmd);
if (FailedCommand) {
assert(!FailedCommand && "Command failed to enqueue");
EnqueueResultT Res;
bool Enqueued = GraphProcessor::enqueueCommand(ReleaseCmd, Res);
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
throw runtime_error("Enqueue process failed.");
}
GraphProcessor::waitForEvent(ReleaseCmd->getEvent());
}
}
Expand All @@ -65,10 +62,9 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
}

// TODO: Check if lazy mode.
Command *FailedCommand = GraphProcessor::enqueueCommand(NewCmd);
MGraphBuilder.cleanupCommands();
if (FailedCommand)
// TODO: Reschedule commands.
EnqueueResultT Res;
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res);
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
throw runtime_error("Enqueue process failed.");
}

Expand All @@ -85,9 +81,9 @@ EventImplPtr Scheduler::addCopyBack(Requirement *Req) {
// buffer.
if (!NewCmd)
return nullptr;
Command *FailedCommand = GraphProcessor::enqueueCommand(NewCmd);
if (FailedCommand)
// TODO: Reschedule commands.
EnqueueResultT Res;
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res);
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
throw runtime_error("Enqueue process failed.");
return NewCmd->getEvent();
}
Expand Down Expand Up @@ -137,9 +133,9 @@ EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {

if (!NewCmd)
return nullptr;
Command *FailedCommand = GraphProcessor::enqueueCommand(NewCmd);
if (FailedCommand)
// TODO: Reschedule commands.
EnqueueResultT Res;
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res);
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
throw runtime_error("Enqueue process failed.");
return RetEvent;
}
Expand Down
106 changes: 106 additions & 0 deletions sycl/test/scheduler/BlockedCommands.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// RUN: %clangxx -fsycl %s -o %t.out
// RUN: env SYCL_DEVICE_TYPE=HOST %t.out
//==------------------- BlockedCommands.cpp --------------------------------==//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include <iostream>

#include <CL/cl.h>
#include <CL/sycl.hpp>

using namespace cl::sycl;

class FakeCommand : public detail::Command {
public:
FakeCommand(detail::QueueImplPtr Queue)
: Command(detail::Command::ALLOCA, Queue) {}
void printDot(std::ostream &Stream) const override {}

cl_int enqueueImp() override { return MRetVal; }

cl_int MRetVal = CL_SUCCESS;
};

class TestScheduler : public detail::Scheduler {
public:
static bool enqueueCommand(detail::Command *Cmd,
detail::EnqueueResultT &EnqueueResult,
detail::BlockingT Blocking) {
return GraphProcessor::enqueueCommand(Cmd, EnqueueResult, Blocking);
}
};

int main() {
cl::sycl::queue Queue;
FakeCommand FakeCmd(detail::getSyclObjImpl(Queue));

FakeCmd.MIsBlockable = true;
FakeCmd.MCanEnqueue = false;
FakeCmd.MRetVal = CL_DEVICE_PARTITION_EQUALLY;

{
detail::EnqueueResultT Res;
bool Enqueued =
TestScheduler::enqueueCommand(&FakeCmd, Res, detail::NON_BLOCKING);

if (Enqueued) {
std::cerr << "Blocked command should not be enqueued" << std::endl;
return 1;
}

if (detail::EnqueueResultT::BLOCKED != Res.MResult) {
std::cerr << "Result of enqueueing blocked command should be BLOCKED"
<< std::endl;
return 1;
}
}

FakeCmd.MCanEnqueue = true;

{
detail::EnqueueResultT Res;
bool Enqueued =
TestScheduler::enqueueCommand(&FakeCmd, Res, detail::BLOCKING);

if (Enqueued) {
std::cerr << "The command is expected to fail to enqueue." << std::endl;
return 1;
}

if (detail::EnqueueResultT::FAILED != Res.MResult) {
std::cerr << "The command is expected to fail to enqueue." << std::endl;
return 1;
}

if (CL_DEVICE_PARTITION_EQUALLY != Res.MErrCode) {
std::cerr << "Expected different error code." << std::endl;
return 1;
}

if (&FakeCmd != Res.MCmd) {
std::cerr << "Expected different failed command." << std::endl;
return 1;
}
}

FakeCmd.MRetVal = CL_SUCCESS;

{
detail::EnqueueResultT Res;
bool Enqueued =
TestScheduler::enqueueCommand(&FakeCmd, Res, detail::BLOCKING);

if (!Enqueued || detail::EnqueueResultT::SUCCESS != Res.MResult) {
std::cerr << "The command is expected to be successfully enqueued."
<< std::endl;
return 1;
}
}

return 0;
}