Skip to content

Commit f551a25

Browse files
committed
Fix connection close errors
1 parent 462c9d0 commit f551a25

File tree

6 files changed

+50
-43
lines changed

6 files changed

+50
-43
lines changed

eventstreamrpc/include/aws/eventstreamrpc/EventStreamClient.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,12 @@ namespace Aws
251251
EVENT_STREAM_RPC_SUCCESS = 0,
252252
EVENT_STREAM_RPC_NULL_PARAMETER,
253253
EVENT_STREAM_RPC_INITIALIZATION_ERROR,
254-
EVENT_STREAM_RPC_CONNECTION_CLOSED_BEFORE_CONNACK,
254+
EVENT_STREAM_RPC_CONNECTION_CLOSED,
255+
EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED,
256+
EVENT_STREAM_RPC_CONTINUATION_CLOSED,
255257
EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE,
256258
EVENT_STREAM_RPC_UNMAPPED_DATA,
257259
EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE,
258-
EVENT_STREAM_RPC_STREAM_CLOSED_PREMATURELY,
259260
EVENT_STREAM_RPC_CRT_ERROR
260261
};
261262

@@ -275,8 +276,8 @@ namespace Aws
275276
ProtectedPromise(ProtectedPromise &&rhs);
276277
ProtectedPromise &operator=(ProtectedPromise &&);
277278
ProtectedPromise(std::promise<T> &&promise);
278-
void SetValue(T &&r);
279-
void SetValue(const T &r);
279+
bool SetValue(T &&r);
280+
bool SetValue(const T &r);
280281
std::future<T> GetFuture();
281282
void Reset();
282283

@@ -313,12 +314,14 @@ namespace Aws
313314

314315
ClientContinuation NewStream(ClientContinuationHandler &clientContinuationHandler) noexcept;
315316

316-
void Close() noexcept;
317+
std::future<RpcError> Close() noexcept;
318+
319+
bool IsOpen() const noexcept { return aws_event_stream_rpc_client_connection_is_open(this->m_underlyingConnection); }
317320

318321
/**
319322
* @return true if the instance is in a valid state, false otherwise.
320323
*/
321-
operator bool() const noexcept;
324+
operator bool() const noexcept { return IsOpen(); }
322325
/**
323326
* @return the value of the last aws error encountered by operations on this instance.
324327
*/

eventstreamrpc/source/EventStreamClient.cpp

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -146,30 +146,42 @@ namespace Aws
146146

147147
template <typename T> ProtectedPromise<T>::ProtectedPromise() : m_fulfilled(false) {}
148148

149-
template <typename T> void ProtectedPromise<T>::SetValue(T &&rhs)
149+
template <typename T> bool ProtectedPromise<T>::SetValue(T &&rhs)
150150
{
151151
if (m_mutex.try_lock())
152152
{
153-
if (!m_fulfilled)
153+
if (m_fulfilled)
154+
{
155+
return false;
156+
}
157+
else
154158
{
155159
m_promise.set_value(std::move(rhs));
156160
m_fulfilled = true;
161+
return true;
157162
}
158163
m_mutex.unlock();
159164
}
165+
return false;
160166
}
161167

162-
template <typename T> void ProtectedPromise<T>::SetValue(const T &lhs)
168+
template <typename T> bool ProtectedPromise<T>::SetValue(const T &lhs)
163169
{
164170
if (m_mutex.try_lock())
165171
{
166-
if (!m_fulfilled)
172+
if (m_fulfilled)
173+
{
174+
return false;
175+
}
176+
else
167177
{
168178
m_promise.set_value(lhs);
169179
m_fulfilled = true;
180+
return true;
170181
}
171182
m_mutex.unlock();
172183
}
184+
return false;
173185
}
174186

175187
template <typename T> std::future<T> ProtectedPromise<T>::GetFuture() { return m_promise.get_future(); }
@@ -246,14 +258,14 @@ namespace Aws
246258
{
247259
if (m_underlyingConnection)
248260
{
249-
Close();
261+
Close().wait();
250262
m_underlyingConnection = nullptr;
251263
}
252264
}
253265

254266
bool ConnectionLifecycleHandler::OnErrorCallback(int errorCode)
255267
{
256-
(void) errorCode;
268+
(void)errorCode;
257269
/* Returning true implies that the connection will close upon receiving an error. */
258270
return true;
259271
}
@@ -475,9 +487,16 @@ namespace Aws
475487
return onFlushPromise.get_future();
476488
}
477489

478-
void ClientConnection::Close() noexcept
490+
std::future<RpcError> ClientConnection::Close() noexcept
479491
{
480-
aws_event_stream_rpc_client_connection_close(this->m_underlyingConnection, AWS_OP_SUCCESS);
492+
if(IsOpen()) {
493+
aws_event_stream_rpc_client_connection_close(this->m_underlyingConnection, AWS_OP_SUCCESS);
494+
return m_closedPromise.get_future();
495+
} else {
496+
std::promise<RpcError> closedPromise;
497+
closedPromise.set_value({EVENT_STREAM_RPC_CONNECTION_CLOSED, 0});
498+
return closedPromise.get_future();
499+
}
481500
}
482501

483502
EventStreamHeader::EventStreamHeader(
@@ -592,14 +611,6 @@ namespace Aws
592611
messageAmendmentHeaders.splice(messageAmendmentHeaders.end(), amenderHeaderList);
593612
}
594613
messageAmendment.SetPayload(connectAmendment.GetPayload());
595-
if(messageAmendment.GetPayload().has_value())
596-
{
597-
std::cout << "wtf is going on" << std::endl;
598-
std::cout << Crt::String(
599-
(char *)messageAmendment.GetPayload().value().buffer,
600-
messageAmendment.GetPayload().value().len)
601-
<< std::endl;
602-
}
603614
}
604615

605616
/* Send a CONNECT packet to the server. */
@@ -641,12 +652,11 @@ namespace Aws
641652
auto *thisConnection = static_cast<ClientConnection *>(userData);
642653

643654
/* Protect setting future value since the connection might shut down while processing CONNACK. */
644-
thisConnection->m_connectAckedPromise.SetValue(
645-
{EVENT_STREAM_RPC_CONNECTION_CLOSED_BEFORE_CONNACK, errorCode});
646-
/*if (errorCode)
655+
thisConnection->m_connectAckedPromise.SetValue({EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED, errorCode});
656+
if (errorCode)
647657
thisConnection->m_closedPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
648658
else
649-
thisConnection->m_closedPromise.set_value({EVENT_STREAM_RPC_SUCCESS, errorCode});*/
659+
thisConnection->m_closedPromise.set_value({EVENT_STREAM_RPC_SUCCESS, errorCode});
650660

651661
thisConnection->m_lifecycleHandler->OnDisconnectCallback(errorCode);
652662
thisConnection->m_stateMutex.lock();
@@ -1234,12 +1244,6 @@ namespace Aws
12341244
}
12351245
}
12361246

1237-
if (messageFlags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM)
1238-
{
1239-
/* The server would like to terminate the stream even though no data was received. */
1240-
errorCode = EVENT_STREAM_RPC_STREAM_CLOSED_PREMATURELY;
1241-
}
1242-
12431247
if (errorCode)
12441248
{
12451249
if (m_messageCount == 1)
@@ -1291,7 +1295,7 @@ namespace Aws
12911295
{
12921296
/* Because m_initialResponsePromise is a ProtectedPromise, setting its value when it's already
12931297
* potentially set by `OnContinuationMessage` will just do nothing. */
1294-
m_initialResponsePromise.SetValue(TaggedResult({EVENT_STREAM_RPC_STREAM_CLOSED_PREMATURELY, 0}));
1298+
m_initialResponsePromise.SetValue(TaggedResult({EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0}));
12951299

12961300
m_closedPromise.set_value();
12971301

@@ -1306,7 +1310,7 @@ namespace Aws
13061310
if (m_isClosed.load())
13071311
{
13081312
std::promise<RpcError> alreadyClosedPromise;
1309-
alreadyClosedPromise.set_value({EVENT_STREAM_RPC_STREAM_CLOSED_PREMATURELY, 0});
1313+
alreadyClosedPromise.set_value({EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0});
13101314
return alreadyClosedPromise.get_future();
13111315
}
13121316
else

eventstreamrpc/tests/EventStreamClientTest.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ static int s_TestEventStreamConnect(struct aws_allocator *allocator, void *ctx)
8787
ClientConnection connection(allocator);
8888
connectionAmendment.AddHeader(EventStreamHeader(
8989
Aws::Crt::String("client-name"), Aws::Crt::String("accepted.testy_mc_testerson"), allocator));
90-
auto future = connection.Connect(config, &lifecycleHandler, messageAmender, clientBootstrap);
90+
auto future = connection.Connect(config, &lifecycleHandler, clientBootstrap);
9191
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_SUCCESS);
9292
lifecycleHandler.WaitOnCondition([&]() { return lifecycleHandler.isConnected; });
9393
/* Test all protocol messages. */
@@ -106,8 +106,8 @@ static int s_TestEventStreamConnect(struct aws_allocator *allocator, void *ctx)
106106
{
107107
TestLifecycleHandler lifecycleHandler;
108108
ClientConnection connection(allocator);
109-
auto future = connection.Connect(config, &lifecycleHandler, messageAmender, clientBootstrap);
110-
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED_BEFORE_CONNACK);
109+
auto future = connection.Connect(config, &lifecycleHandler, clientBootstrap);
110+
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED);
111111
lifecycleHandler.WaitOnCondition([&]() { return lifecycleHandler.lastErrorCode == AWS_OP_SUCCESS; });
112112
}
113113

@@ -117,8 +117,8 @@ static int s_TestEventStreamConnect(struct aws_allocator *allocator, void *ctx)
117117
ClientConnection connection(allocator);
118118
connectionAmendment.AddHeader(EventStreamHeader(
119119
Aws::Crt::String("client-name"), Aws::Crt::String("rejected.testy_mc_testerson"), allocator));
120-
auto future = connection.Connect(config, &lifecycleHandler, messageAmender, clientBootstrap);
121-
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED_BEFORE_CONNACK);
120+
auto future = connection.Connect(config, &lifecycleHandler, clientBootstrap);
121+
ASSERT_TRUE(future.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED);
122122
lifecycleHandler.WaitOnCondition([&]() { return lifecycleHandler.lastErrorCode == AWS_OP_SUCCESS; });
123123
}
124124
}

ipc/tests/EchoClientTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ static int s_connectToServer(struct aws_allocator *allocator, void *ctx)
4141
auto connectedStatus = client.Connect(lifecycleHandler);
4242
ASSERT_TRUE(connectedStatus.get().baseStatus == EVENT_STREAM_RPC_SUCCESS);
4343

44-
client.Close();
44+
//client.Close();
4545
}
4646

4747
return AWS_OP_SUCCESS;

ipc/tests/EchoTestRpcClient.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ namespace Awstest
1919
return m_connection.Connect(connectionConfig, &lifecycleHandler, m_clientBootstrap);
2020
}
2121

22-
void EchoTestRpcClient::Close() noexcept { m_connection.Close(); }
22+
std::future<RpcError> EchoTestRpcClient::Close() noexcept { return m_connection.Close(); }
2323

24-
EchoTestRpcClient::~EchoTestRpcClient() noexcept { Close(); }
24+
EchoTestRpcClient::~EchoTestRpcClient() noexcept { Close().wait(); }
2525

2626
GetAllProductsOperation EchoTestRpcClient::NewGetAllProducts() noexcept
2727
{

ipc/tests/include/awstest/EchoTestRpcClient.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ namespace Awstest
2727
std::future<RpcError> Connect(
2828
ConnectionLifecycleHandler &lifecycleHandler,
2929
const ConnectionConfig &connectionConfig = DefaultConnectionConfig()) noexcept;
30-
void Close() noexcept;
30+
std::future<RpcError> Close() noexcept;
3131
GetAllProductsOperation NewGetAllProducts() noexcept;
3232
CauseServiceErrorOperation NewCauseServiceError() noexcept;
3333
CauseStreamServiceToErrorOperation NewCauseStreamServiceToError(

0 commit comments

Comments
 (0)