Skip to content

Commit 841e1e7

Browse files
authored
[SYCL] Store stream buffers in the scheduler (#2416)
Stream buffers need to stay alive after a kernel is submitted, because the kernel is executed asynchronously by the scheduler. For this reason, stream buffers are currently stored in the associated stream object. This stream object is passed to the handler and then forwarded to the command group to keep the stream buffers alive for the scheduler. But there is a problem with this approach: a command group cannot be destroyed while the stream buffers accessed in that command group are alive, the stream buffers are destroyed only when the stream is destroyed, and the stream object is destroyed only when the command group is destroyed. This is a circular dependency, which results in memory leaks. The solution is to store the stream buffers in the scheduler for each stream. With this approach, resources are released properly.
1 parent 2c50c03 commit 841e1e7

File tree

6 files changed

+130
-37
lines changed

6 files changed

+130
-37
lines changed

sycl/source/detail/scheduler/scheduler.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,19 @@ void Scheduler::enqueueLeavesOfReqUnlocked(const Requirement *const Req) {
238238
EnqueueLeaves(Record->MWriteLeaves);
239239
}
240240

241+
void Scheduler::allocateStreamBuffers(stream_impl *Impl,
242+
size_t StreamBufferSize,
243+
size_t FlushBufferSize) {
244+
std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
245+
StreamBuffersPool.insert(
246+
{Impl, StreamBuffers(StreamBufferSize, FlushBufferSize)});
247+
}
248+
249+
void Scheduler::deallocateStreamBuffers(stream_impl *Impl) {
250+
std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
251+
StreamBuffersPool.erase(Impl);
252+
}
253+
241254
Scheduler::Scheduler() {
242255
sycl::device HostDevice;
243256
DefaultHostQueue = QueueImplPtr(

sycl/source/detail/scheduler/scheduler.hpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,43 @@ class Scheduler {
715715

716716
friend class Command;
717717
friend class DispatchHostTask;
718+
719+
/// Stream buffers structure.
720+
///
721+
/// The structure contains all buffers for a stream object.
722+
struct StreamBuffers {
723+
StreamBuffers(size_t StreamBufferSize, size_t FlushBufferSize)
724+
// Initialize stream buffer with zeros, this is needed for two reasons:
725+
// 1. We don't need to care about end of line when printing out
726+
// streamed data.
727+
// 2. Offset is properly initialized.
728+
: Data(StreamBufferSize, 0),
729+
Buf(Data.data(), range<1>(StreamBufferSize),
730+
{property::buffer::use_host_ptr()}),
731+
FlushBuf(range<1>(FlushBufferSize)) {}
732+
733+
// Vector on the host side which is used to initialize the stream
734+
// buffer
735+
std::vector<char> Data;
736+
737+
// Stream buffer
738+
buffer<char, 1> Buf;
739+
740+
// Global flush buffer
741+
buffer<char, 1> FlushBuf;
742+
};
743+
744+
friend class stream_impl;
745+
746+
// Protects stream buffers pool
747+
std::mutex StreamBuffersPoolMutex;
748+
std::map<stream_impl *, StreamBuffers> StreamBuffersPool;
749+
750+
/// Allocate buffers in the pool for a provided stream
751+
void allocateStreamBuffers(stream_impl *, size_t, size_t);
752+
753+
/// Deallocate buffers in the pool for a provided stream
754+
void deallocateStreamBuffers(stream_impl *);
718755
};
719756

720757
} // namespace detail

sycl/source/detail/stream_impl.cpp

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,63 @@ namespace detail {
1717

1818
stream_impl::stream_impl(size_t BufferSize, size_t MaxStatementSize,
1919
handler &CGH)
20-
: BufferSize_(BufferSize), MaxStatementSize_(MaxStatementSize),
21-
// Allocate additional place for the offset variable and the end of line
22-
// symbol. Initialize buffer with zeros, this is needed for two reasons:
23-
// 1. We don't need to care about end of line when printing out streamed
24-
// data.
25-
// 2. Offset is properly initialized.
26-
Data(BufferSize + OffsetSize + 1, 0),
27-
Buf(Data.data(), range<1>(BufferSize + OffsetSize + 1),
28-
{property::buffer::use_host_ptr()}),
29-
30-
FlushBuf(range<1>(MaxStatementSize)) {}
20+
: BufferSize_(BufferSize), MaxStatementSize_(MaxStatementSize) {
21+
// We need to store stream buffers in the scheduler because they need to be
22+
// alive after submitting the kernel. They cannot be stored in the stream
23+
// object because it causes loop dependency between objects and results in
24+
// memory leak.
25+
// Allocate additional place in the stream buffer for the offset variable and
26+
// the end of line symbol.
27+
detail::Scheduler::getInstance().allocateStreamBuffers(
28+
this, BufferSize + OffsetSize + 1 /* size of the stream buffer */,
29+
MaxStatementSize /* size of the flush buffer */);
30+
}
31+
32+
// Method to provide an access to the global stream buffer
33+
GlobalBufAccessorT stream_impl::accessGlobalBuf(handler &CGH) {
34+
return detail::Scheduler::getInstance()
35+
.StreamBuffersPool.find(this)
36+
->second.Buf.get_access<cl::sycl::access::mode::read_write>(
37+
CGH, range<1>(BufferSize_), id<1>(OffsetSize));
38+
}
3139

40+
// Method to provide an accessor to the global flush buffer
41+
GlobalBufAccessorT stream_impl::accessGlobalFlushBuf(handler &CGH) {
42+
return detail::Scheduler::getInstance()
43+
.StreamBuffersPool.find(this)
44+
->second.FlushBuf.get_access<cl::sycl::access::mode::read_write>(
45+
CGH, range<1>(MaxStatementSize_), id<1>(0));
46+
}
47+
48+
// Method to provide an atomic access to the offset in the global stream
49+
// buffer and offset in the flush buffer
50+
GlobalOffsetAccessorT stream_impl::accessGlobalOffset(handler &CGH) {
51+
auto OffsetSubBuf = buffer<char, 1>(
52+
detail::Scheduler::getInstance().StreamBuffersPool.find(this)->second.Buf,
53+
id<1>(0), range<1>(OffsetSize));
54+
auto ReinterpretedBuf = OffsetSubBuf.reinterpret<unsigned, 1>(range<1>(2));
55+
return ReinterpretedBuf.get_access<cl::sycl::access::mode::atomic>(
56+
CGH, range<1>(2), id<1>(0));
57+
}
3258
size_t stream_impl::get_size() const { return BufferSize_; }
3359

3460
size_t stream_impl::get_max_statement_size() const { return MaxStatementSize_; }
3561

3662
void stream_impl::flush() {
3763
// Access the stream buffer on the host. This access guarantees that kernel is
3864
// executed and buffer contains streamed data.
39-
auto HostAcc = Buf.get_access<cl::sycl::access::mode::read>(
40-
range<1>(BufferSize_), id<1>(OffsetSize));
65+
{
66+
auto HostAcc = detail::Scheduler::getInstance()
67+
.StreamBuffersPool.find(this)
68+
->second.Buf.get_access<cl::sycl::access::mode::read>(
69+
range<1>(BufferSize_), id<1>(OffsetSize));
70+
71+
printf("%s", HostAcc.get_pointer());
72+
fflush(stdout);
73+
}
4174

42-
printf("%s", HostAcc.get_pointer());
43-
fflush(stdout);
75+
// Flushed the stream, can deallocate the buffers now.
76+
detail::Scheduler::getInstance().deallocateStreamBuffers(this);
4477
}
4578
} // namespace detail
4679
} // namespace sycl

sycl/source/detail/stream_impl.hpp

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,14 @@ class __SYCL_EXPORT stream_impl {
2626
stream_impl(size_t BufferSize, size_t MaxStatementSize, handler &CGH);
2727

2828
// Method to provide an access to the global stream buffer
29-
GlobalBufAccessorT accessGlobalBuf(handler &CGH) {
30-
return Buf.get_access<cl::sycl::access::mode::read_write>(
31-
CGH, range<1>(BufferSize_), id<1>(OffsetSize));
32-
}
29+
GlobalBufAccessorT accessGlobalBuf(handler &CGH);
3330

3431
// Method to provide an accessor to the global flush buffer
35-
GlobalBufAccessorT accessGlobalFlushBuf(handler &CGH) {
36-
return FlushBuf.get_access<cl::sycl::access::mode::read_write>(
37-
CGH, range<1>(MaxStatementSize_), id<1>(0));
38-
}
32+
GlobalBufAccessorT accessGlobalFlushBuf(handler &CGH);
3933

4034
// Method to provide an atomic access to the offset in the global stream
4135
// buffer and offset in the flush buffer
42-
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH) {
43-
auto OffsetSubBuf = buffer<char, 1>(Buf, id<1>(0), range<1>(OffsetSize));
44-
auto ReinterpretedBuf = OffsetSubBuf.reinterpret<unsigned, 1>(range<1>(2));
45-
return ReinterpretedBuf.get_access<cl::sycl::access::mode::atomic>(
46-
CGH, range<1>(2), id<1>(0));
47-
}
36+
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH);
4837

4938
// Copy stream buffer to the host and print the contents
5039
void flush();
@@ -65,14 +54,6 @@ class __SYCL_EXPORT stream_impl {
6554
// 2 variables: offset in the stream buffer and offset in the flush buffer.
6655
static const size_t OffsetSize = 2 * sizeof(unsigned);
6756

68-
// Vector on the host side which is used to initialize the stream buffer
69-
std::vector<char> Data;
70-
71-
// Stream buffer
72-
buffer<char, 1> Buf;
73-
74-
// Global flush buffer
75-
buffer<char, 1> FlushBuf;
7657
};
7758

7859
} // namespace detail

sycl/test/abi/sycl_symbols_linux.dump

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4161,3 +4161,6 @@ _ZNK2cl4sycl9exception11has_contextEv
41614161
_ZNK2cl4sycl9exception4whatEv
41624162
__sycl_register_lib
41634163
__sycl_unregister_lib
4164+
_ZN2cl4sycl6detail11stream_impl15accessGlobalBufERNS0_7handlerE
4165+
_ZN2cl4sycl6detail11stream_impl20accessGlobalFlushBufERNS0_7handlerE
4166+
_ZN2cl4sycl6detail11stream_impl18accessGlobalOffsetERNS0_7handlerE
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
2+
// RUN: env SYCL_PI_TRACE=2 %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
3+
// RUN: env SYCL_PI_TRACE=2 %GPU_RUN_PLACEHOLDER %t.out %GPU_CHECK_PLACEHOLDER
4+
// RUN: env SYCL_PI_TRACE=2 %ACC_RUN_PLACEHOLDER %t.out %ACC_CHECK_PLACEHOLDER
5+
6+
// Check that buffer used by a stream object is released.
7+
8+
#include <CL/sycl.hpp>
9+
10+
using namespace cl::sycl;
11+
12+
int main() {
13+
{
14+
queue Queue;
15+
16+
// CHECK:---> piMemRelease
17+
Queue.submit([&](handler &CGH) {
18+
stream Out(1024, 80, CGH);
19+
CGH.parallel_for<class test_cleanup1>(
20+
range<1>(2), [=](id<1> i) { Out << "Hello, World!" << endl; });
21+
});
22+
Queue.wait();
23+
}
24+
25+
return 0;
26+
}

0 commit comments

Comments
 (0)