Skip to content

[SYCL] Store stream offset in flush buffer's memory #2767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sycl/include/CL/sycl/handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ template <typename T_Src, int Dims_Src, cl::sycl::access::mode AccessMode_Src,
cl::sycl::access::placeholder IsPlaceholder_Dst>
class __copyAcc2Acc;

// For unit testing purposes
class MockHandler;

__SYCL_INLINE_NAMESPACE(cl) {
namespace sycl {

Expand Down Expand Up @@ -1951,6 +1954,8 @@ class __SYCL_EXPORT handler {
friend void detail::associateWithHandler(handler &,
detail::AccessorBaseHost *,
access::target);

friend class ::MockHandler;
};
} // namespace sycl
} // __SYCL_INLINE_NAMESPACE(cl)
143 changes: 88 additions & 55 deletions sycl/include/CL/sycl/stream.hpp

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions sycl/source/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ set(SYCL_SOURCES
"detail/scheduler/commands.cpp"
"detail/scheduler/leaves_collection.cpp"
"detail/scheduler/scheduler.cpp"
"detail/scheduler/scheduler_helpers.cpp"
"detail/scheduler/graph_processor.cpp"
"detail/scheduler/graph_builder.cpp"
"detail/spec_constant_impl.cpp"
Expand Down
17 changes: 14 additions & 3 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <detail/global_handler.hpp>
#include <detail/queue_impl.hpp>
#include <detail/scheduler/scheduler.hpp>
#include <detail/scheduler/scheduler_helpers.hpp>
#include <detail/stream_impl.hpp>

#include <chrono>
Expand Down Expand Up @@ -72,6 +73,19 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
const bool IsKernel = CommandGroup->getType() == CG::KERNEL;
const bool IsHostKernel = CommandGroup->getType() == CG::RUN_ON_HOST_INTEL;
vector_class<StreamImplPtr> Streams;

if (IsKernel) {
Streams = ((CGExecKernel *)CommandGroup.get())->getStreams();
// Stream's flush buffer memory is mainly initialized in stream's __init
// method. However, this method is not available on host device.
// Initializing stream's flush buffer on the host side in a separate task.
if (Queue->is_host()) {
for (const StreamImplPtr &Stream : Streams) {
initStream(Stream, Queue);
}
}
}

{
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);
lockSharedTimedMutex(Lock);
Expand Down Expand Up @@ -102,9 +116,6 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);

if (IsKernel)
Streams = ((ExecCGCommand *)NewCmd)->getStreams();

// If there are no memory dependencies decouple and free the command.
// Though, dismiss ownership of native kernel command group as it's
// resources may be in use by backend and synchronization point here is
Expand Down
1 change: 1 addition & 0 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ class Scheduler {
};

friend class stream_impl;
friend void initStream(StreamImplPtr, QueueImplPtr);

// Protects stream buffers pool
std::recursive_mutex StreamBuffersPoolMutex;
Expand Down
55 changes: 55 additions & 0 deletions sycl/source/detail/scheduler/scheduler_helpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//==-- scheduler_helpers.cpp - SYCL Scheduler helper functions --*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include <CL/sycl/queue.hpp>
#include <detail/queue_impl.hpp>
#include <detail/scheduler/scheduler.hpp>
#include <detail/scheduler/scheduler_helpers.hpp>
#include <detail/stream_impl.hpp>

__SYCL_INLINE_NAMESPACE(cl) {
namespace sycl {
namespace detail {

// Zero-fills the whole flush buffer that belongs to Stream by submitting a
// host task to Queue.
//
// Stream - stream implementation whose buffers are looked up in the
//          scheduler's stream buffers pool.
// Queue  - queue implementation the initializing host task is submitted to.
void initStream(StreamImplPtr Stream, QueueImplPtr Queue) {
  Scheduler &Sched = Scheduler::getInstance();
  Scheduler::StreamBuffers *Buffers = nullptr;

  // Only the pool lookup happens under the pool mutex; the lock is released
  // before any work is submitted to the queue.
  {
    std::lock_guard<std::recursive_mutex> PoolGuard(
        Sched.StreamBuffersPoolMutex);

    auto PoolIt = Sched.StreamBuffersPool.find(Stream.get());
    assert((PoolIt != Sched.StreamBuffersPool.end()) &&
           "Stream is unexpectedly not found in pool.");

    Buffers = PoolIt->second;
  }

  assert(Buffers && "No buffers for a stream.");

  // The full size of the flush buffer is only available through the
  // buffer_impl object behind FlushBuf, so query it from there.
  const size_t FullFlushBufSize =
      getSyclObjImpl(Buffers->FlushBuf)->get_count();

  queue HostQueue = createSyclObjFromImpl<queue>(Queue);
  HostQueue.submit([&](handler &CGH) {
    // discard_write: previous contents are irrelevant, every byte in the
    // requested range is overwritten below.
    auto ZeroFillAcc =
        Buffers->FlushBuf.get_access<access::mode::discard_write,
                                     access::target::host_buffer>(
            CGH, range<1>(FullFlushBufSize), id<1>(0));
    // The accessor is captured by copy so the task owns its own handle.
    CGH.codeplay_host_task([=] {
      char *RawFlushBuf = ZeroFillAcc.get_pointer();
      std::memset(RawFlushBuf, 0, ZeroFillAcc.get_size());
    });
  });
}

} // namespace detail
} // namespace sycl
} // __SYCL_INLINE_NAMESPACE(cl)
29 changes: 29 additions & 0 deletions sycl/source/detail/scheduler/scheduler_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//==--- scheduler_helpers.hpp - SYCL Scheduler helper functions --*- C++ -*-==//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#pragma once

#include <CL/sycl/detail/defines_elementary.hpp>

#include <memory>

__SYCL_INLINE_NAMESPACE(cl) {
namespace sycl {
namespace detail {

class stream_impl;
class queue_impl;

using StreamImplPtr = std::shared_ptr<detail::stream_impl>;
using QueueImplPtr = std::shared_ptr<detail::queue_impl>;

// Initializes the flush buffer of Stream with zeros by submitting a host task
// to Queue. The buffers of Stream are looked up in the scheduler's stream
// buffers pool; the stream is expected to be registered there already
// (checked with an assert in the definition).
void initStream(StreamImplPtr Stream, QueueImplPtr Queue);

} // namespace detail
} // namespace sycl
} // __SYCL_INLINE_NAMESPACE(cl)
6 changes: 3 additions & 3 deletions sycl/source/detail/stream_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ stream_impl::stream_impl(size_t BufferSize, size_t MaxStatementSize,
// the end of line symbol.
detail::Scheduler::getInstance().allocateStreamBuffers(
this, BufferSize + OffsetSize + 1 /* size of the stream buffer */,
MaxStatementSize /* size of the flush buffer */);
MaxStatementSize + FLUSH_BUF_OFFSET_SIZE /* size of the flush buffer */);
}

// Method to provide an access to the global stream buffer
Expand All @@ -43,7 +43,7 @@ GlobalBufAccessorT stream_impl::accessGlobalFlushBuf(handler &CGH) {
return detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second->FlushBuf.get_access<cl::sycl::access::mode::read_write>(
CGH, range<1>(MaxStatementSize_), id<1>(0));
CGH, range<1>(MaxStatementSize_ + FLUSH_BUF_OFFSET_SIZE), id<1>(0));
}

// Method to provide an atomic access to the offset in the global stream
Expand Down Expand Up @@ -76,7 +76,7 @@ void stream_impl::flush() {
cgh, range<1>(BufferSize_), id<1>(OffsetSize));
// Create accessor to the flush buffer even if not using it yet. Otherwise
// kernel will be a leaf for the flush buffer and scheduler will not be able
// to cleanup the kernel. TODO: git rid of finalize method by using host
// to cleanup the kernel. TODO: get rid of finalize method by using host
// accessor to the flush buffer.
auto FlushBufHostAcc =
detail::Scheduler::getInstance()
Expand Down
2 changes: 2 additions & 0 deletions sycl/source/detail/stream_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace detail {

class __SYCL_EXPORT stream_impl {
public:
// TODO: Handler argument is not used in constructor.
// To be removed when API/ABI changes are allowed.
stream_impl(size_t BufferSize, size_t MaxStatementSize, handler &CGH);

// Method to provide an access to the global stream buffer
Expand Down
12 changes: 11 additions & 1 deletion sycl/source/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,24 @@
__SYCL_INLINE_NAMESPACE(cl) {
namespace sycl {

// Maximum possible size of a flush buffer statement in bytes: the largest
// value representable in detail::FLUSH_BUF_OFFSET_SIZE bytes.
// The shift is performed on a size_t operand rather than an int literal so
// the expression stays well-formed when the offset is as wide as or wider
// than int.
static constexpr size_t MAX_STATEMENT_SIZE =
    (size_t{1} << (CHAR_BIT * detail::FLUSH_BUF_OFFSET_SIZE)) - 1;

stream::stream(size_t BufferSize, size_t MaxStatementSize, handler &CGH)
: impl(std::make_shared<detail::stream_impl>(BufferSize, MaxStatementSize,
CGH)),
GlobalBuf(impl->accessGlobalBuf(CGH)),
GlobalOffset(impl->accessGlobalOffset(CGH)),
// Allocate the flush buffer, which contains space for each work item
GlobalFlushBuf(impl->accessGlobalFlushBuf(CGH)),
FlushBufferSize(MaxStatementSize) {
FlushBufferSize(MaxStatementSize + detail::FLUSH_BUF_OFFSET_SIZE) {
if (MaxStatementSize > MAX_STATEMENT_SIZE) {
throw sycl::invalid_parameter_error(
"Maximum statement size exceeds limit of " +
std::to_string(MAX_STATEMENT_SIZE) + " bytes.",
PI_INVALID_VALUE);
}

// Save stream implementation in the handler so that stream will be alive
// during kernel execution
Expand Down
12 changes: 6 additions & 6 deletions sycl/test/on-device/basic_tests/stream/stream.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
// RUN: %RUN_ON_HOST %t.out | FileCheck %s
// RUN: %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
Expand Down Expand Up @@ -249,11 +248,12 @@ int main() {
[&](h_item<3> Item) { Out << Item << endl; });
});
});
// CHECK-NEXT: h_item(
// CHECK-NEXT: global item(range: {1, 1, 1}, id: {0, 0, 0})
// CHECK-NEXT: logical local item(range: {1, 1, 1}, id: {0, 0, 0})
// CHECK-NEXT: physical local item(range: {1, 1, 1}, id: {0, 0, 0})
// CHECK-NEXT: )
Queue.wait();
// CHECK-NEXT: h_item(
// CHECK-NEXT: global item(range: {1, 1, 1}, id: {0, 0, 0})
// CHECK-NEXT: logical local item(range: {1, 1, 1}, id: {0, 0, 0})
// CHECK-NEXT: physical local item(range: {1, 1, 1}, id: {0, 0, 0})
// CHECK-NEXT: )

// Multiple streams in command group
Queue.submit([&](handler &CGH) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
// RUN: env SYCL_DEVICE_TYPE=HOST %t.out | FileCheck %s
// RUN: %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
// RUN: %GPU_RUN_ON_LINUX_PLACEHOLDER %t.out %GPU_CHECK_ON_LINUX_PLACEHOLDER
// RUN: %ACC_RUN_PLACEHOLDER %t.out %ACC_CHECK_PLACEHOLDER

#include <CL/sycl.hpp>

using namespace cl::sycl;

// Writes a fixed line to a stream that is received by value, i.e. through a
// copy of the caller's stream object.
void dev_func(stream out) { out << "dev_func print\n"; }

// Prints from a single work-item through three stream objects: the original
// stream, a by-value copy handed to dev_func, and a copy created inside the
// kernel. The expected output order is pinned by the FileCheck directives
// that follow the wait.
int main() {
{
queue Queue;

Queue.submit([&](handler &cgh) {
// Stream constructed with BufferSize = 1024 and MaxStatementSize = 160.
stream stream1(1024, 160, cgh);
cgh.parallel_for<class test_dev_func_stream>(
nd_range<1>(range<1>(1), range<1>(1)), [=](sycl::nd_item<1> it) {
stream1 << "stream1 print 1\n";
dev_func(stream1);
// stream2 is a copy of stream1; output through both is expected to
// appear in program order (see the expected lines below).
sycl::stream stream2 = stream1;
stream1 << "stream1 print 2" << sycl::endl;
stream2 << "stream2 print 1\n";
stream1 << "stream1 print 3\n";
// Explicit flush is issued through the copy, after the last write made
// through stream1.
stream2 << sycl::flush;
});
});
// Wait for the kernel to finish before the queue goes out of scope.
Queue.wait();
// CHECK: stream1 print 1
// CHECK-NEXT: dev_func print
// CHECK-NEXT: stream1 print 2
// CHECK-NEXT: stream2 print 1
// CHECK-NEXT: stream1 print 3
}

return 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
// RUN: env SYCL_DEVICE_TYPE=HOST %t.out | FileCheck %s
// RUN: %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
// RUN: %GPU_RUN_ON_LINUX_PLACEHOLDER %t.out %GPU_CHECK_ON_LINUX_PLACEHOLDER
// RUN: %ACC_RUN_PLACEHOLDER %t.out %ACC_CHECK_PLACEHOLDER

#include <CL/sycl.hpp>

#include <cassert>

using namespace cl;

int main() {
sycl::queue Queue;
try {
Queue.submit([&](sycl::handler &cgh) {
sycl::stream Out(100, 65536, cgh);
cgh.single_task<class test_max_stmt_exceed>(
[=]() { Out << "Hello world!" << sycl::endl; });
});
Queue.wait();
} catch (sycl::exception &ExpectedException) {
// CHECK: Maximum statement size exceeds limit of 65535 bytes
std::cout << ExpectedException.what() << std::endl;
}
return 0;
}
1 change: 1 addition & 0 deletions sycl/unittests/scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ add_sycl_unittest(SchedulerTests OBJECT
LinkedAllocaDependencies.cpp
LeavesCollection.cpp
NoUnifiedHostMemory.cpp
StreamInitDependencyOnHost.cpp
utils.cpp
)
6 changes: 6 additions & 0 deletions sycl/unittests/scheduler/SchedulerTestUtils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ class MockScheduler : public cl::sycl::detail::Scheduler {
const cl::sycl::detail::QueueImplPtr &Queue) {
return MGraphBuilder.insertMemoryMove(Record, Req, Queue);
}

// Test helper: forwards the command group (ownership is moved) and the queue
// straight to MGraphBuilder.addCG and returns the Command pointer it yields.
cl::sycl::detail::Command *
addCG(std::unique_ptr<cl::sycl::detail::CG> CommandGroup,
cl::sycl::detail::QueueImplPtr Queue) {
return MGraphBuilder.addCG(std::move(CommandGroup), Queue);
}
};

void addEdge(cl::sycl::detail::Command *User, cl::sycl::detail::Command *Dep,
Expand Down
Loading