Skip to content

Commit 8d37cba

Browse files
authored
[SYCL] Store stream offset in flush buffer's memory (#2767)
Flush buffer's offset is not shared between copies of stream object. It may lead to a loss or double printing of work items' statements written in one of copied stream objects. Use two first bytes of work item's flush buffer for storing buffer's offset to share it between copies of stream object. Signed-off-by: Mikhail Lychkov <[email protected]>
1 parent e481174 commit 8d37cba

File tree

16 files changed

+415
-68
lines changed

16 files changed

+415
-68
lines changed

sycl/include/CL/sycl/handler.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ template <typename T_Src, int Dims_Src, cl::sycl::access::mode AccessMode_Src,
6565
cl::sycl::access::placeholder IsPlaceholder_Dst>
6666
class __copyAcc2Acc;
6767

68+
// For unit testing purposes
69+
class MockHandler;
70+
6871
__SYCL_INLINE_NAMESPACE(cl) {
6972
namespace sycl {
7073

@@ -1951,6 +1954,8 @@ class __SYCL_EXPORT handler {
19511954
friend void detail::associateWithHandler(handler &,
19521955
detail::AccessorBaseHost *,
19531956
access::target);
1957+
1958+
friend class ::MockHandler;
19541959
};
19551960
} // namespace sycl
19561961
} // __SYCL_INLINE_NAMESPACE(cl)

sycl/include/CL/sycl/stream.hpp

Lines changed: 88 additions & 55 deletions
Large diffs are not rendered by default.

sycl/source/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ set(SYCL_SOURCES
134134
"detail/scheduler/commands.cpp"
135135
"detail/scheduler/leaves_collection.cpp"
136136
"detail/scheduler/scheduler.cpp"
137+
"detail/scheduler/scheduler_helpers.cpp"
137138
"detail/scheduler/graph_processor.cpp"
138139
"detail/scheduler/graph_builder.cpp"
139140
"detail/spec_constant_impl.cpp"

sycl/source/detail/scheduler/scheduler.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <detail/global_handler.hpp>
1212
#include <detail/queue_impl.hpp>
1313
#include <detail/scheduler/scheduler.hpp>
14+
#include <detail/scheduler/scheduler_helpers.hpp>
1415
#include <detail/stream_impl.hpp>
1516

1617
#include <chrono>
@@ -72,6 +73,19 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
7273
const bool IsKernel = CommandGroup->getType() == CG::KERNEL;
7374
const bool IsHostKernel = CommandGroup->getType() == CG::RUN_ON_HOST_INTEL;
7475
vector_class<StreamImplPtr> Streams;
76+
77+
if (IsKernel) {
78+
Streams = ((CGExecKernel *)CommandGroup.get())->getStreams();
79+
// Stream's flush buffer memory is mainly initialized in stream's __init
80+
// method. However, this method is not available on host device.
81+
// Initializing stream's flush buffer on the host side in a separate task.
82+
if (Queue->is_host()) {
83+
for (const StreamImplPtr &Stream : Streams) {
84+
initStream(Stream, Queue);
85+
}
86+
}
87+
}
88+
7589
{
7690
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);
7791
lockSharedTimedMutex(Lock);
@@ -102,9 +116,6 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
102116
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
103117
throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);
104118

105-
if (IsKernel)
106-
Streams = ((ExecCGCommand *)NewCmd)->getStreams();
107-
108119
// If there are no memory dependencies decouple and free the command.
109120
// Though, dismiss ownership of native kernel command group as it's
110121
// resources may be in use by backend and synchronization point here is

sycl/source/detail/scheduler/scheduler.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ class Scheduler {
764764
};
765765

766766
friend class stream_impl;
767+
friend void initStream(StreamImplPtr, QueueImplPtr);
767768

768769
// Protects stream buffers pool
769770
std::recursive_mutex StreamBuffersPoolMutex;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//==-- scheduler_helpers.cpp - SYCL Scheduler helper functions --*- C++ -*-===//
2+
//
3+
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4+
// See https://llvm.org/LICENSE.txt for license information.
5+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
//
7+
//===----------------------------------------------------------------------===//
8+
9+
#include <CL/sycl/queue.hpp>
10+
#include <detail/queue_impl.hpp>
11+
#include <detail/scheduler/scheduler.hpp>
12+
#include <detail/scheduler/scheduler_helpers.hpp>
13+
#include <detail/stream_impl.hpp>
14+
15+
__SYCL_INLINE_NAMESPACE(cl) {
16+
namespace sycl {
17+
namespace detail {
18+
19+
void initStream(StreamImplPtr Stream, QueueImplPtr Queue) {
20+
Scheduler::StreamBuffers *StrBufs{};
21+
22+
{
23+
std::lock_guard<std::recursive_mutex> lock(
24+
Scheduler::getInstance().StreamBuffersPoolMutex);
25+
26+
auto StreamBuf =
27+
Scheduler::getInstance().StreamBuffersPool.find(Stream.get());
28+
assert((StreamBuf != Scheduler::getInstance().StreamBuffersPool.end()) &&
29+
"Stream is unexpectedly not found in pool.");
30+
31+
StrBufs = StreamBuf->second;
32+
}
33+
34+
assert(StrBufs && "No buffers for a stream.");
35+
36+
// Real size of full flush buffer is saved only in buffer_impl field of
37+
// FlushBuf object.
38+
size_t FlushBufSize = getSyclObjImpl(StrBufs->FlushBuf)->get_count();
39+
40+
auto Q = createSyclObjFromImpl<queue>(Queue);
41+
Q.submit([&](handler &cgh) {
42+
auto FlushBufAcc =
43+
StrBufs->FlushBuf.get_access<access::mode::discard_write,
44+
access::target::host_buffer>(
45+
cgh, range<1>(FlushBufSize), id<1>(0));
46+
cgh.codeplay_host_task([=] {
47+
char *FlushBufPtr = FlushBufAcc.get_pointer();
48+
std::memset(FlushBufPtr, 0, FlushBufAcc.get_size());
49+
});
50+
});
51+
}
52+
53+
} // namespace detail
54+
} // namespace sycl
55+
} // __SYCL_INLINE_NAMESPACE(cl)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//==---------- scheduler_helpers.hpp - SYCL standard header file -----------==//
2+
//
3+
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4+
// See https://llvm.org/LICENSE.txt for license information.
5+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
//
7+
//===----------------------------------------------------------------------===//
8+
9+
#pragma once
10+
11+
#include <CL/sycl/detail/defines_elementary.hpp>
12+
13+
#include <memory>
14+
15+
__SYCL_INLINE_NAMESPACE(cl) {
16+
namespace sycl {
17+
namespace detail {
18+
19+
class stream_impl;
20+
class queue_impl;
21+
22+
using StreamImplPtr = std::shared_ptr<detail::stream_impl>;
23+
using QueueImplPtr = std::shared_ptr<detail::queue_impl>;
24+
25+
void initStream(StreamImplPtr Stream, QueueImplPtr Queue);
26+
27+
} // namespace detail
28+
} // namespace sycl
29+
} // __SYCL_INLINE_NAMESPACE(cl)

sycl/source/detail/stream_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ stream_impl::stream_impl(size_t BufferSize, size_t MaxStatementSize,
2727
// the end of line symbol.
2828
detail::Scheduler::getInstance().allocateStreamBuffers(
2929
this, BufferSize + OffsetSize + 1 /* size of the stream buffer */,
30-
MaxStatementSize /* size of the flush buffer */);
30+
MaxStatementSize + FLUSH_BUF_OFFSET_SIZE /* size of the flush buffer */);
3131
}
3232

3333
// Method to provide an access to the global stream buffer
@@ -43,7 +43,7 @@ GlobalBufAccessorT stream_impl::accessGlobalFlushBuf(handler &CGH) {
4343
return detail::Scheduler::getInstance()
4444
.StreamBuffersPool.find(this)
4545
->second->FlushBuf.get_access<cl::sycl::access::mode::read_write>(
46-
CGH, range<1>(MaxStatementSize_), id<1>(0));
46+
CGH, range<1>(MaxStatementSize_ + FLUSH_BUF_OFFSET_SIZE), id<1>(0));
4747
}
4848

4949
// Method to provide an atomic access to the offset in the global stream
@@ -76,7 +76,7 @@ void stream_impl::flush() {
7676
cgh, range<1>(BufferSize_), id<1>(OffsetSize));
7777
// Create accessor to the flush buffer even if not using it yet. Otherwise
7878
// kernel will be a leaf for the flush buffer and scheduler will not be able
79-
// to cleanup the kernel. TODO: git rid of finalize method by using host
79+
// to cleanup the kernel. TODO: get rid of finalize method by using host
8080
// accessor to the flush buffer.
8181
auto FlushBufHostAcc =
8282
detail::Scheduler::getInstance()

sycl/source/detail/stream_impl.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ namespace detail {
2323

2424
class __SYCL_EXPORT stream_impl {
2525
public:
26+
// TODO: Handler argument is not used in constructor.
27+
// To be removed when API/ABI changes are allowed.
2628
stream_impl(size_t BufferSize, size_t MaxStatementSize, handler &CGH);
2729

2830
// Method to provide an access to the global stream buffer

sycl/source/stream.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,24 @@
1313
__SYCL_INLINE_NAMESPACE(cl) {
1414
namespace sycl {
1515

16+
// Maximum possible size of a flush buffer statement in bytes
17+
static constexpr size_t MAX_STATEMENT_SIZE =
18+
(1 << (CHAR_BIT * detail::FLUSH_BUF_OFFSET_SIZE)) - 1;
19+
1620
stream::stream(size_t BufferSize, size_t MaxStatementSize, handler &CGH)
1721
: impl(std::make_shared<detail::stream_impl>(BufferSize, MaxStatementSize,
1822
CGH)),
1923
GlobalBuf(impl->accessGlobalBuf(CGH)),
2024
GlobalOffset(impl->accessGlobalOffset(CGH)),
2125
// Allocate the flush buffer, which contains space for each work item
2226
GlobalFlushBuf(impl->accessGlobalFlushBuf(CGH)),
23-
FlushBufferSize(MaxStatementSize) {
27+
FlushBufferSize(MaxStatementSize + detail::FLUSH_BUF_OFFSET_SIZE) {
28+
if (MaxStatementSize > MAX_STATEMENT_SIZE) {
29+
throw sycl::invalid_parameter_error(
30+
"Maximum statement size exceeds limit of " +
31+
std::to_string(MAX_STATEMENT_SIZE) + " bytes.",
32+
PI_INVALID_VALUE);
33+
}
2434

2535
// Save stream implementation in the handler so that stream will be alive
2636
// during kernel execution

sycl/test/on-device/basic_tests/stream/stream.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
32
// RUN: %RUN_ON_HOST %t.out | FileCheck %s
43
// RUN: %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
@@ -249,11 +248,12 @@ int main() {
249248
[&](h_item<3> Item) { Out << Item << endl; });
250249
});
251250
});
252-
// CHECK-NEXT: h_item(
253-
// CHECK-NEXT: global item(range: {1, 1, 1}, id: {0, 0, 0})
254-
// CHECK-NEXT: logical local item(range: {1, 1, 1}, id: {0, 0, 0})
255-
// CHECK-NEXT: physical local item(range: {1, 1, 1}, id: {0, 0, 0})
256-
// CHECK-NEXT: )
251+
Queue.wait();
252+
// CHECK-NEXT: h_item(
253+
// CHECK-NEXT: global item(range: {1, 1, 1}, id: {0, 0, 0})
254+
// CHECK-NEXT: logical local item(range: {1, 1, 1}, id: {0, 0, 0})
255+
// CHECK-NEXT: physical local item(range: {1, 1, 1}, id: {0, 0, 0})
256+
// CHECK-NEXT: )
257257

258258
// Multiple streams in command group
259259
Queue.submit([&](handler &CGH) {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
2+
// RUN: env SYCL_DEVICE_TYPE=HOST %t.out | FileCheck %s
3+
// RUN: %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
4+
// RUN: %GPU_RUN_ON_LINUX_PLACEHOLDER %t.out %GPU_CHECK_ON_LINUX_PLACEHOLDER
5+
// RUN: %ACC_RUN_PLACEHOLDER %t.out %ACC_CHECK_PLACEHOLDER
6+
7+
#include <CL/sycl.hpp>
8+
9+
using namespace cl::sycl;
10+
11+
void dev_func(stream out) { out << "dev_func print\n"; }
12+
13+
int main() {
14+
{
15+
queue Queue;
16+
17+
Queue.submit([&](handler &cgh) {
18+
stream stream1(1024, 160, cgh);
19+
cgh.parallel_for<class test_dev_func_stream>(
20+
nd_range<1>(range<1>(1), range<1>(1)), [=](sycl::nd_item<1> it) {
21+
stream1 << "stream1 print 1\n";
22+
dev_func(stream1);
23+
sycl::stream stream2 = stream1;
24+
stream1 << "stream1 print 2" << sycl::endl;
25+
stream2 << "stream2 print 1\n";
26+
stream1 << "stream1 print 3\n";
27+
stream2 << sycl::flush;
28+
});
29+
});
30+
Queue.wait();
31+
// CHECK: stream1 print 1
32+
// CHECK-NEXT: dev_func print
33+
// CHECK-NEXT: stream1 print 2
34+
// CHECK-NEXT: stream2 print 1
35+
// CHECK-NEXT: stream1 print 3
36+
}
37+
38+
return 0;
39+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
2+
// RUN: env SYCL_DEVICE_TYPE=HOST %t.out | FileCheck %s
3+
// RUN: %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
4+
// RUN: %GPU_RUN_ON_LINUX_PLACEHOLDER %t.out %GPU_CHECK_ON_LINUX_PLACEHOLDER
5+
// RUN: %ACC_RUN_PLACEHOLDER %t.out %ACC_CHECK_PLACEHOLDER
6+
7+
#include <CL/sycl.hpp>
8+
9+
#include <cassert>
10+
11+
using namespace cl;
12+
13+
int main() {
14+
sycl::queue Queue;
15+
try {
16+
Queue.submit([&](sycl::handler &cgh) {
17+
sycl::stream Out(100, 65536, cgh);
18+
cgh.single_task<class test_max_stmt_exceed>(
19+
[=]() { Out << "Hello world!" << sycl::endl; });
20+
});
21+
Queue.wait();
22+
} catch (sycl::exception &ExpectedException) {
23+
// CHECK: Maximum statement size exceeds limit of 65535 bytes
24+
std::cout << ExpectedException.what() << std::endl;
25+
}
26+
return 0;
27+
}

sycl/unittests/scheduler/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ add_sycl_unittest(SchedulerTests OBJECT
99
LinkedAllocaDependencies.cpp
1010
LeavesCollection.cpp
1111
NoUnifiedHostMemory.cpp
12+
StreamInitDependencyOnHost.cpp
1213
utils.cpp
1314
)

sycl/unittests/scheduler/SchedulerTestUtils.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ class MockScheduler : public cl::sycl::detail::Scheduler {
130130
const cl::sycl::detail::QueueImplPtr &Queue) {
131131
return MGraphBuilder.insertMemoryMove(Record, Req, Queue);
132132
}
133+
134+
cl::sycl::detail::Command *
135+
addCG(std::unique_ptr<cl::sycl::detail::CG> CommandGroup,
136+
cl::sycl::detail::QueueImplPtr Queue) {
137+
return MGraphBuilder.addCG(std::move(CommandGroup), Queue);
138+
}
133139
};
134140

135141
void addEdge(cl::sycl::detail::Command *User, cl::sycl::detail::Command *Dep,

0 commit comments

Comments
 (0)