Skip to content

Commit 98d25fe

Browse files
committed
Add EchoTestRpc
1 parent 1878efa commit 98d25fe

21 files changed

+11735
-11210
lines changed

eventstreamrpc/include/aws/eventstreamrpc/EventStreamClient.h

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ namespace Aws
4747

4848
/**
4949
* A callback prototype that is called upon flushing a message over the wire.
50-
* @param errorCode A non-zero value if an error occured while attempting to flush the message.
50+
* @param errorCode A non-zero value if an error occured while attempting to flush the message.
5151
*/
5252
using OnMessageFlushCallback = std::function<void(int errorCode)>;
5353

@@ -56,7 +56,7 @@ namespace Aws
5656
* packet sent out by the client.
5757
* @return The `MessageAmendment` for the client to use during an attempt to connect.
5858
*/
59-
using ConnectMessageAmender = std::function<MessageAmendment &(void)>;
59+
using ConnectMessageAmender = std::function<const MessageAmendment &(void)>;
6060

6161
/**
6262
* A wrapper around an `aws_event_stream_header_value_pair` object.
@@ -144,25 +144,65 @@ namespace Aws
144144
};
145145

146146
/**
147-
* Configuration structure holding all options relating to eventstream RPC connection establishment
147+
* Configuration structure holding all configurations relating to eventstream RPC connection establishment
148148
*/
149-
struct AWS_EVENTSTREAMRPC_API ClientConnectionOptions final
149+
class ConnectionConfig
150150
{
151-
ClientConnectionOptions();
152-
ClientConnectionOptions(const ClientConnectionOptions &rhs) = default;
153-
ClientConnectionOptions(ClientConnectionOptions &&rhs) = default;
154-
155-
~ClientConnectionOptions() = default;
151+
public:
152+
ConnectionConfig() noexcept : m_clientBootstrap(nullptr), m_connectRequestCallback(nullptr) {}
153+
Crt::Optional<Crt::String> GetHostName() const noexcept { return m_hostName; }
154+
Crt::Optional<uint16_t> GetPort() const noexcept { return m_port; }
155+
Crt::Optional<Crt::Io::SocketDomain> GetSocketDomain() const noexcept { return m_socketDomain; }
156+
Crt::Optional<Crt::Io::SocketType> GetSocketType() const noexcept { return m_socketType; }
157+
Crt::Optional<MessageAmendment> GetConnectAmendment() const noexcept { return m_connectAmendment; }
158+
Crt::Optional<Crt::Io::TlsConnectionOptions> GetTlsConnectionOptions() const noexcept
159+
{
160+
return m_tlsConnectionOptions;
161+
}
162+
Crt::Io::ClientBootstrap *GetClientBootstrap() const noexcept { return m_clientBootstrap; }
163+
OnMessageFlushCallback GetConnectRequestCallback() const noexcept { return m_connectRequestCallback; }
164+
ConnectMessageAmender GetConnectMessageAmender() const noexcept
165+
{
166+
if (m_connectAmendment.has_value())
167+
{
168+
return [&](void) -> const MessageAmendment & { return m_connectAmendment.value(); };
169+
}
170+
else
171+
{
172+
return nullptr;
173+
}
174+
}
156175

157-
ClientConnectionOptions &operator=(const ClientConnectionOptions &rhs) = default;
158-
ClientConnectionOptions &operator=(ClientConnectionOptions &&rhs) = default;
176+
void SetHostName(Crt::String hostName) noexcept { m_hostName = hostName; }
177+
void SetPort(uint16_t port) noexcept { m_port = port; }
178+
void SetSocketDomain(Crt::Io::SocketDomain socketDomain) noexcept { m_socketDomain = socketDomain; }
179+
void SetSocketType(Crt::Io::SocketType socketType) noexcept { m_socketType = socketType; }
180+
void SetConnectAmendment(MessageAmendment connectAmendment) noexcept
181+
{
182+
m_connectAmendment = connectAmendment;
183+
}
184+
void SetTlsConnectionOptions(Crt::Io::TlsConnectionOptions tlsConnectionOptions) noexcept
185+
{
186+
m_tlsConnectionOptions = tlsConnectionOptions;
187+
}
188+
void SetClientBootstrap(Crt::Io::ClientBootstrap *clientBootstrap) noexcept
189+
{
190+
m_clientBootstrap = clientBootstrap;
191+
}
192+
void SetConnectRequestCallback(OnMessageFlushCallback connectRequestCallback) noexcept
193+
{
194+
m_connectRequestCallback = connectRequestCallback;
195+
}
159196

160-
Crt::Io::ClientBootstrap *Bootstrap;
161-
Crt::Io::SocketOptions SocketOptions;
162-
Crt::Optional<Crt::Io::TlsConnectionOptions> TlsOptions;
163-
Crt::String HostName;
164-
uint16_t Port;
165-
OnMessageFlushCallback ConnectRequestCallback;
197+
protected:
198+
Crt::Optional<Crt::String> m_hostName;
199+
Crt::Optional<uint16_t> m_port;
200+
Crt::Optional<Crt::Io::SocketDomain> m_socketDomain;
201+
Crt::Optional<Crt::Io::SocketType> m_socketType;
202+
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;
203+
Crt::Io::ClientBootstrap *m_clientBootstrap;
204+
Crt::Optional<MessageAmendment> m_connectAmendment;
205+
OnMessageFlushCallback m_connectRequestCallback;
166206
};
167207

168208
class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler
@@ -216,7 +256,7 @@ namespace Aws
216256
virtual void OnContinuationClosed() = 0;
217257
};
218258

219-
enum EventStreamRpcError
259+
enum EventStreamRpcStatusCode
220260
{
221261
EVENT_STREAM_RPC_SUCCESS = 0,
222262
EVENT_STREAM_RPC_NULL_PARAMETER,
@@ -231,7 +271,7 @@ namespace Aws
231271

232272
struct RpcError
233273
{
234-
EventStreamRpcError baseStatus;
274+
EventStreamRpcStatusCode baseStatus;
235275
int crtError;
236276
operator bool() const noexcept { return baseStatus == EVENT_STREAM_RPC_SUCCESS; }
237277
};
@@ -267,7 +307,7 @@ namespace Aws
267307
ClientConnection &operator=(ClientConnection &&) noexcept;
268308

269309
std::future<RpcError> Connect(
270-
const ClientConnectionOptions &connectionOptions,
310+
const ConnectionConfig &connectionOptions,
271311
ConnectionLifecycleHandler *connectionLifecycleHandler,
272312
ConnectMessageAmender connectMessageAmender) noexcept;
273313

@@ -569,7 +609,6 @@ namespace Aws
569609
ClientOperation(ClientOperation &&clientOperation) noexcept;
570610
std::future<RpcError> Close(OnMessageFlushCallback onMessageFlushCallback = nullptr) noexcept;
571611
std::future<TaggedResult> GetOperationResult() noexcept;
572-
// virtual bool IsStreaming() = 0;
573612

574613
protected:
575614
std::future<RpcError> Activate(
@@ -582,8 +621,8 @@ namespace Aws
582621
const OperationModelContext &m_operationModelContext;
583622

584623
private:
585-
EventStreamRpcError HandleData(const Crt::String &modelName, const Crt::Optional<Crt::ByteBuf> &payload);
586-
EventStreamRpcError HandleError(
624+
EventStreamRpcStatusCode HandleData(const Crt::String &modelName, const Crt::Optional<Crt::ByteBuf> &payload);
625+
EventStreamRpcStatusCode HandleError(
587626
const Crt::String &modelName,
588627
const Crt::Optional<Crt::ByteBuf> &payload,
589628
uint16_t messageFlags);

eventstreamrpc/source/EventStreamClient.cpp

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,6 @@ namespace Aws
8181
}
8282
}
8383

84-
ClientConnectionOptions::ClientConnectionOptions()
85-
: Bootstrap(nullptr), SocketOptions(), TlsOptions(), HostName(), Port(0), ConnectRequestCallback(nullptr)
86-
{
87-
}
88-
8984
template <typename T>
9085
ProtectedPromise<T>::ProtectedPromise(std::promise<T> &&promise)
9186
: m_fulfilled(false), m_promise(std::move(promise))
@@ -239,32 +234,74 @@ namespace Aws
239234
void ConnectionLifecycleHandler::OnDisconnectCallback(int errorCode) { (void)errorCode; }
240235

241236
std::future<RpcError> ClientConnection::Connect(
242-
const ClientConnectionOptions &connectionOptions,
237+
const ConnectionConfig &connectionConfig,
243238
ConnectionLifecycleHandler *connectionLifecycleHandler,
244239
ConnectMessageAmender connectMessageAmender) noexcept
245240
{
246241
m_connectAckedPromise.Reset();
247242
m_closedPromise = {};
248243
m_lifecycleHandler = connectionLifecycleHandler;
244+
EventStreamRpcStatusCode baseError = EVENT_STREAM_RPC_SUCCESS;
249245

250-
m_onConnectRequestCallback = connectionOptions.ConnectRequestCallback;
246+
m_onConnectRequestCallback = connectionConfig.GetConnectRequestCallback();
251247

252248
struct aws_event_stream_rpc_client_connection_options connOptions;
253249
AWS_ZERO_STRUCT(connOptions);
254-
connOptions.host_name = connectionOptions.HostName.c_str();
255-
connOptions.port = connectionOptions.Port;
256-
connOptions.socket_options = &connectionOptions.SocketOptions.GetImpl();
257-
connOptions.bootstrap = connectionOptions.Bootstrap->GetUnderlyingHandle();
250+
if(connectionConfig.GetHostName().has_value())
251+
{
252+
connOptions.host_name = connectionConfig.GetHostName().value().c_str();
253+
}
254+
else
255+
{
256+
baseError = EVENT_STREAM_RPC_NULL_PARAMETER;
257+
}
258+
if(connectionConfig.GetPort().has_value())
259+
{
260+
connOptions.port = connectionConfig.GetPort().value();
261+
}
262+
else
263+
{
264+
baseError = EVENT_STREAM_RPC_NULL_PARAMETER;
265+
}
266+
267+
if(connectionConfig.GetClientBootstrap() != nullptr)
268+
{
269+
connOptions.bootstrap = connectionConfig.GetClientBootstrap()->GetUnderlyingHandle();
270+
}
271+
else
272+
{
273+
baseError = EVENT_STREAM_RPC_NULL_PARAMETER;
274+
}
275+
276+
if (baseError)
277+
{
278+
m_connectAckedPromise.SetValue({baseError, 0});
279+
return m_connectAckedPromise.GetFuture();
280+
}
281+
282+
Crt::Io::SocketOptions socketOptions;
283+
if (connectionConfig.GetSocketDomain().has_value())
284+
{
285+
socketOptions.SetSocketDomain(connectionConfig.GetSocketDomain().value());
286+
}
287+
if (connectionConfig.GetSocketType().has_value())
288+
{
289+
socketOptions.SetSocketType(connectionConfig.GetSocketType().value());
290+
}
291+
connOptions.socket_options = &socketOptions.GetImpl();
292+
258293
connOptions.on_connection_setup = ClientConnection::s_onConnectionSetup;
259294
connOptions.on_connection_protocol_message = ClientConnection::s_onProtocolMessage;
260295
connOptions.on_connection_shutdown = ClientConnection::s_onConnectionShutdown;
261296
connOptions.user_data = reinterpret_cast<void *>(this);
262297
m_lifecycleHandler = connectionLifecycleHandler;
263-
m_connectMessageAmender = connectMessageAmender;
298+
if(connectMessageAmender) {
299+
m_connectMessageAmender = connectMessageAmender;
300+
}
264301

265-
if (connectionOptions.TlsOptions.has_value())
302+
if (connectionConfig.GetTlsConnectionOptions().has_value())
266303
{
267-
connOptions.tls_options = connectionOptions.TlsOptions->GetUnderlyingHandle();
304+
connOptions.tls_options = connectionConfig.GetTlsConnectionOptions()->GetUnderlyingHandle();
268305
}
269306

270307
int crtError = aws_event_stream_rpc_client_connection_connect(m_allocator, &connOptions);
@@ -515,7 +552,7 @@ namespace Aws
515552

516553
if (thisConnection->m_connectMessageAmender)
517554
{
518-
MessageAmendment &connectAmendment = thisConnection->m_connectMessageAmender();
555+
MessageAmendment connectAmendment(thisConnection->m_connectMessageAmender());
519556
auto &amenderHeaderList = connectAmendment.GetHeaders();
520557
/* The version header is necessary for establishing the connection. */
521558
messageAmendment.AddHeader(EventStreamHeader(
@@ -1015,7 +1052,7 @@ namespace Aws
10151052
return nullptr;
10161053
}
10171054

1018-
EventStreamRpcError ClientOperation::HandleData(
1055+
EventStreamRpcStatusCode ClientOperation::HandleData(
10191056
const Crt::String &modelName,
10201057
const Crt::Optional<Crt::ByteBuf> &payload)
10211058
{
@@ -1056,7 +1093,7 @@ namespace Aws
10561093
return EVENT_STREAM_RPC_SUCCESS;
10571094
}
10581095

1059-
EventStreamRpcError ClientOperation::HandleError(
1096+
EventStreamRpcStatusCode ClientOperation::HandleError(
10601097
const Crt::String &modelName,
10611098
const Crt::Optional<Crt::ByteBuf> &payload,
10621099
uint16_t messageFlags)
@@ -1120,7 +1157,7 @@ namespace Aws
11201157
MessageType messageType,
11211158
uint32_t messageFlags)
11221159
{
1123-
EventStreamRpcError errorCode = EVENT_STREAM_RPC_SUCCESS;
1160+
EventStreamRpcStatusCode errorCode = EVENT_STREAM_RPC_SUCCESS;
11241161
const EventStreamHeader *modelHeader = nullptr;
11251162
const EventStreamHeader *contentHeader = nullptr;
11261163

@@ -1172,7 +1209,7 @@ namespace Aws
11721209
{
11731210
if (m_messageCount == 1)
11741211
{
1175-
RpcError promiseValue = {(EventStreamRpcError)errorCode, 0};
1212+
RpcError promiseValue = {(EventStreamRpcStatusCode)errorCode, 0};
11761213
m_initialResponsePromise.SetValue(TaggedResult(promiseValue));
11771214
}
11781215
else

eventstreamrpc/tests/EventStreamClientTest.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ static int s_TestEventStreamConnect(struct aws_allocator *allocator, void *ctx)
6464
ASSERT_TRUE(tlsContext);
6565

6666
Aws::Crt::Io::TlsConnectionOptions tlsConnectionOptions = tlsContext.NewConnectionOptions();
67-
Aws::Crt::Io::SocketOptions socketOptions;
68-
socketOptions.SetConnectTimeoutMs(1000);
6967

7068
Aws::Crt::Io::EventLoopGroup eventLoopGroup(0, allocator);
7169
ASSERT_TRUE(eventLoopGroup);
@@ -79,19 +77,18 @@ static int s_TestEventStreamConnect(struct aws_allocator *allocator, void *ctx)
7977
MessageAmendment connectionAmendment;
8078
auto messageAmender = [&](void) -> MessageAmendment & { return connectionAmendment; };
8179

82-
ClientConnectionOptions options;
83-
options.Bootstrap = &clientBootstrap;
84-
options.SocketOptions = socketOptions;
85-
options.HostName = Aws::Crt::String("127.0.0.1");
86-
options.Port = 8033;
80+
ConnectionConfig config;
81+
config.SetClientBootstrap(&clientBootstrap);
82+
config.SetHostName(Aws::Crt::String("127.0.0.1"));
83+
config.SetPort(8033U);
8784

8885
/* Happy path case. */
8986
{
9087
TestLifecycleHandler lifecycleHandler;
9188
ClientConnection connection(allocator);
9289
connectionAmendment.AddHeader(EventStreamHeader(
9390
Aws::Crt::String("client-name"), Aws::Crt::String("accepted.testy_mc_testerson"), allocator));
94-
auto future = connection.Connect(options, &lifecycleHandler, messageAmender);
91+
auto future = connection.Connect(config, &lifecycleHandler, messageAmender);
9592
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_SUCCESS);
9693
lifecycleHandler.WaitOnCondition([&]() { return lifecycleHandler.isConnected; });
9794
/* Test all protocol messages. */
@@ -110,7 +107,7 @@ static int s_TestEventStreamConnect(struct aws_allocator *allocator, void *ctx)
110107
{
111108
TestLifecycleHandler lifecycleHandler;
112109
ClientConnection connection(allocator);
113-
auto future = connection.Connect(options, &lifecycleHandler, messageAmender);
110+
auto future = connection.Connect(config, &lifecycleHandler, messageAmender);
114111
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED_BEFORE_CONNACK);
115112
lifecycleHandler.WaitOnCondition([&]() { return lifecycleHandler.lastErrorCode == AWS_OP_SUCCESS; });
116113
}
@@ -121,7 +118,7 @@ static int s_TestEventStreamConnect(struct aws_allocator *allocator, void *ctx)
121118
ClientConnection connection(allocator);
122119
connectionAmendment.AddHeader(EventStreamHeader(
123120
Aws::Crt::String("client-name"), Aws::Crt::String("rejected.testy_mc_testerson"), allocator));
124-
auto future = connection.Connect(options, &lifecycleHandler, messageAmender);
121+
auto future = connection.Connect(config, &lifecycleHandler, messageAmender);
125122
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED_BEFORE_CONNACK);
126123
lifecycleHandler.WaitOnCondition([&]() { return lifecycleHandler.lastErrorCode == AWS_OP_SUCCESS; });
127124
}

ipc/CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ if (NOT CMAKE_CXX_STANDARD)
2020
endif()
2121

2222
file(GLOB AWS_GREENGRASSIPC_HEADERS
23-
"include/aws/ipc/*.h"
23+
"include/aws/greengrass/*.h"
2424
)
2525

2626
file(GLOB AWS_GREENGRASSIPC_SRC
@@ -33,7 +33,7 @@ file(GLOB AWS_GREENGRASSIPC_CPP_SRC
3333

3434
if (WIN32)
3535
if (MSVC)
36-
source_group("Header Files\\aws\\ipc\\" FILES ${AWS_GREENGRASSIPC_HEADERS})
36+
source_group("Header Files\\aws\\greengrass\\" FILES ${AWS_GREENGRASSIPC_HEADERS})
3737

3838
source_group("Source Files" FILES ${AWS_GREENGRASSIPC_SRC})
3939
endif ()
@@ -98,7 +98,7 @@ endif()
9898

9999
target_link_libraries(GreengrassIpc-cpp ${DEP_AWS_LIBS})
100100

101-
install(FILES ${AWS_GREENGRASSIPC_HEADERS} DESTINATION "include/aws/ipc/" COMPONENT Development)
101+
install(FILES ${AWS_GREENGRASSIPC_HEADERS} DESTINATION "include/aws/greengrass/" COMPONENT Development)
102102

103103
if (BUILD_SHARED_LIBS)
104104
set(TARGET_DIR "shared")
File renamed without changes.

0 commit comments

Comments
 (0)