Skip to content

Commit b36b55d

Browse files
committed
Improve perf
1 parent f4c83f2 commit b36b55d

File tree

6 files changed

+76
-75
lines changed

6 files changed

+76
-75
lines changed

src/infer_request.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled)
484484
{
485485
bi::scoped_lock<bi::interprocess_mutex> lock{
486486
*(ipc_message->ResponseMutex())};
487-
stub->SendIPCMessage(ipc_message);
487+
stub->SendIPCUtilsMessage(ipc_message);
488488
ipc_message->ResponseCondition()->wait(lock);
489489
}
490490

src/ipc_message.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,19 @@ IPCMessage::Create(
5656
new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm));
5757
}
5858

59+
std::unique_ptr<IPCMessage>
60+
IPCMessage::Create(IPCMessageShm* ipc_message_shm,
61+
bi::managed_external_buffer::handle_t& message_handle)
62+
{
63+
return std::unique_ptr<IPCMessage>(new IPCMessage(ipc_message_shm, message_handle));
64+
}
65+
66+
AllocatedSharedMemory<IPCMessageShm>&
67+
IPCMessage::GetAllocatedSharedMemory()
68+
{
69+
return ipc_message_shm_;
70+
}
71+
5972
std::unique_ptr<IPCMessage>
6073
IPCMessage::LoadFromSharedMemory(
6174
std::unique_ptr<SharedMemoryManager>& shm_pool,
@@ -133,4 +146,10 @@ IPCMessage::IPCMessage(
133146
ipc_message_handle_ = ipc_message_shm_.handle_;
134147
}
135148

149+
IPCMessage::IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle)
150+
{
151+
ipc_message_handle_ = handle;
152+
ipc_message_shm_ptr_ = ipc_message_shm;
153+
}
154+
136155
}}}; // namespace triton::backend::python

src/ipc_message.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ class IPCMessage {
9797
static std::unique_ptr<IPCMessage> Create(
9898
const std::unique_ptr<SharedMemoryManager>& shm_pool,
9999
bool inline_response);
100+
101+
static std::unique_ptr<IPCMessage>
102+
Create(IPCMessageShm* ipc_message_shm,
103+
bi::managed_external_buffer::handle_t& message_handle);
100104
static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
101105
std::unique_ptr<SharedMemoryManager>& shm_pool,
102106
bi::managed_external_buffer::handle_t message_handle);
@@ -108,6 +112,7 @@ class IPCMessage {
108112
bi::interprocess_mutex* ResponseMutex();
109113
bi::managed_external_buffer::handle_t& Args();
110114
bi::managed_external_buffer::handle_t ShmHandle();
115+
AllocatedSharedMemory<IPCMessageShm>& GetAllocatedSharedMemory();
111116

112117
private:
113118
AllocatedSharedMemory<IPCMessageShm> ipc_message_shm_;
@@ -129,6 +134,8 @@ class IPCMessage {
129134
AllocatedSharedMemory<IPCMessageShm>& ipc_message_shm,
130135
AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
131136
AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);
137+
138+
IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle);
132139
};
133140

134141
}}}; // namespace triton::backend::python

src/pb_stub.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -653,9 +653,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
653653
{
654654
py::list py_request_list =
655655
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
656-
std::unique_ptr<IPCMessage> execute_response =
657-
IPCMessage::Create(shm_pool_, false /* Inline response */);
658-
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
656+
std::unique_ptr<IPCMessage> execute_response;
657+
// IPCMessage::Create(shm_pool_, false /* Inline response */);
659658

660659
std::optional<AllocatedSharedMemory<char>> response_batch;
661660
bool has_exception = false;
@@ -713,9 +712,9 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
713712
error_string;
714713
LOG_ERROR << err_message.c_str();
715714
if (!response_batch) {
716-
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
715+
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
717716
}
718-
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
717+
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
719718

720719
response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
721720
response_batch_shm_ptr->has_error = true;
@@ -733,14 +732,17 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
733732
}
734733

735734
if (!response_batch) {
736-
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
737-
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
735+
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
736+
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
738737
response_batch_shm_ptr->batch_size = 0;
739738
}
740-
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
739+
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
741740
response_batch_shm_ptr->has_error = false;
742741
response_batch_shm_ptr->is_error_set = false;
742+
execute_response = IPCMessage::Create(reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()), response_batch.value().handle_);
743743
execute_response->Args() = response_batch.value().handle_;
744+
execute_response->InlineResponse() = false;
745+
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
744746
_.Complete();
745747
execute_finalize.Complete();
746748
}
@@ -813,15 +815,15 @@ Stub::ProcessReturnedResponses(
813815
request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
814816
}
815817
}
816-
response_batch = std::move(shm_pool_->Construct<char>(
818+
response_batch = std::move(shm_pool_->Construct<char>(sizeof(IPCMessageShm) +
817819
requests_size * sizeof(bi::managed_external_buffer::handle_t) +
818820
sizeof(ResponseBatch)));
819821
ResponseBatch* response_batch_shm_ptr =
820-
reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
822+
reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
821823

822824
bi::managed_external_buffer::handle_t* responses_shm_handle =
823825
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
824-
response_batch.value().data_.get() + sizeof(ResponseBatch));
826+
response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm));
825827

826828
for (size_t i = 0; i < responses_size; i++) {
827829
// Check the return type of execute function.

src/python_be.cc

Lines changed: 29 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,8 @@ ModelInstanceState::SaveRequestsToSharedMemory(
290290
request, &request_timeout));
291291

292292
std::unique_ptr<InferRequest> infer_request;
293-
TRITONBACKEND_ResponseFactory* factory_ptr;
294-
RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
293+
TRITONBACKEND_ResponseFactory* factory_ptr = nullptr;
294+
// RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
295295

296296
infer_request = std::make_unique<InferRequest>(
297297
id, correlation_id, pb_input_tensors, requested_output_names,
@@ -322,8 +322,6 @@ ModelInstanceState::LaunchStubProcess()
322322
thread_pool_ = std::make_unique<boost::asio::thread_pool>(
323323
model_state->StateForBackend()->thread_pool_size);
324324

325-
queue_monitor_thread_ = true;
326-
queue_monitor_ = std::thread(&ModelInstanceState::MessageQueueMonitor, this);
327325
request_executor_ = std::make_unique<RequestExecutor>(
328326
Stub()->ShmPool(), model_state->TritonServer());
329327

@@ -685,44 +683,6 @@ ModelInstanceState::ExecuteBLSRequest(
685683
}
686684
}
687685

688-
void
689-
ModelInstanceState::MessageQueueMonitor()
690-
{
691-
while (queue_monitor_thread_) {
692-
bi::managed_external_buffer::handle_t handle =
693-
Stub()->ParentMessageQueue()->Pop();
694-
if (handle == DUMMY_MESSAGE) {
695-
break;
696-
}
697-
std::unique_ptr<IPCMessage> message =
698-
IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), handle);
699-
700-
// Need to notify the model instance thread that the execute response has
701-
// been received.
702-
if (message->Command() == PYTHONSTUB_ExecuteResponse) {
703-
std::lock_guard<std::mutex> guard{mu_};
704-
received_message_ = std::move(message);
705-
cv_.notify_one();
706-
} else if (message->Command() == PYTHONSTUB_ResponseSend) {
707-
std::shared_ptr<IPCMessage> response_send_message = std::move(message);
708-
std::packaged_task<void()> task([this, response_send_message] {
709-
ResponseSendDecoupled(response_send_message);
710-
});
711-
boost::asio::post(*thread_pool_, std::move(task));
712-
} else if (
713-
message->Command() == PYTHONSTUB_InferExecRequest ||
714-
message->Command() == PYTHONSTUB_InferStreamExecRequest) {
715-
std::shared_ptr<IPCMessage> bls_execute = std::move(message);
716-
std::packaged_task<void()> task([this, bls_execute] {
717-
ExecuteBLSRequest(
718-
bls_execute,
719-
(bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest));
720-
});
721-
boost::asio::post(*thread_pool_, std::move(task));
722-
}
723-
}
724-
}
725-
726686
void
727687
ModelInstanceState::StubToParentMQMonitor()
728688
{
@@ -769,6 +729,25 @@ ModelInstanceState::StubToParentMQMonitor()
769729
ProcessModelControlRequest(message);
770730
break;
771731
}
732+
case PYTHONSTUB_ResponseSend: {
733+
std::shared_ptr<IPCMessage> response_send_message = std::move(message);
734+
std::packaged_task<void()> task([this, response_send_message] {
735+
ResponseSendDecoupled(response_send_message);
736+
});
737+
boost::asio::post(*thread_pool_, std::move(task));
738+
break;
739+
}
740+
case PYTHONSTUB_InferExecRequest:
741+
case PYTHONSTUB_InferStreamExecRequest: {
742+
std::shared_ptr<IPCMessage> bls_execute = std::move(message);
743+
std::packaged_task<void()> task([this, bls_execute] {
744+
ExecuteBLSRequest(
745+
bls_execute,
746+
(bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest));
747+
});
748+
boost::asio::post(*thread_pool_, std::move(task));
749+
break;
750+
}
772751
default: {
773752
LOG_MESSAGE(
774753
TRITONSERVER_LOG_ERROR, "Unexpected message type received.");
@@ -1228,26 +1207,23 @@ ModelInstanceState::ProcessRequests(
12281207
IPCMessage::Create(Stub()->ShmPool(), false /*inline_response*/));
12291208
ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest;
12301209
ipc_message->Args() = request_batch.handle_;
1231-
received_message_ = nullptr;
1210+
12321211
ScopedDefer execute_finalize([this] {
12331212
// Push a dummy message to signal the thread to terminate.
12341213
Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE);
12351214
});
12361215

1216+
std::unique_ptr<IPCMessage> response;
12371217
{
1238-
std::unique_lock<std::mutex> guard{mu_};
12391218
Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle());
1240-
cv_.wait(guard, [this] { return received_message_ != nullptr; });
1219+
bi::managed_external_buffer::handle_t response_message;
1220+
Stub()->ReceiveMessageFromStub(response_message);
1221+
response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message);
12411222
}
1242-
1243-
1244-
AllocatedSharedMemory<char> response_batch = Stub()->ShmPool()->Load<char>(received_message_->Args());
1245-
1223+
char* ipc_message_shm = reinterpret_cast<char*>(response->GetAllocatedSharedMemory().data_.get());;
12461224
ResponseBatch* response_batch_shm_ptr =
1247-
reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
1225+
reinterpret_cast<ResponseBatch*>(ipc_message_shm + sizeof(IPCMessageShm));
12481226

1249-
received_message_.reset();
1250-
12511227
uint64_t compute_end_ns = 0;
12521228
SET_TIMESTAMP(compute_end_ns);
12531229
reporter.SetComputeEndNs(compute_end_ns);
@@ -1282,7 +1258,7 @@ ModelInstanceState::ProcessRequests(
12821258
}
12831259
bi::managed_external_buffer::handle_t* response_shm_handle =
12841260
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
1285-
response_batch.data_.get() + sizeof(ResponseBatch));
1261+
ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm));
12861262

12871263
// If the output provided by the model is in GPU, we will pass the list of
12881264
// buffers provided by Triton to the stub process.
@@ -1390,8 +1366,6 @@ ModelInstanceState::ProcessRequests(
13901366
}
13911367
}
13921368

1393-
// Finalize the execute.
1394-
execute_finalize.Complete();
13951369
}
13961370

13971371
// If the output tensor is in GPU, there will be a second round trip
@@ -1610,7 +1584,6 @@ ModelInstanceState::~ModelInstanceState()
16101584
Stub()->TerminateStub();
16111585
TerminateMonitor();
16121586
Stub()->ClearQueues();
1613-
received_message_.reset();
16141587
Stub().reset();
16151588
}
16161589

src/response_sender.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ ResponseSender::ResponseSender(
6969

7070
ResponseSender::~ResponseSender()
7171
{
72-
DeleteResponseFactory();
72+
// DeleteResponseFactory();
7373
}
7474

7575
void
@@ -172,7 +172,7 @@ ResponseSender::Send(
172172

173173
{
174174
bi::scoped_lock<bi::interprocess_mutex> guard{send_message_payload->mu};
175-
stub->SendIPCMessage(ipc_message);
175+
stub->SendIPCUtilsMessage(ipc_message);
176176
while (!send_message_payload->is_stub_turn) {
177177
send_message_payload->cv.wait(guard);
178178
}
@@ -248,7 +248,7 @@ ResponseSender::Send(
248248
}
249249

250250
if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
251-
DeleteResponseFactory();
251+
// DeleteResponseFactory();
252252
}
253253
}
254254

@@ -270,10 +270,10 @@ ResponseSender::DeleteResponseFactory()
270270
{
271271
bool already_deleted = response_factory_deleted_.exchange(true);
272272
if (!already_deleted) {
273-
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
274-
stub->EnqueueCleanupId(
275-
reinterpret_cast<void*>(response_factory_address_),
276-
PYTHONSTUB_DecoupledResponseFactoryCleanup);
273+
// std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
274+
// stub->EnqueueCleanupId(
275+
// reinterpret_cast<void*>(response_factory_address_),
276+
// PYTHONSTUB_DecoupledResponseFactoryCleanup);
277277
}
278278
}
279279

0 commit comments

Comments
 (0)