Skip to content

Commit 81f5dff

Browse files
committed
[SYCL] Add support for blocking enqueue
The patch introduces new property of the Command class: blocked from enqueueing. This means that the command cannot be enqueued so until it's unblocked. Places where enqueue process is initiated were updated so all "wait for event" calls make blocking enqueue which doesn't return until all required commands are actually enqueued. Signed-off-by: Vlad Romanov <[email protected]>
1 parent 5491486 commit 81f5dff

File tree

7 files changed

+231
-50
lines changed

7 files changed

+231
-50
lines changed

sycl/include/CL/sycl/detail/scheduler/commands.hpp

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ class AllocaCommand;
3232
class AllocaCommandBase;
3333
class ReleaseCommand;
3434

35+
enum BlockingT { NON_BLOCKING = 0, BLOCKING };
36+
37+
// The struct represents the result of command enqueueing
38+
struct EnqueueResultT {
39+
enum ResultT { SUCCESS, BLOCKED, FAILED };
40+
EnqueueResultT(ResultT Result = SUCCESS, Command *Cmd = nullptr,
41+
cl_int ErrCode = CL_SUCCESS)
42+
: MResult(Result), MCmd(Cmd), MErrCode(ErrCode) {}
43+
// Indicates result of enqueueing
44+
ResultT MResult;
45+
// Pointer to the command failed to enqueue
46+
Command *MCmd;
47+
// Error code which is set when enqueueing fails
48+
cl_int MErrCode;
49+
};
50+
51+
3552
// DepDesc represents dependency between two commands
3653
struct DepDesc {
3754
DepDesc(Command *DepCommand, Requirement *Req, AllocaCommandBase *AllocaCmd)
@@ -85,9 +102,11 @@ class Command {
85102
// Return type of the command, e.g. Allocate, MemoryCopy.
86103
CommandType getType() const { return MType; }
87104

88-
// The method checks if the command is enqueued, call enqueueImp if not and
89-
// returns CL_SUCCESS on success.
90-
cl_int enqueue();
105+
// The method checks if the command is enqueued, waits for it to be unblocked
106+
// if "Blocking" argument is true, then calls enqueueImp.
107+
// Returns true if the command is enqueued. Sets EnqueueResult to the specific
108+
// status otherwise.
109+
bool enqueue(EnqueueResultT &EnqueueResult, BlockingT Blocking);
91110

92111
bool isFinished();
93112

@@ -116,12 +135,20 @@ class Command {
116135
virtual cl_int enqueueImp() = 0;
117136

118137
public:
119-
std::vector<DepDesc> MDeps;
120-
std::vector<Command *> MUsers;
121-
122-
private:
138+
// The type of the command
123139
CommandType MType;
140+
// Indicates whether the command is enqueued or not
124141
std::atomic<bool> MEnqueued;
142+
// Contains list of dependencies(edges)
143+
std::vector<DepDesc> MDeps;
144+
// Contains list of commands that depend on the command
145+
std::vector<Command *> MUsers;
146+
// Mutex used to protect enqueueing from race conditions
147+
std::mutex MEnqueueMtx;
148+
// Indicates whether the command can be blocked from enqueueing
149+
bool MIsBlockable = false;
150+
// Indicates whether the command is blocked from enqueueing
151+
std::atomic<bool> MCanEnqueue;
125152
};
126153

127154
// The command does nothing during enqueue. The task can be used to implement

sycl/include/CL/sycl/detail/scheduler/scheduler.hpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class Scheduler {
8181

8282
QueueImplPtr getDefaultHostQueue() { return DefaultHostQueue; }
8383

84-
private:
84+
protected:
8585
Scheduler();
8686
static Scheduler instance;
8787

@@ -193,10 +193,11 @@ class Scheduler {
193193
// Wait for the command, associated with Event passed, is completed.
194194
static void waitForEvent(EventImplPtr Event);
195195

196-
// Enqueue the command passed to the underlying device.
197-
// Returns pointer to command which failed to enqueue, so this command
198-
// with all commands that depend on it can be rescheduled.
199-
static Command *enqueueCommand(Command *Cmd);
196+
// Enqueue the command passed and all it's dependencies to the underlying
197+
// device. Returns true is the command is successfully enqueued. Sets
198+
// EnqueueResult to the specific status otherwise.
199+
static bool enqueueCommand(Command *Cmd, EnqueueResultT &EnqueueResult,
200+
BlockingT Blocking = NON_BLOCKING);
200201
};
201202

202203
void waitForRecordToFinish(MemObjRecord *Record);

sycl/source/detail/event_impl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ event_impl::event_impl(std::shared_ptr<cl::sycl::detail::queue_impl> Queue) {
9595
void event_impl::wait(
9696
std::shared_ptr<cl::sycl::detail::event_impl> Self) const {
9797

98-
if (m_Event || m_HostEvent)
98+
if (m_Event)
9999
// presence of m_Event means the command has been enqueued, so no need to
100100
// go via the slow path event waiting in the scheduler
101101
waitInternal();
102-
else
102+
else if (m_Command)
103103
detail::Scheduler::getInstance().waitForEvent(std::move(Self));
104104
}
105105

sycl/source/detail/scheduler/commands.cpp

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,39 @@ Command::Command(CommandType Type, QueueImplPtr Queue, bool UseExclusiveQueue)
143143
MEvent->setContextImpl(detail::getSyclObjImpl(MQueue->get_context()));
144144
}
145145

146-
cl_int Command::enqueue() {
147-
bool Expected = false;
148-
if (MEnqueued.compare_exchange_strong(Expected, true))
149-
return enqueueImp();
150-
return CL_SUCCESS;
146+
bool Command::enqueue(EnqueueResultT &EnqueueResult, BlockingT Blocking) {
147+
// Exit if already enqueued
148+
if (MEnqueued)
149+
return true;
150+
151+
// If the command is blocked from enqueueing
152+
if (MIsBlockable && !MCanEnqueue) {
153+
// Exit if enqueue type is not blocking
154+
if (!Blocking) {
155+
EnqueueResult = EnqueueResultT(EnqueueResultT::BLOCKED, this);
156+
return false;
157+
}
158+
// Wait if blocking
159+
while (!MCanEnqueue)
160+
;
161+
}
162+
163+
std::lock_guard<std::mutex> Lock(MEnqueueMtx);
164+
165+
// Exit if the command is already enqueued
166+
if (MEnqueued)
167+
return true;
168+
169+
cl_int Res = enqueueImp();
170+
171+
if (CL_SUCCESS != Res)
172+
EnqueueResult = EnqueueResultT(EnqueueResultT::FAILED, this, Res);
173+
else
174+
// Consider the command is successfully enqueued if return code is
175+
// CL_SUCCESS
176+
MEnqueued = true;
177+
178+
return static_cast<bool>(MEnqueued);
151179
}
152180

153181
cl_int AllocaCommand::enqueueImp() {

sycl/source/detail/scheduler/graph_processor.cpp

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ Scheduler::GraphProcessor::getWaitList(EventImplPtr Event) {
3939
void Scheduler::GraphProcessor::waitForEvent(EventImplPtr Event) {
4040
Command *Cmd = getCommand(Event);
4141
assert(Cmd && "Event has no associated command?");
42-
Command *FailedCommand = enqueueCommand(Cmd);
43-
if (FailedCommand)
42+
EnqueueResultT Res;
43+
bool Enqueued = enqueueCommand(Cmd, Res, BLOCKING);
44+
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
4445
// TODO: Reschedule commands.
4546
throw runtime_error("Enqueue process failed.");
4647

@@ -49,18 +50,40 @@ void Scheduler::GraphProcessor::waitForEvent(EventImplPtr Event) {
4950
PI_CALL(RT::piEventsWait(1, &CLEvent));
5051
}
5152

52-
Command *Scheduler::GraphProcessor::enqueueCommand(Command *Cmd) {
53+
bool Scheduler::GraphProcessor::enqueueCommand(Command *Cmd,
54+
EnqueueResultT &EnqueueResult,
55+
BlockingT Blocking) {
5356
if (!Cmd || Cmd->isEnqueued())
54-
return nullptr;
57+
return true;
58+
59+
// Indicates whether dependency cannot be enqueued
60+
bool BlockedByDep = false;
5561

5662
for (DepDesc &Dep : Cmd->MDeps) {
57-
Command *FailedCommand = enqueueCommand(Dep.MDepCommand);
58-
if (FailedCommand)
59-
return FailedCommand;
63+
const bool Enqueued =
64+
enqueueCommand(Dep.MDepCommand, EnqueueResult, Blocking);
65+
if (!Enqueued)
66+
switch (EnqueueResult.MResult) {
67+
case EnqueueResultT::FAILED:
68+
default:
69+
// Exit immediately if a command fails to avoid enqueueing commands
70+
// result of which will be discarded.
71+
return false;
72+
case EnqueueResultT::BLOCKED:
73+
// If some dependency is blocked from enqueueing remember that, but
74+
// try to enqueue other dependencies(that can be ready for
75+
// enqueueing).
76+
BlockedByDep = true;
77+
break;
78+
}
6079
}
6180

62-
cl_int Result = Cmd->enqueue();
63-
return CL_SUCCESS == Result ? nullptr : Cmd;
81+
// Exit if some command is blocked from enqueueing, the EnqueueResult is set
82+
// by the latest dependency which was blocked.
83+
if (BlockedByDep)
84+
return false;
85+
86+
return Cmd->enqueue(EnqueueResult, Blocking);
6487
}
6588

6689
} // namespace detail

sycl/source/detail/scheduler/scheduler.cpp

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,25 @@ namespace detail {
2222

2323
void Scheduler::waitForRecordToFinish(MemObjRecord *Record) {
2424
for (Command *Cmd : Record->MReadLeafs) {
25-
Command *FailedCommand = GraphProcessor::enqueueCommand(Cmd);
26-
if (FailedCommand) {
27-
assert(!FailedCommand && "Command failed to enqueue");
25+
EnqueueResultT Res;
26+
bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res);
27+
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
2828
throw runtime_error("Enqueue process failed.");
29-
}
3029
GraphProcessor::waitForEvent(Cmd->getEvent());
3130
}
3231
for (Command *Cmd : Record->MWriteLeafs) {
33-
Command *FailedCommand = GraphProcessor::enqueueCommand(Cmd);
34-
if (FailedCommand) {
35-
assert(!FailedCommand && "Command failed to enqueue");
32+
EnqueueResultT Res;
33+
bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res);
34+
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
3635
throw runtime_error("Enqueue process failed.");
37-
}
3836
GraphProcessor::waitForEvent(Cmd->getEvent());
3937
}
4038
for (AllocaCommandBase *AllocaCmd : Record->MAllocaCommands) {
4139
Command *ReleaseCmd = AllocaCmd->getReleaseCmd();
42-
Command *FailedCommand = GraphProcessor::enqueueCommand(ReleaseCmd);
43-
if (FailedCommand) {
44-
assert(!FailedCommand && "Command failed to enqueue");
40+
EnqueueResultT Res;
41+
bool Enqueued = GraphProcessor::enqueueCommand(ReleaseCmd, Res);
42+
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
4543
throw runtime_error("Enqueue process failed.");
46-
}
4744
GraphProcessor::waitForEvent(ReleaseCmd->getEvent());
4845
}
4946
}
@@ -65,10 +62,9 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
6562
}
6663

6764
// TODO: Check if lazy mode.
68-
Command *FailedCommand = GraphProcessor::enqueueCommand(NewCmd);
69-
MGraphBuilder.cleanupCommands();
70-
if (FailedCommand)
71-
// TODO: Reschedule commands.
65+
EnqueueResultT Res;
66+
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res);
67+
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
7268
throw runtime_error("Enqueue process failed.");
7369
}
7470

@@ -85,9 +81,9 @@ EventImplPtr Scheduler::addCopyBack(Requirement *Req) {
8581
// buffer.
8682
if (!NewCmd)
8783
return nullptr;
88-
Command *FailedCommand = GraphProcessor::enqueueCommand(NewCmd);
89-
if (FailedCommand)
90-
// TODO: Reschedule commands.
84+
EnqueueResultT Res;
85+
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res);
86+
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
9187
throw runtime_error("Enqueue process failed.");
9288
return NewCmd->getEvent();
9389
}
@@ -137,9 +133,9 @@ EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {
137133

138134
if (!NewCmd)
139135
return nullptr;
140-
Command *FailedCommand = GraphProcessor::enqueueCommand(NewCmd);
141-
if (FailedCommand)
142-
// TODO: Reschedule commands.
136+
EnqueueResultT Res;
137+
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res);
138+
if (!Enqueued && EnqueueResultT::FAILED == Res.MResult)
143139
throw runtime_error("Enqueue process failed.");
144140
return RetEvent;
145141
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// RUN: %clangxx -fsycl %s -o %t.out
2+
// RUN: env SYCL_DEVICE_TYPE=HOST %t.out
3+
//==------------------- BlockedCommands.cpp --------------------------------==//
4+
//
5+
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
6+
// See https://llvm.org/LICENSE.txt for license information.
7+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
8+
//
9+
//===----------------------------------------------------------------------===//
10+
11+
#include <iostream>
12+
13+
#include <CL/cl.h>
14+
#include <CL/sycl.hpp>
15+
16+
using namespace cl::sycl;
17+
18+
class FakeCommand : public detail::Command {
19+
public:
20+
FakeCommand(detail::QueueImplPtr Queue)
21+
: Command(detail::Command::ALLOCA, Queue) {}
22+
void printDot(std::ostream &Stream) const override {}
23+
24+
cl_int enqueueImp() override { return MRetVal; }
25+
26+
cl_int MRetVal = CL_SUCCESS;
27+
};
28+
29+
class TestScheduler : public detail::Scheduler {
30+
public:
31+
static bool enqueueCommand(detail::Command *Cmd,
32+
detail::EnqueueResultT &EnqueueResult,
33+
detail::BlockingT Blocking) {
34+
return GraphProcessor::enqueueCommand(Cmd, EnqueueResult, Blocking);
35+
}
36+
};
37+
38+
int main() {
39+
cl::sycl::queue Queue;
40+
FakeCommand FakeCmd(detail::getSyclObjImpl(Queue));
41+
42+
FakeCmd.MIsBlockable = true;
43+
FakeCmd.MCanEnqueue = false;
44+
FakeCmd.MRetVal = CL_DEVICE_PARTITION_EQUALLY;
45+
46+
{
47+
detail::EnqueueResultT Res;
48+
bool Enqueued =
49+
TestScheduler::enqueueCommand(&FakeCmd, Res, detail::NON_BLOCKING);
50+
51+
if (Enqueued) {
52+
std::cerr << "Blocked command should not be enqueued" << std::endl;
53+
return 1;
54+
}
55+
56+
if (detail::EnqueueResultT::BLOCKED != Res.MResult) {
57+
std::cerr << "Result of enqueueing blocked command should be BLOCKED"
58+
<< std::endl;
59+
return 1;
60+
}
61+
}
62+
63+
FakeCmd.MCanEnqueue = true;
64+
65+
{
66+
detail::EnqueueResultT Res;
67+
bool Enqueued =
68+
TestScheduler::enqueueCommand(&FakeCmd, Res, detail::BLOCKING);
69+
70+
if (Enqueued) {
71+
std::cerr << "The command is expected to fail to enqueue." << std::endl;
72+
return 1;
73+
}
74+
75+
if (detail::EnqueueResultT::FAILED != Res.MResult) {
76+
std::cerr << "The command is expected to fail to enqueue." << std::endl;
77+
return 1;
78+
}
79+
80+
if (CL_DEVICE_PARTITION_EQUALLY != Res.MErrCode) {
81+
std::cerr << "Expected different error code." << std::endl;
82+
return 1;
83+
}
84+
85+
if (&FakeCmd != Res.MCmd) {
86+
std::cerr << "Expected different failed command." << std::endl;
87+
return 1;
88+
}
89+
}
90+
91+
FakeCmd.MRetVal = CL_SUCCESS;
92+
93+
{
94+
detail::EnqueueResultT Res;
95+
bool Enqueued =
96+
TestScheduler::enqueueCommand(&FakeCmd, Res, detail::BLOCKING);
97+
98+
if (!Enqueued || detail::EnqueueResultT::SUCCESS != Res.MResult) {
99+
std::cerr << "The command is expected to be successfully enqueued."
100+
<< std::endl;
101+
return 1;
102+
}
103+
}
104+
105+
return 0;
106+
}

0 commit comments

Comments
 (0)