
Commit dbf5c43

Fix error handling

1 parent 921916f commit dbf5c43

File tree

5 files changed: +192 −55 lines changed

src/pb_stub.cc

Lines changed: 24 additions & 26 deletions
@@ -719,6 +719,24 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
         response_batch.value().data_.get() + sizeof(IPCMessageShm));

+    // Handle two special cases:
+    // 1. In default (non-decoupled) mode, the response factory should
+    // already be cleaned up with the previous response sent from the
+    // response sender, and yet the model tries to return another response
+    // from the `execute()` function. Notify the backend NOT to delete the
+    // response factory again during error handling.
+    // 2. The response sender is already closed; notify the backend NOT to
+    // delete the response factory again during error handling.
+    // std::string error_string = pb_exception.what();
+    if ((err_message.find(
+             "Non-decoupled model cannot send more than one response") !=
+         std::string::npos) ||
+        (err_message.find("Response sender has been closed") !=
+         std::string::npos)) {
+      response_batch_shm_ptr->is_response_factory_deleted = true;
+      LOG_ERROR << "=== caught error: " << err_message;
+    }
+
     response_batch_shm_ptr->has_error = true;
     error_string_shm = PbString::Create(shm_pool_, err_message);
     response_batch_shm_ptr->error = error_string_shm->ShmHandle();
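
Editor's note: the check above keys off exact substrings of the exception text. As a minimal sketch (not part of this commit), the condition could be factored into a helper; the two sentinel strings are assumed to be exactly what the response sender throws in this backend, and the helper name is hypothetical:

#include <string>

// Hypothetical helper: returns true when the exception text indicates the
// response factory has already been cleaned up by the response sender.
bool ResponseFactoryAlreadyDeleted(const std::string& err_message)
{
  return (err_message.find(
              "Non-decoupled model cannot send more than one response") !=
          std::string::npos) ||
         (err_message.find("Response sender has been closed") !=
          std::string::npos);
}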
@@ -734,6 +752,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     }
   } else {
     if (!response_batch) {
+      // No response is returned from `execute()`.
       std::cerr << "===== response_batch is not set" << std::endl;
       response_batch = shm_pool_->Construct<char>(
           sizeof(ResponseBatch) + sizeof(IPCMessageShm));
@@ -846,31 +865,8 @@ Stub::ProcessReturnedResponses(
       }

       InferResponse* response = py_responses[i].cast<InferResponse*>();
-
-      try {
-        request->GetResponseSender()->UpdateStateAndCounters(
-            response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
-      }
-      catch (const PythonBackendException& pb_exception) {
-        // Special case for default(non-decoupled) mode, where the response
-        // factory should already be cleaned up with the previous response sent
-        // from response sender, and yet the model tries to return another
-        // response from `execute()` function. Notify the backend to NOT to
-        // delete the response factory again during error handling.
-        std::string error_string = pb_exception.what();
-        if (error_string.find(
-                "Non-decoupled model cannot send more than one response") !=
-            std::string::npos) {
-          response_batch = std::move(shm_pool_->Construct<char>(
-              sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
-          ResponseBatch* response_batch_shm_ptr =
-              reinterpret_cast<ResponseBatch*>(
-                  response_batch.value().data_.get() + sizeof(IPCMessageShm));
-          response_batch_shm_ptr->is_response_factory_deleted = true;
-          LOG_ERROR << "=== caught error: " << pb_exception.what();
-        }
-        throw pb_exception;
-      }
+      request->GetResponseSender()->UpdateStateAndCounters(
+          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
     }
   }
   // Return all the created responses using response_batch. The reason
@@ -887,16 +883,18 @@ Stub::ProcessReturnedResponses(
       reinterpret_cast<bi::managed_external_buffer::handle_t*>(
           response_batch.value().data_.get() + sizeof(ResponseBatch) +
           sizeof(IPCMessageShm));
-
+  std::cerr << "===== response_size: " << responses_size << std::endl;
   for (size_t i = 0; i < responses_size; i++) {
     // Check the return type of execute function.
     InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
     InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
     if (!py::isinstance<py::none>(py_responses[i])) {
+      std::cerr << "===== response is NOT None" << std::endl;
       infer_response->PruneOutputTensors(infer_request->RequestedOutputNames());
       ProcessResponse(infer_response);
       responses_shm_handle[i] = infer_response->ShmHandle();
     } else {
+      std::cerr << "===== response is None" << std::endl;
       responses_shm_handle[i] = 0;
     }
   }
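
Editor's note: a shared-memory handle value of 0 is the sentinel this diff uses for "the model returned None for this slot"; the backend side (src/python_be.cc below) skips those entries. A toy illustration of the convention, with a hypothetical constant name:

// Toy illustration (not in this commit): handle 0 marks a None response.
constexpr bi::managed_external_buffer::handle_t kNoneResponseHandle = 0;
// responses_shm_handle[i] = kNoneResponseHandle;  // model returned None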

src/python_be.cc

Lines changed: 109 additions & 24 deletions
@@ -153,6 +153,23 @@ ModelInstanceState::SetErrorForResponseSendMessage(
   }
 }

+bool
+ModelInstanceState::IsStubProcessAlive()
+{
+  boost::posix_time::ptime timeout =
+      boost::get_system_time() + boost::posix_time::seconds(1);
+  bi::scoped_lock<bi::interprocess_mutex> lock(*Stub()->HealthMutex(), timeout);
+
+  // Check if the lock has been acquired.
+  if (lock) {
+    return Stub()->IpcControl()->stub_health;
+  } else {
+    // Failure to obtain the lock means that the stub is stuck or has
+    // exited while holding the health mutex lock.
+    return false;
+  }
+}
+
 TRITONSERVER_Error*
 ModelInstanceState::SaveRequestsToSharedMemory(
     TRITONBACKEND_Request** requests, const uint32_t request_count,
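
Editor's note: IsStubProcessAlive relies on boost.interprocess timed lock acquisition: the scoped_lock constructor that takes an absolute deadline tries the lock until that time and leaves the lock unowned on timeout. A self-contained sketch of just that idiom, with standalone names that are not the backend's:

#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/thread/thread_time.hpp>

namespace bi = boost::interprocess;

// Try to acquire an interprocess mutex for up to one second; returns
// false if the deadline passes without ownership (peer likely wedged).
bool TryTimedLock(bi::interprocess_mutex& mutex)
{
  boost::posix_time::ptime deadline =
      boost::get_system_time() + boost::posix_time::seconds(1);
  bi::scoped_lock<bi::interprocess_mutex> lock(mutex, deadline);
  return lock.owns();  // true only if the lock was acquired in time
}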
@@ -1011,11 +1028,43 @@ ModelInstanceState::ProcessModelControlRequest(
   });
 }

-void
+TRITONSERVER_Error*
 ModelInstanceState::SendMessageToStub(
     bi::managed_external_buffer::handle_t message)
 {
-  Stub()->StubMessageQueue()->Push(message);
+  // Stub()->StubMessageQueue()->Push(message);
+  bool success = false;
+  while (!success) {
+    uint64_t timeout_miliseconds = 1000;
+    {
+      boost::posix_time::ptime timeout =
+          boost::get_system_time() +
+          boost::posix_time::milliseconds(timeout_miliseconds);
+
+      bi::scoped_lock<bi::interprocess_mutex> lock(
+          *(Stub()->HealthMutex()), timeout);
+
+      // Check if the lock has been acquired.
+      if (lock) {
+        Stub()->IpcControl()->stub_health = false;
+      } else {
+        // Failure to obtain the lock means that the stub is stuck or has
+        // exited while holding the health mutex lock.
+        return TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex.");
+      }
+    }
+
+    Stub()->StubMessageQueue()->Push(
+        message, timeout_miliseconds /* duration ms */, success);
+
+    if (!success && !IsStubProcessAlive()) {
+      return TRITONSERVER_ErrorNew(
+          TRITONSERVER_ERROR_INTERNAL, "Stub process is not healthy.");
+    }
+  }
+
+  return nullptr;  // success
 }

 void
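
Editor's note: because SendMessageToStub now returns TRITONSERVER_Error* instead of void, every caller must consume or propagate the error. A hedged usage sketch (the surrounding caller is illustrative, not taken from this diff):

// Illustrative caller: propagate the error, or report it and free it.
TRITONSERVER_Error* err = SendMessageToStub(message);
if (err != nullptr) {
  LOG_MESSAGE(TRITONSERVER_LOG_ERROR, TRITONSERVER_ErrorMessage(err));
  TRITONSERVER_ErrorDelete(err);  // the caller owns the error object
}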
@@ -1025,10 +1074,29 @@ ModelInstanceState::SendMessageAndReceiveResponse(
     std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
     TRITONBACKEND_Request** requests, const uint32_t request_count)
 {
-  SendMessageToStub(message);
+  // SendMessageToStub(message);
+
+  // bi::managed_external_buffer::handle_t response_message;
+  // Stub()->ReceiveMessageFromStub(response_message);
+
+  // response = response_message;
+
+  auto error = SendMessageToStub(message);
+  if (error != nullptr) {
+    RespondErrorToAllRequests(
+        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
+
+    return;
+  }

   bi::managed_external_buffer::handle_t response_message;
-  Stub()->ReceiveMessageFromStub(response_message);
+  error = Stub()->ReceiveMessageFromStub(response_message);
+  if (error != nullptr) {
+    RespondErrorToAllRequests(
+        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
+
+    return;
+  }

   response = response_message;
 }
@@ -1061,6 +1129,7 @@ ModelInstanceState::RespondErrorToAllRequests(
   }
 }

+
 void
 ModelInstanceState::StartMonitor()
 {
@@ -1282,7 +1351,7 @@ ModelInstanceState::ProcessRequests(
   {
     Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle());
     bi::managed_external_buffer::handle_t response_message;
-    Stub()->ReceiveMessageFromStub(response_message);
+    RETURN_IF_ERROR(Stub()->ReceiveMessageFromStub(response_message));
     response =
         IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message);
   }
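
Editor's note: RETURN_IF_ERROR comes from the Triton backend common utilities; a minimal equivalent of what the macro does, assuming the real definition may differ in detail:

// Minimal equivalent of the macro's behavior: early-return on error.
#define RETURN_IF_ERROR(X)               \
  do {                                   \
    TRITONSERVER_Error* rie_err__ = (X); \
    if (rie_err__ != nullptr) {          \
      return rie_err__;                  \
    }                                    \
  } while (false)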
@@ -1329,26 +1398,34 @@ ModelInstanceState::ProcessRequests(
   }

   if (response_batch_shm_ptr->batch_size > 0) {
+    bi::managed_external_buffer::handle_t* response_shm_handle =
+        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
+            ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm));
+
     std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses(
         new std::vector<TRITONBACKEND_Response*>());
     responses->reserve(request_count);
     for (size_t i = 0; i < request_count; i++) {
-      TRITONBACKEND_Response* response;
-      auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
-      if (err == nullptr) {
-        responses->emplace_back(response);
-      } else {
+      // A single response batch shm may hold multiple responses, some of
+      // which are None because the response sender was used instead. Only
+      // create a TRITONBACKEND_Response object for the valid responses,
+      // and skip the None responses later.
+      if (response_shm_handle[i] == 0) {
+        std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl;
         responses->emplace_back(nullptr);
-        LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response");
-        TRITONSERVER_ErrorDelete(err);
+      } else {
+        TRITONBACKEND_Response* response;
+        auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
+        if (err == nullptr) {
+          responses->emplace_back(response);
+        } else {
+          responses->emplace_back(nullptr);
+          LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response");
+          TRITONSERVER_ErrorDelete(err);
+        }
       }
     }
-    bi::managed_external_buffer::handle_t* response_shm_handle =
-        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-            ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm));

-    // If the output provided by the model is in GPU, we will pass the list of
-    // buffers provided by Triton to the stub process.
     std::vector<bool> requires_deferred_callback;

     bool has_gpu_output = false;
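
Editor's note: the pointer arithmetic above assumes a packed layout in the response shared-memory block, sketched out here on the assumption that ipc_message_shm is a char* to the start of the mapped region:

// Assumed layout of the response region (offsets, not actual code):
//
//   [ IPCMessageShm ][ ResponseBatch ][ handle_0 ][ handle_1 ] ...
//   ^ ipc_message_shm
//                    ^ + sizeof(IPCMessageShm)
//                                     ^ + sizeof(ResponseBatch)
//
// so handle i of a batch lives at:
//   ipc_message_shm + sizeof(IPCMessageShm) + sizeof(ResponseBatch)
//       + i * sizeof(bi::managed_external_buffer::handle_t)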
@@ -1360,6 +1437,11 @@ ModelInstanceState::ProcessRequests(
     std::cerr << "=== PYBE request_count: " << request_count << std::endl;
     for (uint32_t r = 0; r < request_count; ++r) {
       NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
+      if (response_shm_handle[r] == 0) {
+        std::cerr << "=== PYBE skip the response_shm_handle is 0 ==="
+                  << std::endl;
+        continue;
+      }
       TRITONBACKEND_Response* response = (*responses)[r];
       TRITONBACKEND_Request* request = requests[r];
       uint32_t requested_output_count = 0;
@@ -1378,13 +1460,14 @@ ModelInstanceState::ProcessRequests(
         continue;
       }

-      if (response_shm_handle[r] == 0) {
-        LOG_IF_ERROR(
-            TRITONBACKEND_ResponseDelete((*responses)[r]),
-            "failed to delete response");
-        (*responses)[r] = nullptr;
-        continue;
-      }
+      // if (response_shm_handle[r] == 0) {
+      //   std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl;
+      //   LOG_IF_ERROR(
+      //       TRITONBACKEND_ResponseDelete((*responses)[r]),
+      //       "failed to delete response");
+      //   (*responses)[r] = nullptr;
+      //   continue;
+      // }
       {
         TRITONBACKEND_ResponseFactory* response_factory =
             reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
@@ -1448,6 +1531,8 @@ ModelInstanceState::ProcessRequests(
             responses, r,
             TRITONBACKEND_RequestOutputName(request, j, &output_name));
         requested_output_names.insert(output_name);
+        std::cerr << "=== PYBE requested_output_name: " << output_name
+                  << std::endl;
       }

       bool require_deferred_callback = false;

src/python_be.h

Lines changed: 6 additions & 1 deletion
@@ -369,7 +369,12 @@ class ModelInstanceState : public BackendModelInstance {
       std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
       TRITONBACKEND_Request** requests, const uint32_t request_count);

-  void SendMessageToStub(bi::managed_external_buffer::handle_t message);
+  // void SendMessageToStub(bi::managed_external_buffer::handle_t message);
+  TRITONSERVER_Error* SendMessageToStub(
+      bi::managed_external_buffer::handle_t message);
+
+  // Checks whether the stub process is alive.
+  bool IsStubProcessAlive();

   // Model instance stub
   std::unique_ptr<StubLauncher>& Stub() { return model_instance_stub_; }

src/stub_launcher.cc

Lines changed: 51 additions & 3 deletions
@@ -593,7 +593,7 @@ StubLauncher::ModelInstanceStubProcess()
   stub_message_queue_->Push(initialize_message->ShmHandle());

   bi::managed_external_buffer::handle_t message;
-  ReceiveMessageFromStub(message);
+  RETURN_IF_ERROR(ReceiveMessageFromStub(message));

   std::unique_ptr<IPCMessage> initialize_response_message =
       IPCMessage::LoadFromSharedMemory(shm_pool_, message);
@@ -724,11 +724,59 @@ StubLauncher::KillStubProcess()
 #endif
 }

-void
+TRITONSERVER_Error*
 StubLauncher::ReceiveMessageFromStub(
     bi::managed_external_buffer::handle_t& message)
 {
-  message = parent_message_queue_->Pop();
+  // message = parent_message_queue_->Pop();
+  bool success = false;
+  while (!success) {
+    uint64_t timeout_miliseconds = 1000;
+    {
+      boost::posix_time::ptime timeout =
+          boost::get_system_time() +
+          boost::posix_time::milliseconds(timeout_miliseconds);
+
+      bi::scoped_lock<bi::interprocess_mutex> lock(*health_mutex_, timeout);
+
+      // Check if the lock has been acquired.
+      if (lock) {
+        ipc_control_->stub_health = false;
+      } else {
+        // Failure to obtain the lock means that the stub is stuck or has
+        // exited while holding the health mutex lock.
+        return TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex.");
+      }
+    }
+
+    message = parent_message_queue_->Pop(
+        timeout_miliseconds /* duration ms */, success);
+
+    bool is_stub_alive = false;
+    {
+      boost::posix_time::ptime timeout =
+          boost::get_system_time() + boost::posix_time::seconds(1);
+      bi::scoped_lock<bi::interprocess_mutex> lock(*health_mutex_, timeout);
+      if (lock) {
+        is_stub_alive = ipc_control_->stub_health;
+      } else {
+        // Failure to obtain the lock means that the stub is stuck or has
+        // exited while holding the health mutex lock.
+        is_stub_alive = false;
+      }
+    }
+
+    if (!success && !is_stub_alive) {
+      return TRITONSERVER_ErrorNew(
+          TRITONSERVER_ERROR_INTERNAL,
+          (std::string("Stub process '") + model_instance_name_ +
+           "' is not healthy.")
+              .c_str());
+    }
+  }
+
+  return nullptr;  // success
 }

 void
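
Editor's note: the loop above implements a watchdog handshake: clear stub_health under the mutex, Pop() with a timeout, then re-read the flag; only a live stub will have set it back in the meantime. A toy, single-process sketch of the same idiom using the standard library (names are illustrative, not the backend's):

#include <atomic>
#include <chrono>
#include <thread>

// Toy model of the liveness handshake: the waiter clears the flag, waits
// with a timeout, and reads it back; only a live peer re-sets it.
std::atomic<bool> peer_health{true};

bool PopWithLivenessCheck()
{
  peer_health.store(false);                              // 1. clear flag
  std::this_thread::sleep_for(std::chrono::seconds(1));  // 2. timed Pop()
  return peer_health.load();                             // 3. re-check flag
}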

src/stub_launcher.h

Lines changed: 2 additions & 1 deletion
@@ -146,7 +146,8 @@ class StubLauncher {
   void KillStubProcess();

   // Get a message from the stub process
-  void ReceiveMessageFromStub(bi::managed_external_buffer::handle_t& message);
+  TRITONSERVER_Error* ReceiveMessageFromStub(
+      bi::managed_external_buffer::handle_t& message);

   // Wait for stub process
   void WaitForStubProcess();
