Commit f4c83f2

Add back 24.05 response sender path
1 parent d84bb57 commit f4c83f2

File tree: 4 files changed (+269, -24 lines)

src/pb_stub.cc

Lines changed: 58 additions & 15 deletions
@@ -657,23 +657,16 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
       IPCMessage::Create(shm_pool_, false /* Inline response */);
   execute_response->Command() = PYTHONSTUB_ExecuteResponse;
 
-  AllocatedSharedMemory<ResponseBatch> response_batch =
-      shm_pool_->Construct<ResponseBatch>();
-  ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
-  execute_response->Args() = response_batch.handle_;
+  std::optional<AllocatedSharedMemory<char>> response_batch;
   bool has_exception = false;
   std::string error_string;
   std::unique_ptr<PbString> error_string_shm;
 
   ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
   ScopedDefer _(
       [this, &execute_response] { SendIPCMessage(execute_response); });
-
+  py::object execute_return;
   try {
-    response_batch_shm_ptr->has_error = false;
-    response_batch_shm_ptr->is_error_set = false;
-
     if (!py::hasattr(model_instance_, "execute")) {
       std::string message = "Python model " + model_context_.PythonModelPath() +
                             " does not implement `execute` method.";
@@ -683,7 +676,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     {
       NVTX_RANGE(nvtx_, "PyExecute " + name_);
 
-      py::object execute_return =
+      execute_return =
           model_instance_.attr("execute")(py_request_list);
 
       bool is_coroutine = py::module::import("asyncio")
@@ -696,10 +689,10 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
         } else {
           py::object coroutine_return =
              RunCoroutine(execute_return, false /* in_background */);
-          ProcessReturnedResponses(py_request_list, coroutine_return);
+          ProcessReturnedResponses(py_request_list, coroutine_return, response_batch);
         }
       } else {
-        ProcessReturnedResponses(py_request_list, execute_return);
+        ProcessReturnedResponses(py_request_list, execute_return, response_batch);
       }
     }
   }
@@ -719,6 +712,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
             "', message: ") +
         error_string;
     LOG_ERROR << err_message.c_str();
+    if (!response_batch) {
+      response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
+    }
+    ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+
+    response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
     response_batch_shm_ptr->has_error = true;
     error_string_shm = PbString::Create(shm_pool_, err_message);
     response_batch_shm_ptr->error = error_string_shm->ShmHandle();
@@ -732,11 +731,35 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
       request->GetResponseSender()->Close();
     }
   }
+
+  if (!response_batch) {
+    response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
+    ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+    response_batch_shm_ptr->batch_size = 0;
+  }
+  ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+  response_batch_shm_ptr->has_error = false;
+  response_batch_shm_ptr->is_error_set = false;
+  execute_response->Args() = response_batch.value().handle_;
+  _.Complete();
+  execute_finalize.Complete();
+}
+
+void
+Stub::ProcessResponse(InferResponse* response)
+{
+  response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);
+
+  for (auto& output_tensor : response->OutputTensors()) {
+    if (!output_tensor->IsCPU()) {
+      gpu_tensors_.push_back(output_tensor);
+    }
+  }
 }
 
 void
 Stub::ProcessReturnedResponses(
-    py::list py_requests, py::object py_responses_obj)
+    py::list py_requests, py::object py_responses_obj, std::optional<AllocatedSharedMemory<char>>& response_batch)
 {
   // Return if there is nothing to process.
   if (py::isinstance<py::none>(py_responses_obj)) {
@@ -784,12 +807,32 @@ Stub::ProcessReturnedResponses(
             "return list, found type '" +
             std::string(py::str(py_responses[i].get_type())) + "'.");
       }
+
       std::shared_ptr<InferResponse> response =
           py_responses[i].cast<std::shared_ptr<InferResponse>>();
-      request->GetResponseSender()->Send(
-          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+      request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
     }
   }
+  response_batch = std::move(shm_pool_->Construct<char>(
+      requests_size * sizeof(bi::managed_external_buffer::handle_t) +
+      sizeof(ResponseBatch)));
+  ResponseBatch* response_batch_shm_ptr =
+      reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+
+  bi::managed_external_buffer::handle_t* responses_shm_handle =
+      reinterpret_cast<bi::managed_external_buffer::handle_t*>(
+          response_batch.value().data_.get() + sizeof(ResponseBatch));
+
+  for (size_t i = 0; i < responses_size; i++) {
+    // Check the return type of execute function.
+    InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
+    InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
+    infer_response->PruneOutputTensors(
+        infer_request->RequestedOutputNames());
+    ProcessResponse(infer_response);
+    responses_shm_handle[i] = infer_response->ShmHandle();
+  }
+  response_batch_shm_ptr->batch_size = requests_size;
 }
 
 py::object
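
The structural change above is that the stub no longer constructs a fixed ResponseBatch up front: ProcessReturnedResponses now builds one raw shared-memory allocation holding the ResponseBatch header followed by one shared-memory handle per response, and Stub::ProcessRequests falls back to an empty batch when nothing was returned. The standalone sketch below illustrates only that header-plus-handle-array layout arithmetic; ResponseBatchHeader, handle_t, and the plain std::vector<char> buffer are simplified stand-ins for the backend's ResponseBatch, bi::managed_external_buffer::handle_t, and AllocatedSharedMemory<char>, not the real types.

// layout_sketch.cc -- illustrative only; all types here are stand-ins.
#include <cstdint>
#include <iostream>
#include <vector>

using handle_t = uint64_t;  // stand-in for bi::managed_external_buffer::handle_t

struct ResponseBatchHeader {  // trimmed-down stand-in for ResponseBatch
  bool has_error;
  bool is_error_set;
  uint32_t batch_size;
};

int main() {
  const uint32_t requests_size = 4;  // number of requests in the batch

  // One allocation: header first, then one handle per response, mirroring
  // Construct<char>(requests_size * sizeof(handle_t) + sizeof(ResponseBatch)).
  std::vector<char> buffer(
      sizeof(ResponseBatchHeader) + requests_size * sizeof(handle_t));

  auto* header = reinterpret_cast<ResponseBatchHeader*>(buffer.data());
  auto* handles =
      reinterpret_cast<handle_t*>(buffer.data() + sizeof(ResponseBatchHeader));

  header->has_error = false;
  header->is_error_set = false;
  for (uint32_t i = 0; i < requests_size; i++) {
    handles[i] = 1000 + i;  // the stub would store infer_response->ShmHandle()
  }
  header->batch_size = requests_size;

  std::cout << "batch_size=" << header->batch_size
            << " first_handle=" << handles[0] << std::endl;
  return 0;
}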

src/pb_stub.h

Lines changed: 3 additions & 1 deletion
@@ -254,7 +254,9 @@ class Stub {
   void ProcessRequests(RequestBatch* request_batch_shm_ptr);
 
   void ProcessReturnedResponses(
-      py::list py_requests, py::object py_responses_obj);
+      py::list py_requests, py::object py_responses_obj, std::optional<AllocatedSharedMemory<char>>& response_batch);
+
+  void ProcessResponse(InferResponse* response);
 
   py::object GetAsyncEventLoop();
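
The new ProcessReturnedResponses signature threads the batch allocation back to the caller through a std::optional reference, so Stub::ProcessRequests can tell whether the callee allocated anything and build a minimal empty batch otherwise. A small sketch of that optional out-parameter pattern follows; Buffer and ProcessReturned are hypothetical stand-ins, not the backend's types or functions.

// optional_out_param_sketch.cc -- illustrative only.
#include <iostream>
#include <optional>
#include <vector>

// Hypothetical stand-in for AllocatedSharedMemory<char>: just owns some bytes.
struct Buffer {
  std::vector<char> bytes;
};

// Mirrors the shape of ProcessReturnedResponses(..., response_batch): the
// callee fills the out-parameter only when it actually has responses to pack.
void ProcessReturned(bool has_responses, std::optional<Buffer>& response_batch) {
  if (!has_responses) {
    return;  // leave it empty; the caller builds a minimal fallback batch
  }
  response_batch = Buffer{std::vector<char>(64)};  // sized for header + handles
}

int main() {
  std::optional<Buffer> response_batch;
  ProcessReturned(/*has_responses=*/false, response_batch);

  // Caller-side fallback, as in Stub::ProcessRequests: if nothing was
  // allocated, construct an empty batch with batch_size left at zero.
  if (!response_batch) {
    response_batch = Buffer{std::vector<char>(16)};
    std::cout << "callee did not allocate; caller built an empty batch\n";
  }
  std::cout << "batch holds " << response_batch->bytes.size() << " bytes\n";
  return 0;
}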

src/python_be.cc

Lines changed: 206 additions & 6 deletions
@@ -1229,7 +1229,7 @@ ModelInstanceState::ProcessRequests(
   ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest;
   ipc_message->Args() = request_batch.handle_;
   received_message_ = nullptr;
-  ScopedDefer _([this] {
+  ScopedDefer execute_finalize([this] {
     // Push a dummy message to signal the thread to terminate.
     Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE);
   });
@@ -1240,19 +1240,23 @@ ModelInstanceState::ProcessRequests(
     cv_.wait(guard, [this] { return received_message_ != nullptr; });
   }
 
-  AllocatedSharedMemory<ResponseBatch> response_batch =
-      Stub()->ShmPool()->Load<ResponseBatch>(received_message_->Args());
+
+  AllocatedSharedMemory<char> response_batch = Stub()->ShmPool()->Load<char>(received_message_->Args());
+
+  ResponseBatch* response_batch_shm_ptr =
+      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
+
   received_message_.reset();
 
   uint64_t compute_end_ns = 0;
   SET_TIMESTAMP(compute_end_ns);
   reporter.SetComputeEndNs(compute_end_ns);
   reporter.SetBatchStatistics(total_batch_size);
 
-  if (response_batch.data_->has_error) {
-    if (response_batch.data_->is_error_set) {
+  if (response_batch_shm_ptr->has_error) {
+    if (response_batch_shm_ptr->is_error_set) {
       auto error = PbString::LoadFromSharedMemory(
-          Stub()->ShmPool(), response_batch.data_->error);
+          Stub()->ShmPool(), response_batch_shm_ptr->error);
       return TRITONSERVER_ErrorNew(
           TRITONSERVER_ERROR_INTERNAL, error->String().c_str());
     }
@@ -1261,6 +1265,202 @@ ModelInstanceState::ProcessRequests(
         TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests.");
   }
 
+  if (response_batch_shm_ptr->batch_size > 0) {
+    std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses(
+        new std::vector<TRITONBACKEND_Response*>());
+    responses->reserve(request_count);
+    for (size_t i = 0; i < request_count; i++) {
+      TRITONBACKEND_Response* response;
+      auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
+      if (err == nullptr) {
+        responses->emplace_back(response);
+      } else {
+        responses->emplace_back(nullptr);
+        LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response");
+        TRITONSERVER_ErrorDelete(err);
+      }
+    }
+    bi::managed_external_buffer::handle_t* response_shm_handle =
+        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
+            response_batch.data_.get() + sizeof(ResponseBatch));
+
+    // If the output provided by the model is in GPU, we will pass the list of
+    // buffers provided by Triton to the stub process.
+    // bool has_gpu_output = false;
+    std::vector<bool> requires_deferred_callback;
+
+    std::vector<std::unique_ptr<InferResponse>> shm_responses;
+    std::vector<std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>>
+        gpu_output_buffers(request_count);
+    GPUBuffersHelper gpu_buffer_helper;
+
+    for (uint32_t r = 0; r < request_count; ++r) {
+      NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
+      TRITONBACKEND_Response* response = (*responses)[r];
+      TRITONBACKEND_Request* request = requests[r];
+      uint32_t requested_output_count = 0;
+      requires_deferred_callback.push_back(false);
+
+      shm_responses.emplace_back(nullptr);
+      std::unique_ptr<InferResponse>& infer_response = shm_responses.back();
+      try {
+        if (pb_infer_requests[r]->ReleaseFlags() ==
+            TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
+          // For rescheduled requests, we do not need to send a response.
+          LOG_IF_ERROR(
+              TRITONBACKEND_ResponseDelete((*responses)[r]),
+              "failed to delete response");
+          (*responses)[r] = nullptr;
+          continue;
+        }
+        infer_response = InferResponse::LoadFromSharedMemory(
+            Stub()->ShmPool(), response_shm_handle[r],
+            false /* open_cuda_handle */);
+        if (infer_response->HasError()) {
+          TRITONSERVER_Error* err = TRITONSERVER_ErrorNew(
+              infer_response->Error()->Code(),
+              infer_response->Error()->Message().c_str());
+
+          LOG_IF_ERROR(
+              TRITONBACKEND_ResponseSend(
+                  (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
+              "failed sending response");
+          TRITONSERVER_ErrorDelete(err);
+          (*responses)[r] = nullptr;
+
+          // Reset the release flags for the request.
+          pb_infer_requests[r]->SetReleaseFlags(
+              TRITONSERVER_REQUEST_RELEASE_ALL);
+
+          // If has_error is true, we do not look at the response tensors.
+          continue;
+        }
+      }
+      catch (const PythonBackendException& pb_exception) {
+        TRITONSERVER_Error* err = TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INTERNAL, pb_exception.what());
+        LOG_IF_ERROR(
+            TRITONBACKEND_ResponseSend(
+                (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
+            "failed sending response");
+        TRITONSERVER_ErrorDelete(err);
+        (*responses)[r] = nullptr;
+
+        // Reset the release flags for the request.
+        pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
+
+        continue;
+      }
+
+      GUARDED_RESPOND_IF_ERROR(
+          responses, r,
+          TRITONBACKEND_RequestOutputCount(request, &requested_output_count));
+
+      std::set<std::string> requested_output_names;
+      for (size_t j = 0; j < requested_output_count; ++j) {
+        const char* output_name;
+        GUARDED_RESPOND_IF_ERROR(
+            responses, r,
+            TRITONBACKEND_RequestOutputName(request, j, &output_name));
+        requested_output_names.insert(output_name);
+      }
+
+      bool require_deferred_callback = false;
+
+#ifdef TRITON_ENABLE_GPU
+      for (auto& output_tensor : infer_response->OutputTensors()) {
+        if (output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU) {
+          // Attempt to use the cuda shared memory pool for GPU tensor.
+          ShareCUDAMemoryPool(output_tensor->MemoryTypeId());
+        }
+      }
+#endif  // TRITON_ENABLE_GPU
+
+      gpu_output_buffers[r] =
+          std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>{};
+      infer_response->Send(
+          response, CudaStream(), require_deferred_callback,
+          TRITONSERVER_RESPONSE_COMPLETE_FINAL, Stub()->ShmPool(),
+          gpu_buffer_helper, gpu_output_buffers[r], requested_output_names);
+
+      requires_deferred_callback[r] = require_deferred_callback;
+
+      if (requires_deferred_callback[r]) {
+        // has_gpu_output = true;
+      }
+    }
+
+    // Finalize the execute.
+    execute_finalize.Complete();
+  }
+
+  // If the output tensor is in GPU, there will be a second round trip
+  // required for filling the GPU buffers provided by the main process.
+  // if (has_gpu_output) {
+  //   ipc_message->Command() =
+  //       PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
+  //   gpu_buffer_helper.Complete(Stub()->ShmPool());
+  //   ipc_message->Args() = gpu_buffer_helper.ShmHandle();
+  //   SendMessageAndReceiveResponse(
+  //       ipc_message->ShmHandle(), response_message, restart, responses,
+  //       requests, 0);
+
+  //   bool cuda_copy = false;
+
+  //   uint32_t response_index = 0;
+  //   for (auto& gpu_output_buffer : gpu_output_buffers) {
+  //     for (auto& buffer_memory_pair : gpu_output_buffer) {
+  //       auto& pb_memory = buffer_memory_pair.first;
+  //       void* pointer = buffer_memory_pair.second;
+  //       bool cuda_used = false;
+
+  //       if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) {
+  //         GUARDED_RESPOND_IF_ERROR(
+  //             responses, response_index,
+  //             CopyBuffer(
+  //                 "Failed to copy the output tensor to buffer.",
+  //                 TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0,
+  //                 pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+  //                 CudaStream(), &cuda_used));
+  //         cuda_copy |= cuda_used;
+  //       } else if (
+  //           (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) &&
+  //           pb_memory->UseCUDASharedPool() &&
+  //           (pb_memory->DataPtr() != pointer)) {
+  //         // If the data pointer from pb_memory is not the same as the
+  //         // pointer, it means that the Triton-provided buffer is not used
+  //         // during tensor transfer. Instead, an intermediate buffer that uses
+  //         // CUDA shared memory pool is used. In this case, we need to copy
+  //         // the data from the intermediate buffer back to the Triton-provided
+  //         // buffer.
+  //         GUARDED_RESPOND_IF_ERROR(
+  //             responses, response_index,
+  //             CopyBuffer(
+  //                 "Failed to copy the output tensor to buffer.",
+  //                 TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+  //                 TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+  //                 pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+  //                 CudaStream(), &cuda_used));
+  //         cuda_copy |= cuda_used;
+  //       }
+  //     }
+  //     response_index++;
+  // #ifdef TRITON_ENABLE_GPU
+  //     if (cuda_copy) {
+  //       cudaStreamSynchronize(stream_);
+  //     }
+  // #endif  // TRITON_ENABLE_GPU
+  //   }
+  // }
+
+  // bls_defer.Complete();
+  // for (uint32_t r = 0; r < request_count; ++r) {
+  //   if (requires_deferred_callback[r]) {
+  //     shm_responses[r]->DeferredSendCallback();
+  //   }
+  // }
+  // }
+
   return nullptr;  // success
 }
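
On the parent-process side, ProcessRequests now loads the batch as raw bytes, reinterprets the ResponseBatch header, and, when batch_size > 0, walks the per-request handle array to build and send a TRITONBACKEND_Response for each request. The sketch below mirrors only that read-side walk over the header-plus-handles layout; the types and the MakeBatch helper are simplified stand-ins rather than the backend's AllocatedSharedMemory, ResponseBatch, or the Triton APIs.

// read_side_sketch.cc -- illustrative only; all types here are stand-ins.
#include <cstdint>
#include <iostream>
#include <vector>

using handle_t = uint64_t;  // stand-in for bi::managed_external_buffer::handle_t

struct ResponseBatchHeader {  // trimmed-down stand-in for ResponseBatch
  bool has_error;
  bool is_error_set;
  uint32_t batch_size;
};

// Builds a fake serialized batch the same way the stub-side sketch above does.
std::vector<char> MakeBatch(uint32_t n) {
  std::vector<char> buf(sizeof(ResponseBatchHeader) + n * sizeof(handle_t));
  auto* header = reinterpret_cast<ResponseBatchHeader*>(buf.data());
  auto* handles =
      reinterpret_cast<handle_t*>(buf.data() + sizeof(ResponseBatchHeader));
  header->has_error = false;
  header->is_error_set = false;
  header->batch_size = n;
  for (uint32_t i = 0; i < n; i++) {
    handles[i] = 2000 + i;
  }
  return buf;
}

int main() {
  std::vector<char> batch = MakeBatch(3);

  // Parent-side walk: check the error flags first, then visit one handle per
  // request. The real code would call InferResponse::LoadFromSharedMemory on
  // each handle and send the result with TRITONBACKEND_ResponseSend.
  auto* header = reinterpret_cast<const ResponseBatchHeader*>(batch.data());
  auto* handles = reinterpret_cast<const handle_t*>(
      batch.data() + sizeof(ResponseBatchHeader));

  if (header->has_error) {
    std::cout << "batch reported an error\n";
    return 1;
  }
  for (uint32_t r = 0; r < header->batch_size; r++) {
    std::cout << "request " << r << " -> response handle " << handles[r] << "\n";
  }
  return 0;
}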
