Skip to content

Commit 76b8ac3

Browse files
committed
Update the interface for OnStreamError to support the base error type and decouple RpcError
1 parent bc05eca commit 76b8ac3

File tree

8 files changed

+700
-209
lines changed

8 files changed

+700
-209
lines changed

eventstream_rpc/source/EventStreamClient.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -839,9 +839,8 @@ namespace Aws
839839
{
840840
if (m_callbackData)
841841
{
842-
m_callbackData->callbackMutex.lock();
842+
const std::lock_guard<std::mutex> lock(m_callbackData->callbackMutex);
843843
m_callbackData->continuationDestroyed = true;
844-
m_callbackData->callbackMutex.unlock();
845844
}
846845
}
847846

@@ -1067,8 +1066,8 @@ namespace Aws
10671066
const OperationModelContext &operationModelContext,
10681067
Crt::Allocator *allocator) noexcept
10691068
: m_operationModelContext(operationModelContext), m_messageCount(0), m_allocator(allocator),
1070-
m_streamHandler(streamHandler), m_clientContinuation(connection.NewStream(*this)),
1071-
m_expectedCloses(0), m_streamClosedCalled(false)
1069+
m_streamHandler(streamHandler), m_clientContinuation(connection.NewStream(*this)), m_expectedCloses(0),
1070+
m_streamClosedCalled(false)
10721071
{
10731072
}
10741073

@@ -1084,7 +1083,7 @@ namespace Aws
10841083
{
10851084
Close().wait();
10861085
std::unique_lock<std::mutex> lock(m_continuationMutex);
1087-
m_closeReady.wait(lock, [this]{return m_expectedCloses.load() == 0;});
1086+
m_closeReady.wait(lock, [this] { return m_expectedCloses.load() == 0; });
10881087
}
10891088

10901089
TaggedResult::TaggedResult(Crt::ScopedResource<AbstractShapeBase> operationResponse) noexcept
@@ -1431,7 +1430,6 @@ namespace Aws
14311430
/* Promises must be reset in case the client would like to send a subsequent request with the same
14321431
* `ClientOperation`. */
14331432
m_initialResponsePromise = {};
1434-
//m_closedPromise = {};
14351433
{
14361434
const std::lock_guard<std::mutex> lock(m_continuationMutex);
14371435
m_resultReceived = false;

eventstream_rpc/tests/EchoTestRpcModel.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,13 +1065,19 @@ namespace Awstest
10651065
Aws::Crt::ScopedResource<OperationError> operationError,
10661066
RpcError rpcError)
10671067
{
1068-
if (operationError == nullptr)
1069-
return OnStreamError(rpcError);
1070-
if (operationError->GetModelName() == Aws::Crt::String("awstest#ServiceError"))
1068+
bool streamShouldTerminate = false;
1069+
if (operationError == nullptr && rpcError.baseStatus != EVENT_STREAM_RPC_SUCCESS)
10711070
{
1072-
return OnStreamError(static_cast<ServiceError *>(operationError.get()), rpcError);
1071+
streamShouldTerminate = OnStreamError(rpcError);
10731072
}
1074-
return true;
1073+
if (operationError != nullptr && operationError->GetModelName() == Aws::Crt::String("awstest#ServiceError") &&
1074+
!streamShouldTerminate)
1075+
{
1076+
streamShouldTerminate = OnStreamError(static_cast<ServiceError *>(operationError.get()));
1077+
}
1078+
if (operationError != nullptr && !streamShouldTerminate)
1079+
streamShouldTerminate = OnStreamError(operationError.get());
1080+
return streamShouldTerminate;
10751081
}
10761082

10771083
CauseStreamServiceToErrorOperationContext::CauseStreamServiceToErrorOperationContext(
@@ -1151,9 +1157,14 @@ namespace Awstest
11511157
Aws::Crt::ScopedResource<OperationError> operationError,
11521158
RpcError rpcError)
11531159
{
1154-
if (operationError == nullptr)
1155-
return OnStreamError(rpcError);
1156-
return true;
1160+
bool streamShouldTerminate = false;
1161+
if (operationError == nullptr && rpcError.baseStatus != EVENT_STREAM_RPC_SUCCESS)
1162+
{
1163+
streamShouldTerminate = OnStreamError(rpcError);
1164+
}
1165+
if (operationError != nullptr && !streamShouldTerminate)
1166+
streamShouldTerminate = OnStreamError(operationError.get());
1167+
return streamShouldTerminate;
11571168
}
11581169

11591170
EchoStreamMessagesOperationContext::EchoStreamMessagesOperationContext(

eventstream_rpc/tests/EventStreamClientTest.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -504,14 +504,11 @@ static int s_TestStressClient(struct aws_allocator *allocator, void *ctx)
504504
auto echoMessage = client.NewEchoMessage();
505505
messageData.SetStringMessage(expectedMessage);
506506
echoMessageRequest.SetMessage(messageData);
507-
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
508-
std::future_status status = requestFuture.wait_for(std::chrono::seconds(1));
509-
if (status != std::future_status::ready)
510-
{
511-
return AWS_OP_SUCCESS;
512-
}
507+
auto requestStatus = echoMessage.Activate(echoMessageRequest, s_onMessageFlush).get();
513508
auto resultFuture = echoMessage.GetResult();
514-
status = resultFuture.wait_for(std::chrono::seconds(1));
509+
/* The response may never arrive depending on how many ongoing requests are made
510+
* so in case of timeout, assume success. */
511+
std::future_status status = resultFuture.wait_for(std::chrono::seconds(5));
515512
if (status != std::future_status::ready)
516513
{
517514
return AWS_OP_SUCCESS;
@@ -525,7 +522,7 @@ static int s_TestStressClient(struct aws_allocator *allocator, void *ctx)
525522
return AWS_OP_SUCCESS;
526523
};
527524

528-
for (int i = 0; i < 10000; i++)
525+
for (int i = 0; i < 1000; i++)
529526
threadPool.AddTask(invokeOperation);
530527

531528
threadPool.BlockUntilTasksFinish();

eventstream_rpc/tests/include/awstest/EchoTestRpcClient.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ namespace Awstest
2424
EchoTestRpcClient(
2525
Aws::Crt::Io::ClientBootstrap &clientBootstrap,
2626
Aws::Crt::Allocator *allocator = Aws::Crt::g_allocator) noexcept;
27+
/**
28+
* Connect the client to the server
29+
* @param lifecycleHandler An interface that is called upon when lifecycle events relating to the connection
30+
* occur.
31+
* @param connectionConfig The configuration parameters used for establishing the connection.
32+
* @return An `RpcError` that can be used to check whether the connection was established.
33+
*/
2734
std::future<RpcError> Connect(
2835
ConnectionLifecycleHandler &lifecycleHandler,
2936
const ConnectionConfig &connectionConfig = DefaultConnectionConfig()) noexcept;

0 commit comments

Comments
 (0)