Skip to content

Commit 9941a6c

Browse files
committed
[SYCL] Store stream buffers in the scheduler
Stream buffers need to stay alive after a kernel is submitted because the kernel is executed asynchronously by the scheduler. For this reason, stream buffers are currently stored in the associated stream object. This stream object is passed to the handler and then forwarded to the command group to keep the stream buffers alive for the scheduler. But there is a problem with this approach. A command group cannot be destroyed while the stream buffers (which are accessed in this command group) are alive. Stream buffers are destroyed only when the stream is destroyed. The stream object is destroyed only when the command group is destroyed. So there is a circular dependency, which results in memory leaks. The solution is to store the stream buffers for each stream in the scheduler. With this approach, resources are released properly.
1 parent 82893b2 commit 9941a6c

File tree

6 files changed

+121
-35
lines changed

6 files changed

+121
-35
lines changed

sycl/source/detail/scheduler/scheduler.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,14 @@ void Scheduler::enqueueLeavesOfReqUnlocked(const Requirement *const Req) {
238238
EnqueueLeaves(Record->MWriteLeaves);
239239
}
240240

241+
// Creates the buffers backing a stream and registers them in
// StreamBuffersPool under the key Impl.
//
// Impl             - stream implementation object used as the pool key.
// StreamBufferSize - size passed to StreamBuffers for the stream buffer.
// FlushBufferSize  - size passed to StreamBuffers for the flush buffer.
//
// The insertion is done while holding StreamBuffersPoolMutex.
// NOTE(review): std::map::insert does not replace an existing element, so if
// an entry for Impl were already present this call would leave it unchanged.
// No removal from the pool is visible in this hunk - confirm where entries
// are released.
void Scheduler::allocateStreamBuffers(stream_impl *Impl,
242+
size_t StreamBufferSize,
243+
size_t FlushBufferSize) {
244+
std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
245+
StreamBuffersPool.insert(
246+
{Impl, StreamBuffers(StreamBufferSize, FlushBufferSize)});
247+
}
248+
241249
Scheduler::Scheduler() {
242250
sycl::device HostDevice;
243251
DefaultHostQueue = QueueImplPtr(

sycl/source/detail/scheduler/scheduler.hpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,38 @@ class Scheduler {
715715

716716
friend class Command;
717717
friend class DispatchHostTask;
718+
719+
// Bundles the storage used by one stream: a host-side vector, the stream
// buffer constructed over that vector, and a separate flush buffer.
// NOTE(review): Buf is constructed from Data.data() with the use_host_ptr
// property, so it presumably keeps referring to Data's storage; confirm that
// this storage address stays valid when a StreamBuffers object is moved into
// the pool map.
class StreamBuffers {
720+
public:
721+
// StreamBufferSize - number of chars in Data and in Buf.
// FlushBufferSize  - number of chars in FlushBuf.
StreamBuffers(size_t StreamBufferSize, size_t FlushBufferSize)
722+
// Initialize stream buffer with zeros, this is needed for two reasons:
723+
// 1. We don't need to care about end of line when printing out
724+
// streamed data.
725+
// 2. Offset is properly initialized.
726+
: Data(StreamBufferSize, 0),
727+
Buf(Data.data(), range<1>(StreamBufferSize),
728+
{property::buffer::use_host_ptr()}),
729+
FlushBuf(range<1>(FlushBufferSize)) {}
730+
731+
// Vector on the host side which is used to initialize the stream
732+
// buffer
// Declared before Buf: members are initialized in declaration order and
// Buf's initializer reads Data.data().
733+
std::vector<char> Data;
734+
735+
// Stream buffer
736+
buffer<char, 1> Buf;
737+
738+
// Global flush buffer
739+
buffer<char, 1> FlushBuf;
740+
};
741+
742+
friend class stream_impl;
743+
744+
// Protects stream buffers pool
745+
std::mutex StreamBuffersPoolMutex;
746+
std::map<stream_impl *, StreamBuffers> StreamBuffersPool;
747+
748+
// Allocate buffers in the pool for a provided stream
749+
void allocateStreamBuffers(stream_impl *, size_t, size_t);
718750
};
719751

720752
} // namespace detail

sycl/source/detail/stream_impl.cpp

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,55 @@ namespace detail {
1717

1818
stream_impl::stream_impl(size_t BufferSize, size_t MaxStatementSize,
1919
handler &CGH)
20-
: BufferSize_(BufferSize), MaxStatementSize_(MaxStatementSize),
21-
// Allocate additional place for the offset variable and the end of line
22-
// symbol. Initialize buffer with zeros, this is needed for two reasons:
23-
// 1. We don't need to care about end of line when printing out streamed
24-
// data.
25-
// 2. Offset is properly initialized.
26-
Data(BufferSize + OffsetSize + 1, 0),
27-
Buf(Data.data(), range<1>(BufferSize + OffsetSize + 1),
28-
{property::buffer::use_host_ptr()}),
29-
30-
FlushBuf(range<1>(MaxStatementSize)) {}
20+
: BufferSize_(BufferSize), MaxStatementSize_(MaxStatementSize) {
21+
// We need to store stream buffers in the scheduler because they need to be
22+
// alive after submitting the kernel. They cannot be stored in the stream
23+
// object because it causes loop dependency between objects and results in
24+
// memory leak.
25+
// Allocate additional place in the stream buffer for the offset variable and
26+
// the end of line symbol.
27+
detail::Scheduler::getInstance().allocateStreamBuffers(
28+
this, BufferSize + OffsetSize + 1 /* size of the stream buffer */,
29+
MaxStatementSize /* size of the flush buffer */);
30+
}
31+
32+
// Method to provide an access to the global stream buffer.
// Looks up this stream's entry in the scheduler's StreamBuffersPool and
// returns a read_write accessor on its Buf, registered with CGH, covering
// BufferSize_ elements starting at offset OffsetSize.
// NOTE(review): the result of find(this) is dereferenced without comparing it
// to end(), and the pool is read here without taking StreamBuffersPoolMutex
// (which allocateStreamBuffers holds while inserting) - confirm this is safe.
33+
GlobalBufAccessorT stream_impl::accessGlobalBuf(handler &CGH) {
34+
return detail::Scheduler::getInstance()
35+
.StreamBuffersPool.find(this)
36+
->second.Buf.get_access<cl::sycl::access::mode::read_write>(
37+
CGH, range<1>(BufferSize_), id<1>(OffsetSize));
38+
}
3139

40+
// Method to provide an accessor to the global flush buffer.
// Looks up this stream's entry in the scheduler's StreamBuffersPool and
// returns a read_write accessor on its FlushBuf, registered with CGH,
// covering MaxStatementSize_ elements starting at offset 0.
// NOTE(review): the result of find(this) is dereferenced without comparing it
// to end(), and the pool is read here without taking StreamBuffersPoolMutex
// (which allocateStreamBuffers holds while inserting) - confirm this is safe.
41+
GlobalBufAccessorT stream_impl::accessGlobalFlushBuf(handler &CGH) {
42+
return detail::Scheduler::getInstance()
43+
.StreamBuffersPool.find(this)
44+
->second.FlushBuf.get_access<cl::sycl::access::mode::read_write>(
45+
CGH, range<1>(MaxStatementSize_), id<1>(0));
46+
}
47+
48+
// Method to provide an atomic access to the offset in the global stream
49+
// buffer and offset in the flush buffer
// Steps: take a sub-buffer over the first OffsetSize chars of this stream's
// Buf, reinterpret it as a buffer of 2 unsigned values, and return an
// atomic-mode accessor over both values, registered with CGH.
// NOTE(review): the result of find(this) is dereferenced without comparing it
// to end(), and the pool is read here without taking StreamBuffersPoolMutex
// (which allocateStreamBuffers holds while inserting) - confirm this is safe.
50+
GlobalOffsetAccessorT stream_impl::accessGlobalOffset(handler &CGH) {
51+
auto OffsetSubBuf = buffer<char, 1>(
52+
detail::Scheduler::getInstance().StreamBuffersPool.find(this)->second.Buf,
53+
id<1>(0), range<1>(OffsetSize));
54+
auto ReinterpretedBuf = OffsetSubBuf.reinterpret<unsigned, 1>(range<1>(2));
55+
return ReinterpretedBuf.get_access<cl::sycl::access::mode::atomic>(
56+
CGH, range<1>(2), id<1>(0));
57+
}
3258
size_t stream_impl::get_size() const { return BufferSize_; }
3359

3460
size_t stream_impl::get_max_statement_size() const { return MaxStatementSize_; }
3561

3662
void stream_impl::flush() {
3763
// Access the stream buffer on the host. This access guarantees that kernel is
3864
// executed and buffer contains streamed data.
39-
auto HostAcc = Buf.get_access<cl::sycl::access::mode::read>(
40-
range<1>(BufferSize_), id<1>(OffsetSize));
65+
auto HostAcc = detail::Scheduler::getInstance()
66+
.StreamBuffersPool.find(this)
67+
->second.Buf.get_access<cl::sycl::access::mode::read>(
68+
range<1>(BufferSize_), id<1>(OffsetSize));
4169

4270
printf("%s", HostAcc.get_pointer());
4371
fflush(stdout);

sycl/source/detail/stream_impl.hpp

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,14 @@ class __SYCL_EXPORT stream_impl {
2626
stream_impl(size_t BufferSize, size_t MaxStatementSize, handler &CGH);
2727

2828
// Method to provide an access to the global stream buffer
29-
GlobalBufAccessorT accessGlobalBuf(handler &CGH) {
30-
return Buf.get_access<cl::sycl::access::mode::read_write>(
31-
CGH, range<1>(BufferSize_), id<1>(OffsetSize));
32-
}
29+
GlobalBufAccessorT accessGlobalBuf(handler &CGH);
3330

3431
// Method to provide an accessor to the global flush buffer
35-
GlobalBufAccessorT accessGlobalFlushBuf(handler &CGH) {
36-
return FlushBuf.get_access<cl::sycl::access::mode::read_write>(
37-
CGH, range<1>(MaxStatementSize_), id<1>(0));
38-
}
32+
GlobalBufAccessorT accessGlobalFlushBuf(handler &CGH);
3933

4034
// Method to provide an atomic access to the offset in the global stream
4135
// buffer and offset in the flush buffer
42-
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH) {
43-
auto OffsetSubBuf = buffer<char, 1>(Buf, id<1>(0), range<1>(OffsetSize));
44-
auto ReinterpretedBuf = OffsetSubBuf.reinterpret<unsigned, 1>(range<1>(2));
45-
return ReinterpretedBuf.get_access<cl::sycl::access::mode::atomic>(
46-
CGH, range<1>(2), id<1>(0));
47-
}
36+
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH);
4837

4938
// Copy stream buffer to the host and print the contents
5039
void flush();
@@ -65,14 +54,6 @@ class __SYCL_EXPORT stream_impl {
6554
// 2 variables: offset in the stream buffer and offset in the flush buffer.
6655
static const size_t OffsetSize = 2 * sizeof(unsigned);
6756

68-
// Vector on the host side which is used to initialize the stream buffer
69-
std::vector<char> Data;
70-
71-
// Stream buffer
72-
buffer<char, 1> Buf;
73-
74-
// Global flush buffer
75-
buffer<char, 1> FlushBuf;
7657
};
7758

7859
} // namespace detail

sycl/test/abi/sycl_symbols_linux.dump

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4156,3 +4156,6 @@ _ZNK2cl4sycl9exception11has_contextEv
41564156
_ZNK2cl4sycl9exception4whatEv
41574157
__sycl_register_lib
41584158
__sycl_unregister_lib
4159+
_ZN2cl4sycl6detail11stream_impl15accessGlobalBufERNS0_7handlerE
4160+
_ZN2cl4sycl6detail11stream_impl20accessGlobalFlushBufERNS0_7handlerE
4161+
_ZN2cl4sycl6detail11stream_impl18accessGlobalOffsetERNS0_7handlerE
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
2+
// RUN: env SYCL_PI_TRACE=2 %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
3+
// RUN: env SYCL_PI_TRACE=2 %GPU_RUN_PLACEHOLDER %t.out %GPU_CHECK_PLACEHOLDER
4+
// RUN: env SYCL_PI_TRACE=2 %ACC_RUN_PLACEHOLDER %t.out %ACC_CHECK_PLACEHOLDER
5+
6+
//==----------------------- release_resources_test.cpp ---------------------==//
7+
//
8+
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
9+
// See https://llvm.org/LICENSE.txt for license information.
10+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
11+
//
12+
//===----------------------------------------------------------------------===//
13+
14+
// Check that buffer used by a stream object is released.
15+
16+
#include <CL/sycl.hpp>
17+
18+
using namespace cl::sycl;
19+
20+
// Test driver: submits a single kernel that writes through a stream object,
// waits for the queue, and then leaves the scope that owns the queue. The
// FileCheck directive inside the body expects a piMemRelease entry in the
// output produced with SYCL_PI_TRACE=2 (set on the run lines of this test).
int main() {
21+
// Inner scope so that Queue is destroyed before main returns.
{
22+
queue Queue;
23+
24+
// CHECK:---> piMemRelease
25+
Queue.submit([&](handler &CGH) {
26+
// Stream constructed with arguments 1024 and 80, bound to this handler.
stream Out(1024, 80, CGH);
27+
CGH.parallel_for<class test_cleanup1>(
28+
range<1>(2), [=](id<1> i) { Out << "Hello, World!" << endl; });
29+
});
30+
Queue.wait();
31+
}
32+
33+
return 0;
34+
}

0 commit comments

Comments
 (0)