
Commit 01ba273

Add response sender to non-decoupled models and unify data pipelines (#360)
* Add response sender to non-decoupled model and unify data pipelines
* Rename variable and class name
1 parent 9d2c513 commit 01ba273
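As a sketch of the model-side contract this change targets, the following hypothetical non-decoupled model.py returns one InferenceResponse per InferenceRequest; with this commit the stub forwards each returned response through that request's response sender with the final flag (see Stub::ProcessReturnedResponses in the diff below) instead of writing a response batch into shared memory. Tensor names are placeholders; only the standard triton_python_backend_utils API is assumed.

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        responses = []
        for request in requests:
            # Echo "INPUT0" back as "OUTPUT0" (names are illustrative only).
            input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            output_tensor = pb_utils.Tensor("OUTPUT0", input_tensor.as_numpy())
            responses.append(
                pb_utils.InferenceResponse(output_tensors=[output_tensor]))
        # One entry per request; the stub sends each entry via the request's
        # response sender with the COMPLETE_FINAL flag.
        return responses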

5 files changed: +103 additions, -756 deletions

src/infer_request.cc

Lines changed: 0 additions & 7 deletions
@@ -402,13 +402,6 @@ InferRequest::IsCancelled()
 std::shared_ptr<ResponseSender>
 InferRequest::GetResponseSender()
 {
-  std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
-  if (!stub->IsDecoupled()) {
-    throw PythonBackendException(
-        "'get_response_sender' function must be called only when the model is "
-        "using the decoupled transaction policy.");
-  }
-
   return response_sender_;
 }
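With the decoupled-only guard removed from InferRequest::GetResponseSender(), a non-decoupled model can now obtain the sender for a request. The fragment below is a speculative model.py sketch of that usage, suggested by the commit title rather than shown in this diff; it assumes only the standard triton_python_backend_utils names.

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            # get_response_sender() no longer raises for non-decoupled models.
            sender = request.get_response_sender()
            response = pb_utils.InferenceResponse(output_tensors=[])
            # A non-decoupled request still gets exactly one response, marked final.
            sender.send(response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        # Returning None is accepted; ProcessReturnedResponses returns early on None.
        return None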

src/pb_stub.cc

Lines changed: 70 additions & 170 deletions
@@ -402,11 +402,7 @@ Stub::RunCommand()
           shm_pool_->Load<char>(ipc_message->Args());
       RequestBatch* request_batch_shm_ptr =
           reinterpret_cast<RequestBatch*>(request_batch.data_.get());
-      if (!ipc_control_->decoupled) {
-        ProcessRequests(request_batch_shm_ptr);
-      } else {
-        ProcessRequestsDecoupled(request_batch_shm_ptr);
-      }
+      ProcessRequests(request_batch_shm_ptr);
 
     } break;
     case PYTHONSTUB_CommandType::PYTHONSTUB_FinalizeRequest:
@@ -597,18 +593,6 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
   initialized_ = true;
 }
 
-void
-Stub::ProcessResponse(InferResponse* response)
-{
-  response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);
-
-  for (auto& output_tensor : response->OutputTensors()) {
-    if (!output_tensor->IsCPU()) {
-      gpu_tensors_.push_back(output_tensor);
-    }
-  }
-}
-
 void
 Stub::LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message)
 {
@@ -682,7 +666,7 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
 }
 
 void
-Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
+Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
 {
   py::list py_request_list =
       LoadRequestsFromSharedMemory(request_batch_shm_ptr);
@@ -718,18 +702,21 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
 
     py::object execute_return =
         model_instance_.attr("execute")(py_request_list);
+
     bool is_coroutine = py::module::import("asyncio")
                             .attr("iscoroutine")(execute_return)
                             .cast<bool>();
     if (is_coroutine) {
-      RunCoroutine(execute_return);
-    } else {
-      if (!py::isinstance<py::none>(execute_return)) {
-        throw PythonBackendException(
-            "Python model '" + name_ +
-            "' is using the decoupled mode and the execute function must "
-            "return None.");
+      if (IsDecoupled()) {
+        // Do not wait for async decoupled execute to return.
+        RunCoroutine(execute_return, true /* in_background */);
+      } else {
+        py::object coroutine_return =
+            RunCoroutine(execute_return, false /* in_background */);
+        ProcessReturnedResponses(py_request_list, coroutine_return);
       }
+    } else {
+      ProcessReturnedResponses(py_request_list, execute_return);
     }
   }
 }
@@ -757,151 +744,60 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
 }
 
 void
-Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
+Stub::ProcessReturnedResponses(
+    py::list py_requests, py::object py_responses_obj)
 {
-  std::unique_ptr<IPCMessage> execute_response =
-      IPCMessage::Create(shm_pool_, false /* Inline response */);
-  execute_response->Command() = PYTHONSTUB_ExecuteResponse;
-
-  AllocatedSharedMemory<char> response_batch = shm_pool_->Construct<char>(
-      request_batch_shm_ptr->batch_size *
-          sizeof(bi::managed_external_buffer::handle_t) +
-      sizeof(ResponseBatch));
-  ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
-
-  std::unique_ptr<PbString> error_string_shm;
-  py::list inference_responses;
-
-  bi::managed_external_buffer::handle_t* responses_shm_handle =
-      reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-          response_batch.data_.get() + sizeof(ResponseBatch));
-
-  py::list responses;
-
-  // Notifying the stub should be after responses.
-  ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
-  ScopedDefer _(
-      [this, &execute_response] { SendIPCMessage(execute_response); });
-
-  execute_response->Args() = response_batch.handle_;
-
-  bool has_exception = false;
-  std::string error_string;
-  try {
-    response_batch_shm_ptr->has_error = false;
-    response_batch_shm_ptr->is_error_set = false;
-
-    uint32_t batch_size = request_batch_shm_ptr->batch_size;
-
-    if (batch_size == 0) {
-      return;
-    }
-
-    py::list py_request_list =
-        LoadRequestsFromSharedMemory(request_batch_shm_ptr);
-
-    if (!py::hasattr(model_instance_, "execute")) {
-      std::string message = "Python model " + model_context_.PythonModelPath() +
-                            " does not implement `execute` method.";
-      throw PythonBackendException(message);
-    }
-
-    py::object request_list = py_request_list;
-    py::module asyncio = py::module::import("asyncio");
-
-    // Execute Response
-    py::object execute_return;
-    py::object responses_obj;
-    bool is_coroutine;
-
-    {
-      NVTX_RANGE(nvtx_, "PyExecute " + name_);
-      execute_return = model_instance_.attr("execute")(request_list);
-      is_coroutine = asyncio.attr("iscoroutine")(execute_return).cast<bool>();
-    }
-
-    if (is_coroutine) {
-      responses_obj = asyncio.attr("run")(execute_return);
-    } else {
-      responses_obj = execute_return;
-    }
-
-    // Check the return type of execute function.
-    if (!py::isinstance<py::list>(responses_obj)) {
-      std::string str = py::str(execute_return.get_type());
-      throw PythonBackendException(
-          std::string("Expected a list in the execute return, found type '") +
-          str + "'.");
-    }
-
-    responses = responses_obj;
-    size_t response_size = py::len(responses);
-
-    // If the number of request objects do not match the number of
-    // response objects throw an error.
-    if (response_size != batch_size) {
-      std::string err =
-          "Number of InferenceResponse objects do not match the number "
-          "of "
-          "InferenceRequest objects. InferenceRequest(s) size is:" +
-          std::to_string(batch_size) + ", and InferenceResponse(s) size is:" +
-          std::to_string(response_size) + "\n";
-      throw PythonBackendException(err);
-    }
-
-    for (size_t i = 0; i < response_size; i++) {
-      // Check the return type of execute function.
-      InferRequest* infer_request = py_request_list[i].cast<InferRequest*>();
-      if (infer_request->ReleaseFlags() ==
-          TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
-        if (!py::isinstance<py::none>(responses[i])) {
-          // When the request is rescheduled in non-decoupled model, the
-          // response must be None.
-          std::string str = py::str(responses[i].get_type());
-          throw PythonBackendException(
-              "Expected a None object in the execute function return list for "
-              "reschduled request, "
-              "found type '" +
-              str + "'.");
-        }
-      } else {
-        if (!py::isinstance<InferResponse>(responses[i])) {
-          std::string str = py::str(responses[i].get_type());
-          throw PythonBackendException(
-              std::string(
-                  "Expected an 'InferenceResponse' object in the execute "
-                  "function return list, found type '") +
-              str + "'.");
-        }
-        InferResponse* infer_response = responses[i].cast<InferResponse*>();
-        infer_response->PruneOutputTensors(
-            infer_request->RequestedOutputNames());
-        ProcessResponse(infer_response);
-        responses_shm_handle[i] = infer_response->ShmHandle();
-      }
-    }
-    response_batch_shm_ptr->batch_size = response_size;
+  // Return if there is nothing to process.
+  if (py::isinstance<py::none>(py_responses_obj)) {
+    return;
   }
-  catch (const PythonBackendException& pb_exception) {
-    has_exception = true;
-    error_string = pb_exception.what();
+  // Only non-decoupled may return responses.
+  if (IsDecoupled()) {
+    throw PythonBackendException(
+        "Python model '" + name_ +
+        "' is using the decoupled mode and the execute function must return "
+        "None.");
   }
-  catch (const py::error_already_set& error) {
-    has_exception = true;
-    error_string = error.what();
+  // Check responses is a list.
+  if (!py::isinstance<py::list>(py_responses_obj)) {
+    throw PythonBackendException(
+        "Expected a list in the execute return, found type '" +
+        std::string(py::str(py_responses_obj.get_type())) + "'.");
+  }
+  py::list py_responses = py_responses_obj;
+  // Responses and requests length must match.
+  size_t requests_size = py::len(py_requests);
+  size_t responses_size = py::len(py_responses);
+  if (requests_size != responses_size) {
+    throw PythonBackendException(
+        "Number of InferenceResponse objects do not match the number of "
+        "InferenceRequest objects. InferenceRequest(s) size is:" +
+        std::to_string(requests_size) + ", and InferenceResponse(s) size is:" +
+        std::to_string(responses_size) + "\n");
   }
 
-  if (has_exception) {
-    std::string err_message =
-        std::string(
-            "Failed to process the request(s) for model '" + name_ +
-            "', message: ") +
-        error_string;
-    error_string_shm = PbString::Create(shm_pool_, error_string);
-    response_batch_shm_ptr->has_error = true;
-    response_batch_shm_ptr->is_error_set = true;
-    response_batch_shm_ptr->error = error_string_shm->ShmHandle();
+  for (size_t i = 0; i < responses_size; i++) {
+    if (!py::isinstance<py::none>(py_responses[i])) {
+      InferRequest* request = py_requests[i].cast<InferRequest*>();
+      // Response must be None if rescheduled.
+      if (request->ReleaseFlags() == TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
+        throw PythonBackendException(
+            "Expected a None object in the execute function return list for "
+            "reschduled request, found type '" +
+            std::string(py::str(py_responses[i].get_type())) + "'.");
+      }
+      // Send the response.
+      if (!py::isinstance<InferResponse>(py_responses[i])) {
+        throw PythonBackendException(
+            "Expected an 'InferenceResponse' object in the execute function "
+            "return list, found type '" +
+            std::string(py::str(py_responses[i].get_type())) + "'.");
+      }
+      std::shared_ptr<InferResponse> response =
+          py_responses[i].cast<std::shared_ptr<InferResponse>>();
+      request->GetResponseSender()->Send(
+          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+    }
   }
 }
@@ -923,15 +819,19 @@ Stub::GetAsyncEventLoop()
   return async_event_loop_;
 }
 
-void
-Stub::RunCoroutine(py::object coroutine)
+py::object
+Stub::RunCoroutine(py::object coroutine, bool in_background)
 {
   py::object loop = GetAsyncEventLoop();
   py::object py_future = py::module_::import("asyncio").attr(
      "run_coroutine_threadsafe")(coroutine, loop);
-  py_future.attr("add_done_callback")(
-      py::module_::import("c_python_backend_utils")
-          .attr("async_event_future_done_callback"));
+  if (in_background) {
+    py_future.attr("add_done_callback")(
+        py::module_::import("c_python_backend_utils")
+            .attr("async_event_future_done_callback"));
+    return py::none();
+  }
+  return py_future.attr("result")();
 }
 
 void
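For the coroutine path above, a non-decoupled model's async execute is now run to completion via RunCoroutine(..., false /* in_background */) and its return value is handled by ProcessReturnedResponses, the same as the synchronous path. A minimal hypothetical async model.py with that shape (the awaited work is a placeholder; only the standard pb_utils API is assumed):

import asyncio

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        # Placeholder for real asynchronous work (e.g. awaiting another service).
        await asyncio.sleep(0)
        # Non-decoupled async models still return one response per request.
        return [pb_utils.InferenceResponse(output_tensors=[]) for _ in requests]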

src/pb_stub.h

Lines changed: 3 additions & 4 deletions
@@ -253,20 +253,19 @@ class Stub {
   /// Execute a batch of requests.
   void ProcessRequests(RequestBatch* request_batch_shm_ptr);
 
-  void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr);
+  void ProcessReturnedResponses(
+      py::list py_requests, py::object py_responses_obj);
 
   py::object GetAsyncEventLoop();
 
-  void RunCoroutine(py::object coroutine);
+  py::object RunCoroutine(py::object coroutine, bool in_background);
 
   /// Get the memory manager message queue
   std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();
 
   /// Get the shared memory pool
   std::unique_ptr<SharedMemoryManager>& ShmPool() { return shm_pool_; }
 
-  void ProcessResponse(InferResponse* response);
-
   void ProcessBLSResponseDecoupled(std::unique_ptr<IPCMessage>& ipc_message);
 
   void LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message);
