Commit 10aafa7

Response sender to check for improper non-decoupled model usage
1 parent 4551e04 commit 10aafa7
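
As context, a minimal sketch of the model-side misuse the new checks reject, assuming the usual Triton Python backend model API (triton_python_backend_utils, InferenceRequest.get_response_sender()); the model file and tensor names below are hypothetical and not part of this commit:

# model.py -- hypothetical NON-decoupled model (config.pbtxt does not set
# model_transaction_policy { decoupled: true }). Sketch only.
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            sender = request.get_response_sender()
            out = pb_utils.Tensor("OUTPUT0", np.array([1.0], dtype=np.float32))

            # One response, optionally combined with the final flag: accepted.
            sender.send(
                pb_utils.InferenceResponse(output_tensors=[out]),
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)

            # A second response would now raise "Unable to send response.
            # Non-decoupled model cannot send more than one response.", as
            # would sending the bare final flag before any response.
            # sender.send(pb_utils.InferenceResponse(output_tensors=[out]))
        return None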

File tree

6 files changed: +85 -32 lines changed

  src/infer_request.cc
  src/infer_request.h
  src/pb_stub.cc
  src/python_be.cc
  src/response_sender.cc
  src/response_sender.h


src/infer_request.cc

Lines changed: 7 additions & 5 deletions
@@ -74,7 +74,7 @@ InferRequest::InferRequest(
   pb_cancel_ =
       std::make_shared<PbCancel>(response_factory_address_, request_address_);
   response_sender_ = std::make_shared<ResponseSender>(
-      request_address_, response_factory_address_,
+      request_address_, response_factory_address_, nullptr /* is_decoupled */,
       Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
 #endif
 }
@@ -272,7 +272,8 @@ InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
 std::unique_ptr<InferRequest>
 InferRequest::LoadFromSharedMemory(
     std::unique_ptr<SharedMemoryManager>& shm_pool,
-    bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle)
+    bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle,
+    bool const* is_model_decoupled)
 {
   AllocatedSharedMemory<char> infer_request_shm =
       shm_pool->Load<char>(request_handle);
@@ -328,7 +329,7 @@ InferRequest::LoadFromSharedMemory(
   return std::unique_ptr<InferRequest>(new InferRequest(
       infer_request_shm, request_id_shm, correlation_id_shm,
       requested_output_names_shm, model_name_shm, input_tensors, parameters_shm,
-      infer_trace_shm));
+      infer_trace_shm, is_model_decoupled));
 }
 
 InferRequest::InferRequest(
@@ -339,7 +340,8 @@ InferRequest::InferRequest(
     std::unique_ptr<PbString>& model_name_shm,
     std::vector<std::shared_ptr<PbTensor>>& input_tensors,
     std::unique_ptr<PbString>& parameters_shm,
-    std::unique_ptr<InferenceTrace>& infer_trace_shm)
+    std::unique_ptr<InferenceTrace>& infer_trace_shm,
+    bool const* is_model_decoupled)
     : infer_request_shm_(std::move(infer_request_shm)),
       request_id_shm_(std::move(request_id_shm)),
       requested_output_names_shm_(std::move(requested_output_names_shm)),
@@ -387,7 +389,7 @@ InferRequest::InferRequest(
   pb_cancel_ =
       std::make_shared<PbCancel>(response_factory_address_, request_address_);
   response_sender_ = std::make_shared<ResponseSender>(
-      request_address_, response_factory_address_,
+      request_address_, response_factory_address_, is_model_decoupled,
       Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
 #endif
 }

src/infer_request.h

Lines changed: 3 additions & 2 deletions
@@ -118,7 +118,7 @@ class InferRequest {
   static std::unique_ptr<InferRequest> LoadFromSharedMemory(
       std::unique_ptr<SharedMemoryManager>& shm_pool,
       bi::managed_external_buffer::handle_t request_handle,
-      bool open_cuda_handle);
+      bool open_cuda_handle, bool const* is_model_decoupled);
 
   /// Disallow copying the inference request object.
   DISALLOW_COPY_AND_ASSIGN(InferRequest);
@@ -135,7 +135,8 @@ class InferRequest {
       std::unique_ptr<PbString>& model_name_shm,
       std::vector<std::shared_ptr<PbTensor>>& input_tensors,
       std::unique_ptr<PbString>& parameters_shm,
-      std::unique_ptr<InferenceTrace>& infer_trace_shm);
+      std::unique_ptr<InferenceTrace>& infer_trace_shm,
+      bool const* is_model_decoupled);
 
   std::string request_id_;
   CorrelationId correlation_id_;

src/pb_stub.cc

Lines changed: 2 additions & 1 deletion
@@ -658,7 +658,8 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
   for (size_t i = 0; i < batch_size; i++) {
     std::shared_ptr<InferRequest> infer_request =
         InferRequest::LoadFromSharedMemory(
-            shm_pool_, request_shm_handle[i], true /* open_cuda_handle */);
+            shm_pool_, request_shm_handle[i], true /* open_cuda_handle */,
+            &ipc_control_->decoupled /* is_model_decoupled */);
     py_request_list.append(infer_request);
   }
 

src/python_be.cc

Lines changed: 2 additions & 1 deletion
@@ -571,7 +571,8 @@ ModelInstanceState::ExecuteBLSRequest(
         reinterpret_cast<bi::managed_external_buffer::handle_t*>(
             request_batch.data_.get() + sizeof(RequestBatch));
     infer_request = InferRequest::LoadFromSharedMemory(
-        Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */);
+        Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */,
+        nullptr /* is_model_decoupled */);
 
     // If the BLS inputs are in GPU an additional round trip between the
     // stub process and the main process is required. The reason is that we

src/response_sender.cc

Lines changed: 60 additions & 21 deletions
@@ -35,13 +35,31 @@
 
 namespace triton { namespace backend { namespace python {
 
+void
+AssertResponseSenderArgumentsWellFormed(
+    const std::shared_ptr<InferResponse>& response, const uint32_t flags)
+{
+  // Check the correctness of the provided flags.
+  if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
+    throw PythonBackendException(
+        "Unable to send response. Unsupported flag provided.");
+  }
+
+  if (flags == 0 && response == nullptr) {
+    throw PythonBackendException(
+        "Inference Response object must be provided when the response flags is "
+        "set to zero.");
+  }
+}
+
 ResponseSender::ResponseSender(
     intptr_t request_address, intptr_t response_factory_address,
-    std::unique_ptr<SharedMemoryManager>& shm_pool,
+    bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
     const std::shared_ptr<PbCancel>& pb_cancel)
     : request_address_(request_address),
-      response_factory_address_(response_factory_address), shm_pool_(shm_pool),
-      closed_(false), pb_cancel_(pb_cancel)
+      response_factory_address_(response_factory_address),
+      is_decoupled_(is_decoupled), shm_pool_(shm_pool), pb_cancel_(pb_cancel),
+      closed_(false), number_of_response_sent_(0)
 {
 }
 
@@ -54,15 +72,32 @@ ResponseSender::~ResponseSender()
 }
 
 void
-ResponseSender::Send(
-    std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
+ResponseSender::UpdateStateAndCounters(
+    const std::shared_ptr<InferResponse>& response, const uint32_t flags)
 {
-  // Release the GIL. This avoids a potential deadlock situation in the parent
-  // process, where every thread in the thread pool is indirectly waiting for a
-  // function in the stub process that acquires the GIL. Meanwhile, the current
-  // thread, which holds the GIL, is also waiting for the parent side to have
-  // the next available thread to pick up the job during resource contention.
-  py::gil_scoped_release release;
+  if (is_decoupled_ == nullptr) {
+    // TODO: Can a model access the response sender on a BLS infer request?
+    throw PythonBackendException(
+        "Unable to send response. Response sender has no reference to the "
+        "decoupled state of the model.");
+  }
+  bool is_decoupled = *is_decoupled_;
+
+  std::lock_guard<std::mutex> lk(mu_);
+
+  if (!is_decoupled) {
+    if (response != nullptr && number_of_response_sent_ > 0) {
+      throw PythonBackendException(
+          "Unable to send response. Non-decoupled model cannot send more than "
+          "one response.");
+    }
+    if (response == nullptr && flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL &&
+        number_of_response_sent_ == 0) {
+      throw PythonBackendException(
+          "Unable to send response. Non-decoupled model cannot send complete "
+          "final before sending a response.");
+    }
+  }
 
   if (closed_) {
     throw PythonBackendException(
@@ -72,18 +107,22 @@ ResponseSender::Send(
   if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
     closed_ = true;
   }
+  number_of_response_sent_++;
+}
 
-  // Check the correctness of the provided flags.
-  if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
-    throw PythonBackendException(
-        "Unable to send response. Unsupported flag provided.");
-  }
+void
+ResponseSender::Send(
+    std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
+{
+  // Release the GIL. This avoids a potential deadlock situation in the parent
+  // process, where every thread in the thread pool is indirectly waiting for a
+  // function in the stub process that acquires the GIL. Meanwhile, the current
+  // thread, which holds the GIL, is also waiting for the parent side to have
+  // the next available thread to pick up the job during resource contention.
+  py::gil_scoped_release release;
 
-  if (flags == 0 && infer_response == nullptr) {
-    throw PythonBackendException(
-        "Inference Response object must be provided when the response flags is "
-        "set to zero.");
-  }
+  AssertResponseSenderArgumentsWellFormed(infer_response, flags);
+  UpdateStateAndCounters(infer_response, flags);
 
   std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
 
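For contrast, a hedged sketch of the decoupled case that UpdateStateAndCounters leaves unconstrained, again assuming the usual pb_utils model API and a config.pbtxt that sets model_transaction_policy { decoupled: true } (hypothetical model, not part of this commit):

# model.py -- hypothetical decoupled model; multiple responses per request
# are allowed, and the stream is closed with the bare final flag.
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            sender = request.get_response_sender()
            for value in (1.0, 2.0, 3.0):
                out = pb_utils.Tensor(
                    "OUTPUT0", np.array([value], dtype=np.float32))
                sender.send(pb_utils.InferenceResponse(output_tensors=[out]))
            # Close the response stream; no InferenceResponse object is
            # required when the flag is TRITONSERVER_RESPONSE_COMPLETE_FINAL.
            sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        return None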

src/response_sender.h

Lines changed: 11 additions & 2 deletions
@@ -26,6 +26,8 @@
 
 #pragma once
 
+#include <mutex>
+
 #include "infer_response.h"
 #include "pb_cancel.h"
 #include "shm_manager.h"
@@ -36,17 +38,24 @@ class ResponseSender {
  public:
   ResponseSender(
       intptr_t request_address, intptr_t response_factory_address,
-      std::unique_ptr<SharedMemoryManager>& shm_pool,
+      bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
       const std::shared_ptr<PbCancel>& pb_cancel);
   ~ResponseSender();
   void Send(std::shared_ptr<InferResponse> response, const uint32_t flags);
   bool IsCancelled();
 
  private:
+  void UpdateStateAndCounters(
+      const std::shared_ptr<InferResponse>& response, const uint32_t flags);
+
   intptr_t request_address_;
   intptr_t response_factory_address_;
+  bool const* is_decoupled_;
   std::unique_ptr<SharedMemoryManager>& shm_pool_;
-  bool closed_;
   std::shared_ptr<PbCancel> pb_cancel_;
+
+  std::mutex mu_;
+  bool closed_;
+  size_t number_of_response_sent_;
 };
 }}}  // namespace triton::backend::python
