Skip to content

Commit 4510a1f

Browse files
Implemented a C++ based stream that will interoperate with the low-le… (aws#89)
* Implemented a C++ based stream that will interoperate with the low-level aws_input_stream apis.
1 parent 0d84b42 commit 4510a1f

File tree

7 files changed

+278
-177
lines changed

7 files changed

+278
-177
lines changed

include/aws/crt/http/HttpRequestResponse.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <aws/crt/Exports.h>
1818
#include <aws/crt/Types.h>
19+
#include <aws/crt/io/Stream.h>
1920

2021
struct aws_http_header;
2122
struct aws_http_message;
@@ -49,13 +50,15 @@ namespace Aws
4950
/**
5051
* Gets the input stream representing the message body
5152
*/
52-
std::shared_ptr<Aws::Crt::Io::IStream> GetBody() const noexcept;
53+
std::shared_ptr<Aws::Crt::Io::InputStream> GetBody() const noexcept;
5354

5455
/**
5556
* Sets the input stream representing the message body
5657
*/
5758
bool SetBody(const std::shared_ptr<Aws::Crt::Io::IStream> &body) noexcept;
5859

60+
bool SetBody(const std::shared_ptr<Aws::Crt::Io::InputStream> &body) noexcept;
61+
5962
size_t GetHeaderCount() const noexcept;
6063
Optional<HttpHeader> GetHeader(size_t index) const noexcept;
6164
bool AddHeader(const HttpHeader &header) noexcept;
@@ -70,7 +73,7 @@ namespace Aws
7073

7174
Allocator *m_allocator;
7275
struct aws_http_message *m_message;
73-
std::shared_ptr<Aws::Crt::Io::IStream> m_bodyStream;
76+
std::shared_ptr<Aws::Crt::Io::InputStream> m_bodyStream;
7477
bool m_ownsMessage;
7578
};
7679

include/aws/crt/io/Stream.h

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,115 @@
1616

1717
#include <aws/crt/Exports.h>
1818
#include <aws/crt/Types.h>
19-
20-
struct aws_input_stream;
19+
#include <aws/io/stream.h>
2120

2221
namespace Aws
2322
{
2423
namespace Crt
2524
{
2625
namespace Io
2726
{
28-
/**
29-
* Factory to create a aws-c-io input stream subclass from a C++ stream
27+
using StreamStatus = aws_stream_status;
28+
using OffsetType = aws_off_t;
29+
30+
enum class StreamSeekBasis
31+
{
32+
Begin = AWS_SSB_BEGIN,
33+
End = AWS_SSB_END,
34+
};
35+
36+
/***
37+
* Interface for building an Object oriented stream that will be honored by the CRT's low-level
38+
* aws_input_stream interface. To use, create a subclass of InputStream and define the abstract
39+
* functions.
40+
*/
41+
class AWS_CRT_CPP_API InputStream
42+
{
43+
public:
44+
virtual ~InputStream();
45+
46+
InputStream(const InputStream &) = delete;
47+
InputStream &operator=(const InputStream &) = delete;
48+
InputStream(InputStream &&) = delete;
49+
InputStream &operator=(InputStream &&) = delete;
50+
51+
explicit operator bool() const noexcept { return IsGood(); }
52+
53+
virtual bool IsGood() const noexcept = 0;
54+
55+
aws_input_stream *GetUnderlyingStream() noexcept { return &m_underlying_stream; }
56+
57+
protected:
58+
Allocator *m_allocator;
59+
aws_input_stream m_underlying_stream;
60+
61+
InputStream(Aws::Crt::Allocator *allocator = DefaultAllocator());
62+
63+
/***
64+
* Read up-to buffer::capacity - buffer::len into buffer::buffer
65+
* Increment buffer::len by the amount you read in.
66+
*
67+
* @return true on success, false otherwise. Return false, when there is nothing left to read.
68+
* You SHOULD raise an error via aws_raise_error()
69+
* if an actual failure condition occurs.
70+
* If you return false, GetStatusImpl() will be called to determine
71+
* the validity of the stream.
72+
*/
73+
virtual bool ReadImpl(ByteBuf &buffer) noexcept = 0;
74+
75+
/**
76+
* Returns the current status of the stream.
77+
*/
78+
virtual StreamStatus GetStatusImpl() const noexcept = 0;
79+
80+
/**
81+
* Returns the total length of the available data for the stream.
82+
* Returns -1 if not available.
83+
*/
84+
virtual int64_t GetLengthImpl() const noexcept = 0;
85+
86+
/**
87+
* Seek's the stream to seekBasis based offset bytes.
88+
*
89+
* It is expected, that if seeking to the beginning of a stream,
90+
* all error's are cleared if possible.
91+
*
92+
* @return true on success, false otherwise. You SHOULD raise an error via aws_raise_error()
93+
* if a failure occurs. If you return false, the m_good flag will be set to false.
94+
*/
95+
virtual bool SeekImpl(OffsetType offset, StreamSeekBasis seekBasis) noexcept = 0;
96+
97+
private:
98+
static int s_Seek(aws_input_stream *stream, aws_off_t offset, enum aws_stream_seek_basis basis);
99+
static int s_Read(aws_input_stream *stream, aws_byte_buf *dest);
100+
static int s_GetStatus(aws_input_stream *stream, aws_stream_status *status);
101+
static int s_GetLength(struct aws_input_stream *stream, int64_t *out_length);
102+
static void s_Destroy(struct aws_input_stream *stream);
103+
104+
static aws_input_stream_vtable s_vtable;
105+
};
106+
107+
/***
108+
* Implementation of Aws::Crt::Io::InputStream that wraps a std::input_stream.
30109
*/
31-
AWS_CRT_CPP_API aws_input_stream *AwsInputStreamNewCpp(
32-
const std::shared_ptr<Aws::Crt::Io::IStream> &stream,
33-
Aws::Crt::Allocator *allocator = DefaultAllocator()) noexcept;
110+
class AWS_CRT_CPP_API StdIOStreamInputStream : public InputStream
111+
{
112+
public:
113+
StdIOStreamInputStream(
114+
std::shared_ptr<Aws::Crt::Io::IStream> stream,
115+
Aws::Crt::Allocator *allocator = DefaultAllocator()) noexcept;
116+
117+
bool IsGood() const noexcept override;
118+
119+
protected:
120+
bool ReadImpl(ByteBuf &buffer) noexcept override;
121+
StreamStatus GetStatusImpl() const noexcept override;
122+
int64_t GetLengthImpl() const noexcept override;
123+
bool SeekImpl(OffsetType offsetType, StreamSeekBasis seekBasis) noexcept override;
124+
125+
private:
126+
std::shared_ptr<Aws::Crt::Io::IStream> m_stream;
127+
};
34128
} // namespace Io
35129
} // namespace Crt
36130
} // namespace Aws

source/http/HttpRequestResponse.cpp

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,32 +49,31 @@ namespace Aws
4949
}
5050
}
5151

52-
std::shared_ptr<Aws::Crt::Io::IStream> HttpMessage::GetBody() const noexcept { return m_bodyStream; }
52+
std::shared_ptr<Aws::Crt::Io::InputStream> HttpMessage::GetBody() const noexcept { return m_bodyStream; }
5353

5454
bool HttpMessage::SetBody(const std::shared_ptr<Aws::Crt::Io::IStream> &body) noexcept
5555
{
56-
aws_input_stream *stream = nullptr;
56+
aws_http_message_set_body_stream(m_message, nullptr);
57+
m_bodyStream = nullptr;
58+
5759
if (body != nullptr)
5860
{
59-
stream = Aws::Crt::Io::AwsInputStreamNewCpp(body, m_allocator);
60-
if (stream == nullptr)
61+
m_bodyStream = MakeShared<Io::StdIOStreamInputStream>(m_allocator, body, m_allocator);
62+
if (m_bodyStream == nullptr || !m_bodyStream)
6163
{
6264
return false;
6365
}
66+
aws_http_message_set_body_stream(m_message, m_bodyStream->GetUnderlyingStream());
6467
}
6568

66-
/*
67-
* clean up the old stream before setting the new
68-
*/
69-
aws_input_stream *old_stream = aws_http_message_get_body_stream(m_message);
70-
if (old_stream != nullptr)
71-
{
72-
aws_input_stream_destroy(old_stream);
73-
}
74-
75-
aws_http_message_set_body_stream(m_message, stream);
69+
return true;
70+
}
7671

77-
m_bodyStream = (stream) ? body : nullptr;
72+
bool HttpMessage::SetBody(const std::shared_ptr<Aws::Crt::Io::InputStream> &body) noexcept
73+
{
74+
m_bodyStream = body;
75+
aws_http_message_set_body_stream(
76+
m_message, m_bodyStream && *m_bodyStream ? m_bodyStream->GetUnderlyingStream() : nullptr);
7877

7978
return true;
8079
}

0 commit comments

Comments
 (0)