Commit 921916f

Fix segfault
1 parent 47adab9 commit 921916f

File tree

5 files changed: +89 -6 lines changed

src/pb_stub.cc
src/pb_utils.h
src/python_be.cc
src/response_sender.cc
src/response_sender.h

src/pb_stub.cc

Lines changed: 51 additions & 3 deletions
@@ -665,6 +665,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
   ScopedDefer _(
       [this, &execute_response] { SendIPCMessage(execute_response); });
   py::object execute_return;
+  py::object coroutine_return;
   try {
     if (!py::hasattr(model_instance_, "execute")) {
       std::string message = "Python model " + model_context_.PythonModelPath() +
@@ -685,7 +686,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
       // Do not wait for async decoupled execute to return.
       RunCoroutine(execute_return, true /* in_background */);
     } else {
-      py::object coroutine_return =
+      coroutine_return =
          RunCoroutine(execute_return, false /* in_background */);
       ProcessReturnedResponses(
           py_request_list, coroutine_return, response_batch);
@@ -733,6 +734,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     }
   } else {
     if (!response_batch) {
+      std::cerr << "===== response_batch is not set" << std::endl;
       response_batch = shm_pool_->Construct<char>(
           sizeof(ResponseBatch) + sizeof(IPCMessageShm));
       ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
@@ -743,6 +745,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
           response_batch.value().data_.get() + sizeof(IPCMessageShm));
       response_batch_shm_ptr->has_error = false;
       response_batch_shm_ptr->is_error_set = false;
+      std::cerr << "===== response_batch_shm_ptr->batch_size: "
+                << response_batch_shm_ptr->batch_size << std::endl;
     }

     execute_response = IPCMessage::Create(
@@ -779,6 +783,27 @@ Stub::ProcessReturnedResponses(
   }
   // Only non-decoupled may return responses.
   if (IsDecoupled()) {
+    // For decoupled mode, if a response was already sent from the response
+    // sender along with the complete final flag before this error is
+    // returned, use the `is_response_factory_deleted` flag to notify the
+    // backend NOT to delete the response factory again during error
+    // handling.
+    for (py::handle py_request : py_requests) {
+      InferRequest* request = py_request.cast<InferRequest*>();
+      if (request->GetResponseSender()->IsClosed()) {
+        // Notify the backend NOT to delete the response factory again
+        // during error handling.
+        if (!response_batch) {
+          response_batch = std::move(shm_pool_->Construct<char>(
+              sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
+        }
+        ResponseBatch* response_batch_shm_ptr =
+            reinterpret_cast<ResponseBatch*>(
+                response_batch.value().data_.get() + sizeof(IPCMessageShm));
+        response_batch_shm_ptr->is_response_factory_deleted = true;
+      }
+    }
+
     throw PythonBackendException(
         "Python model '" + name_ +
         "' is using the decoupled mode and the execute function must return "
@@ -821,8 +846,31 @@ Stub::ProcessReturnedResponses(
       }

       InferResponse* response = py_responses[i].cast<InferResponse*>();
-      request->GetResponseSender()->UpdateStateAndCounters(
-          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+
+      try {
+        request->GetResponseSender()->UpdateStateAndCounters(
+            response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+      }
+      catch (const PythonBackendException& pb_exception) {
+        // Special case for the default (non-decoupled) mode: the response
+        // factory should already be cleaned up with the previous response
+        // sent from the response sender, yet the model tries to return
+        // another response from `execute()`. Notify the backend NOT to
+        // delete the response factory again during error handling.
+        std::string error_string = pb_exception.what();
+        if (error_string.find(
+                "Non-decoupled model cannot send more than one response") !=
+            std::string::npos) {
+          response_batch = std::move(shm_pool_->Construct<char>(
+              sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
+          ResponseBatch* response_batch_shm_ptr =
+              reinterpret_cast<ResponseBatch*>(
+                  response_batch.value().data_.get() + sizeof(IPCMessageShm));
+          response_batch_shm_ptr->is_response_factory_deleted = true;
+          LOG_ERROR << "=== caught error: " << pb_exception.what();
+        }
+        throw pb_exception;
+      }
     }
   }
   // Return all the created responses using response_batch. The reason
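
Note: the non-decoupled hunk above recovers by matching on the exception text, recording the side effect, and rethrowing. Below is a minimal standalone sketch of that catch-inspect-rethrow flow; the names (a plain bool for the shared-memory flag, std::runtime_error in place of PythonBackendException) are simplified stand-ins, not the real Triton types.

#include <iostream>
#include <stdexcept>
#include <string>

static bool factory_deleted_flag = false;  // stand-in for the shm flag

// Stand-in for ResponseSender::UpdateStateAndCounters: a second "final"
// response on a non-decoupled sender is an error.
void
UpdateStateAndCounters(bool already_sent_final)
{
  if (already_sent_final) {
    throw std::runtime_error(
        "Non-decoupled model cannot send more than one response");
  }
}

void
ProcessReturnedResponse(bool already_sent_final)
{
  try {
    UpdateStateAndCounters(already_sent_final);
  }
  catch (const std::runtime_error& e) {
    // Match on the message, record that the factory is already gone, and
    // rethrow so the normal error path still runs.
    if (std::string(e.what()).find(
            "Non-decoupled model cannot send more than one response") !=
        std::string::npos) {
      factory_deleted_flag = true;
    }
    throw;
  }
}

int
main()
{
  try {
    ProcessReturnedResponse(true /* already_sent_final */);
  }
  catch (const std::exception& e) {
    std::cout << "rethrown: " << e.what()
              << " (factory_deleted=" << factory_deleted_flag << ")\n";
  }
  return 0;
}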

src/pb_utils.h

Lines changed: 3 additions & 0 deletions
@@ -167,6 +167,9 @@ struct ResponseBatch : SendMessageBase {
   bool is_error_set;

   uint32_t response_size;
+
+  // Indicates whether the response factory has been deleted or not.
+  bool is_response_factory_deleted = false;
 };

 enum LogLevel { kInfo = 0, kWarning, kError, kVerbose };
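
Note: the new field travels in the same ResponseBatch block the stub and backend already exchange over shared memory, laid out immediately after the IPC message header. The sketch below shows that layout with simplified stand-in structs (the real IPCMessageShm and ResponseBatch carry more members); the buffer is a local array here, whereas the real block lives in the shared-memory pool.

#include <cstdint>
#include <iostream>
#include <new>

struct IPCMessageShm {
  uint64_t command;  // stand-in for the real header fields
};

struct ResponseBatch {
  bool has_error = false;
  bool is_error_set = false;
  uint32_t response_size = 0;
  // Indicates whether the response factory has been deleted or not.
  bool is_response_factory_deleted = false;
};

int
main()
{
  alignas(8) char buffer[sizeof(IPCMessageShm) + sizeof(ResponseBatch)] = {};

  // "Stub" side: construct the batch right after the header and set the flag.
  auto* batch = new (buffer + sizeof(IPCMessageShm)) ResponseBatch{};
  batch->is_response_factory_deleted = true;

  // "Backend" side: view the same bytes at the same offset, as the
  // reinterpret_cast in python_be.cc does.
  auto* seen = reinterpret_cast<ResponseBatch*>(buffer + sizeof(IPCMessageShm));
  std::cout << std::boolalpha << seen->is_response_factory_deleted << "\n";
  return 0;
}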

src/python_be.cc

Lines changed: 21 additions & 3 deletions
@@ -826,6 +826,8 @@ ModelInstanceState::ProcessCleanupRequest(
     infer_payload_.erase(id);
   } else if (message->Command() == PYTHONSTUB_DecoupledResponseFactoryCleanup) {
     // Delete response factory
+    std::cerr << "=== ResponseFactoryDeleter -> ProcessCleanupRequest ==="
+              << std::endl;
     std::unique_ptr<
         TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
         response_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(id));
@@ -1094,6 +1096,8 @@ ModelInstanceState::ResponseSendDecoupled(
   TRITONBACKEND_ResponseFactory* response_factory =
       reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
           send_message_payload->response_factory_address);
+  std::cerr << "=== ResponseFactoryDeleter -> ResponseSendDecoupled ==="
+            << std::endl;
   std::unique_ptr<
       TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
       lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
@@ -1284,7 +1288,6 @@ ModelInstanceState::ProcessRequests(
   }
   char* ipc_message_shm =
       reinterpret_cast<char*>(response->GetAllocatedSharedMemory().data_.get());
-  ;
   ResponseBatch* response_batch_shm_ptr =
       reinterpret_cast<ResponseBatch*>(ipc_message_shm + sizeof(IPCMessageShm));

@@ -1294,16 +1297,27 @@
   reporter.SetBatchStatistics(total_batch_size);

   if (response_batch_shm_ptr->has_error) {
-    if (response_batch_shm_ptr->is_error_set) {
+    // The `is_response_factory_deleted` flag indicates whether the response
+    // factory has been deleted. The flag is used in a corner case where,
+    // after the response sender sends a response with the complete final
+    // flag and closes the response factory, the model returns a response
+    // from `execute()`. For both default and decoupled mode, there is no
+    // need to delete the response factory when handling that error.
+    if (!response_batch_shm_ptr->is_response_factory_deleted) {
       for (uint32_t r = 0; r < request_count; r++) {
         TRITONBACKEND_ResponseFactory* response_factory =
             reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
                 pb_infer_requests[r]->GetResponseFactoryAddress());
+        std::cerr << "=== ResponseFactoryDeleter -> "
+                     "response_batch_shm_ptr->has_error ==="
+                  << std::endl;
         std::unique_ptr<
             TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
             lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
                 response_factory));
       }
+    }
+    if (response_batch_shm_ptr->is_error_set) {
       auto error = PbString::LoadFromSharedMemory(
           Stub()->ShmPool(), response_batch_shm_ptr->error);
       return TRITONSERVER_ErrorNew(
@@ -1343,6 +1357,7 @@
       gpu_output_buffers(request_count);
   GPUBuffersHelper gpu_buffer_helper;

+  std::cerr << "=== PYBE request_count: " << request_count << std::endl;
   for (uint32_t r = 0; r < request_count; ++r) {
     NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
     TRITONBACKEND_Response* response = (*responses)[r];
@@ -1374,6 +1389,8 @@
     TRITONBACKEND_ResponseFactory* response_factory =
         reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
            pb_infer_requests[r]->GetResponseFactoryAddress());
+    std::cerr << "=== ResponseFactoryDeleter -> regular workflow ==="
+              << std::endl;
     std::unique_ptr<
         TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
         lresponse_factory(
@@ -1422,7 +1439,8 @@
     GUARDED_RESPOND_IF_ERROR(
         responses, r,
         TRITONBACKEND_RequestOutputCount(request, &requested_output_count));
-
+    std::cerr << "=== PYBE requested_output_count: " << requested_output_count
+              << std::endl;
     std::set<std::string> requested_output_names;
     for (size_t j = 0; j < requested_output_count; ++j) {
       const char* output_name;
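
Note: each error path above hands the raw factory address to a std::unique_ptr with backend::ResponseFactoryDeleter, so the factory is destroyed when the wrapper leaves scope. Wrapping an address whose factory was already destroyed is the double delete behind the segfault, which is what the is_response_factory_deleted check prevents. A hedged sketch of that RAII pattern, with hypothetical types in place of the Triton ones:

#include <iostream>
#include <memory>

struct ResponseFactory {};  // opaque handle in the real API

struct ResponseFactoryDeleter {
  void operator()(ResponseFactory* factory) const
  {
    // The real deleter calls back into Triton to destroy the factory.
    std::cout << "destroying factory\n";
    delete factory;
  }
};

int
main()
{
  auto* raw = new ResponseFactory{};
  {
    std::unique_ptr<ResponseFactory, ResponseFactoryDeleter> owner(raw);
  }  // factory destroyed here, at scope exit
  // Wrapping `raw` in a second unique_ptr now would free it again; guarding
  // the wrap with an "already deleted" flag is what this commit adds.
  return 0;
}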

src/response_sender.cc

Lines changed: 13 additions & 0 deletions
@@ -106,6 +106,8 @@ ResponseSender::UpdateStateAndCounters(
   }

   if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
+    std::cerr << "=== ResponseSender -> UpdateStateAndCounters closing RF ==="
+              << std::endl;
     response_factory_deleted_.exchange(true);
     closed_ = true;
   }
@@ -175,6 +177,7 @@
     bi::scoped_lock<bi::interprocess_mutex> guard{send_message_payload->mu};
     // The server will destruct the response factory if the final flag is set.
     if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
+      std::cerr << "====== scoped_defer -> closing RF =====" << std::endl;
       response_factory_deleted_.exchange(true);
     }
     stub->SendIPCUtilsMessage(ipc_message);
@@ -259,16 +262,26 @@ ResponseSender::IsCancelled()
   return pb_cancel_->IsCancelled();
 }

+bool
+ResponseSender::IsClosed()
+{
+  std::lock_guard<std::mutex> lk(mu_);
+  return closed_;
+}
+
 void
 ResponseSender::Close()
 {
   std::lock_guard<std::mutex> lk(mu_);
   closed_ = true;
+  response_factory_deleted_.exchange(true);
 }

 void
 ResponseSender::DeleteResponseFactory()
 {
+  std::cerr << "=== ResponseSender -> DeleteResponseFactory, "
+            << response_factory_deleted_ << " ===" << std::endl;
   bool already_deleted = response_factory_deleted_.exchange(true);
   if (!already_deleted) {
     std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
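
Note: Send() with the final flag, Close(), and DeleteResponseFactory() can all try to retire the factory; the atomic exchange on response_factory_deleted_ makes the actual cleanup run at most once regardless of which caller gets there first. A minimal sketch of that guard with simplified names:

#include <atomic>
#include <iostream>

std::atomic<bool> response_factory_deleted{false};

void
DeleteResponseFactoryOnce()
{
  // exchange() atomically sets the flag and returns the previous value, so
  // exactly one caller observes `false` and performs the cleanup.
  bool already_deleted = response_factory_deleted.exchange(true);
  if (!already_deleted) {
    std::cout << "deleting response factory\n";
  }
}

int
main()
{
  DeleteResponseFactoryOnce();  // performs the deletion
  DeleteResponseFactoryOnce();  // no-op: flag already set
  return 0;
}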

src/response_sender.h

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ class ResponseSender {

   // Can be useful at stopping the model from sending any more responses.
   void Close();
+  bool IsClosed();

  private:
   void DeleteResponseFactory();
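
Note: IsClosed() gives the stub a thread-safe read of the state that ProcessReturnedResponses checks above; it takes the same mutex as Close(), which guards closed_. A compact sketch of the accessor pattern, reduced to the two members involved:

#include <mutex>

class ResponseSenderState {
 public:
  void Close()
  {
    std::lock_guard<std::mutex> lk(mu_);
    closed_ = true;
  }

  bool IsClosed()
  {
    std::lock_guard<std::mutex> lk(mu_);
    return closed_;
  }

 private:
  std::mutex mu_;
  bool closed_ = false;
};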
