
Commit 3e9dcc5

Fix cleanup
1 parent b36b55d commit 3e9dcc5

3 files changed: +151 -82 lines changed


src/python_be.cc

Lines changed: 128 additions & 71 deletions
@@ -291,7 +291,7 @@ ModelInstanceState::SaveRequestsToSharedMemory(
 
   std::unique_ptr<InferRequest> infer_request;
   TRITONBACKEND_ResponseFactory* factory_ptr = nullptr;
-  // RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
+  RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
 
   infer_request = std::make_unique<InferRequest>(
       id, correlation_id, pb_input_tensors, requested_output_names,
@@ -1009,6 +1009,62 @@ ModelInstanceState::ProcessModelControlRequest(
       });
 }
 
+void
+ModelInstanceState::SendMessageToStub(
+    bi::managed_external_buffer::handle_t message)
+{
+  Stub()->StubMessageQueue()->Push(message);
+}
+
+void
+ModelInstanceState::SendMessageAndReceiveResponse(
+    bi::managed_external_buffer::handle_t message,
+    bi::managed_external_buffer::handle_t& response,
+    std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
+    TRITONBACKEND_Request** requests, const uint32_t request_count)
+{
+  SendMessageToStub(message);
+
+  bi::managed_external_buffer::handle_t response_message;
+  auto error = Stub()->ReceiveMessageFromStub(response_message);
+  if (error != nullptr) {
+    RespondErrorToAllRequests(
+        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
+
+    return;
+  }
+
+  response = response_message;
+}
+
+void
+ModelInstanceState::RespondErrorToAllRequests(
+    const char* message,
+    std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
+    TRITONBACKEND_Request** requests, const uint32_t request_count)
+{
+  for (uint32_t r = 0; r < request_count; ++r) {
+    if ((*responses)[r] == nullptr)
+      continue;
+
+    std::string err_message =
+        std::string(
+            "Failed to process the request(s) for model instance '" + Name() +
+            "', message: ") +
+        message;
+
+    TRITONSERVER_Error* err =
+        TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, err_message.c_str());
+    LOG_IF_ERROR(
+        TRITONBACKEND_ResponseSend(
+            (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
+        "failed sending response");
+
+    (*responses)[r] = nullptr;
+    TRITONSERVER_ErrorDelete(err);
+  }
+}
+
 void
 ModelInstanceState::StartMonitor()
 {
@@ -1164,6 +1220,12 @@ ModelInstanceState::ResponseSendDecoupled(
     SetErrorForResponseSendMessage(
        send_message_payload, WrapTritonErrorInSharedPtr(error), error_message);
   }
+
+  if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
+    std::unique_ptr<
+        TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
+        lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
+  }
 }
 
 TRITONSERVER_Error*
@@ -1265,6 +1327,7 @@ ModelInstanceState::ProcessRequests(
   // bool has_gpu_output = false;
   std::vector<bool> requires_deferred_callback;
 
+  bool has_gpu_output = false;
   std::vector<std::unique_ptr<InferResponse>> shm_responses;
   std::vector<std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>>
       gpu_output_buffers(request_count);
@@ -1362,78 +1425,75 @@ ModelInstanceState::ProcessRequests(
       requires_deferred_callback[r] = require_deferred_callback;
 
       if (requires_deferred_callback[r]) {
-        // has_gpu_output = true;
+        has_gpu_output = true;
       }
     }
 
-  }
-
   // If the output tensor is in GPU, there will be a second round trip
   // required for filling the GPU buffers provided by the main process.
-  // if (has_gpu_output) {
-  //   ipc_message->Command() =
-  //       PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
-  //   gpu_buffer_helper.Complete(Stub()->ShmPool());
-  //   ipc_message->Args() = gpu_buffer_helper.ShmHandle();
-  //   SendMessageAndReceiveResponse(
-  //       ipc_message->ShmHandle(), response_message, restart, responses,
-  //       requests, 0);
-
-  //   bool cuda_copy = false;
-
-  //   uint32_t response_index = 0;
-  //   for (auto& gpu_output_buffer : gpu_output_buffers) {
-  //     for (auto& buffer_memory_pair : gpu_output_buffer) {
-  //       auto& pb_memory = buffer_memory_pair.first;
-  //       void* pointer = buffer_memory_pair.second;
-  //       bool cuda_used = false;
-
-  //       if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) {
-  //         GUARDED_RESPOND_IF_ERROR(
-  //             responses, response_index,
-  //             CopyBuffer(
-  //                 "Failed to copy the output tensor to buffer.",
-  //                 TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0,
-  //                 pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
-  //                 CudaStream(), &cuda_used));
-  //         cuda_copy |= cuda_used;
-  //       } else if (
-  //           (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) &&
-  //           pb_memory->UseCUDASharedPool() &&
-  //           (pb_memory->DataPtr() != pointer)) {
-  //         // If the data pointer from pb_memory is not the same as the
-  //         // pointer, it means that the Triton-provided buffer is not used
-  //         // during tensor transfer. Instead, an intermediate buffer that uses
-  //         // CUDA shared memory pool is used. In this case, we need to copy
-  //         // the data from the intermediate buffer back to the Triton-provided
-  //         // buffer.
-  //         GUARDED_RESPOND_IF_ERROR(
-  //             responses, response_index,
-  //             CopyBuffer(
-  //                 "Failed to copy the output tensor to buffer.",
-  //                 TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
-  //                 TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
-  //                 pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
-  //                 CudaStream(), &cuda_used));
-  //         cuda_copy |= cuda_used;
-  //       }
-  //     }
-  //     response_index++;
-  // #ifdef TRITON_ENABLE_GPU
-  //     if (cuda_copy) {
-  //       cudaStreamSynchronize(stream_);
-  //     }
-  // #endif  // TRITON_ENABLE_GPU
-  //   }
-  // }
-
-  // bls_defer.Complete();
-  // for (uint32_t r = 0; r < request_count; ++r) {
-  //   if (requires_deferred_callback[r]) {
-  //     shm_responses[r]->DeferredSendCallback();
-  //   }
-  // }
-  // }
+    if (has_gpu_output) {
+      ipc_message->Command() =
+          PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
+      gpu_buffer_helper.Complete(Stub()->ShmPool());
+      ipc_message->Args() = gpu_buffer_helper.ShmHandle();
+      bi::managed_external_buffer::handle_t response_message;
+      SendMessageAndReceiveResponse(
+          ipc_message->ShmHandle(), response_message, responses, requests, 0);
+
+      bool cuda_copy = false;
+
+      uint32_t response_index = 0;
+      for (auto& gpu_output_buffer : gpu_output_buffers) {
+        for (auto& buffer_memory_pair : gpu_output_buffer) {
+          auto& pb_memory = buffer_memory_pair.first;
+          void* pointer = buffer_memory_pair.second;
+          bool cuda_used = false;
+
+          if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) {
+            GUARDED_RESPOND_IF_ERROR(
+                responses, response_index,
+                CopyBuffer(
+                    "Failed to copy the output tensor to buffer.",
+                    TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0,
+                    pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+                    CudaStream(), &cuda_used));
+            cuda_copy |= cuda_used;
+          } else if (
+              (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) &&
+              pb_memory->UseCUDASharedPool() &&
+              (pb_memory->DataPtr() != pointer)) {
+            // If the data pointer from pb_memory is not the same as the
+            // pointer, it means that the Triton-provided buffer is not used
+            // during tensor transfer. Instead, an intermediate buffer that uses
+            // CUDA shared memory pool is used. In this case, we need to copy
+            // the data from the intermediate buffer back to the Triton-provided
+            // buffer.
+            GUARDED_RESPOND_IF_ERROR(
+                responses, response_index,
+                CopyBuffer(
+                    "Failed to copy the output tensor to buffer.",
+                    TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+                    TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+                    pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+                    CudaStream(), &cuda_used));
+            cuda_copy |= cuda_used;
+          }
+        }
+        response_index++;
+#ifdef TRITON_ENABLE_GPU
+        if (cuda_copy) {
+          cudaStreamSynchronize(stream_);
+        }
+#endif  // TRITON_ENABLE_GPU
+      }
+    }
+
+    for (uint32_t r = 0; r < request_count; ++r) {
+      if (requires_deferred_callback[r]) {
+        shm_responses[r]->DeferredSendCallback();
+      }
+    }
+  }
 
   return nullptr;  // success
 }
@@ -1575,9 +1635,6 @@ ModelInstanceState::~ModelInstanceState()
   if (Stub()->IsHealthy()) {
     // Wait for all the pending tasks to finish.
     thread_pool_->wait();
-    // Push a dummy message to signal the thread to terminate.
-    Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE);
-    queue_monitor_.join();
   }
   // Terminate stub first to allow any last messages to be received by the back
   // end before deallocating the queue memory
src/python_be.h

Lines changed: 14 additions & 3 deletions
@@ -287,9 +287,6 @@ class ModelInstanceState : public BackendModelInstance {
 
   std::thread stub_to_parent_queue_monitor_;
   bool stub_to_parent_thread_;
-  // Queue monitor thread
-  std::thread queue_monitor_;
-  bool queue_monitor_thread_;
   std::mutex mu_;
   std::condition_variable cv_;
   std::unique_ptr<IPCMessage> received_message_;
@@ -361,6 +358,20 @@ class ModelInstanceState : public BackendModelInstance {
       AllocatedSharedMemory<char>& request_batch,
       std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses);
 
+  void SendMessageAndReceiveResponse(
+      bi::managed_external_buffer::handle_t message,
+      bi::managed_external_buffer::handle_t& response,
+      std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
+      TRITONBACKEND_Request** requests, const uint32_t request_count);
+
+  void RespondErrorToAllRequests(
+      const char* message,
+      std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
+      TRITONBACKEND_Request** requests, const uint32_t request_count);
+
+  void SendMessageToStub(
+      bi::managed_external_buffer::handle_t message);
+
   // Model instance stub
   std::unique_ptr<StubLauncher>& Stub() { return model_instance_stub_; }
 
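
The declarations above pair a blocking send with a blocking receive over the parent/stub message queues. The toy, in-process analogue below illustrates that round trip; the real backend uses boost::interprocess message queues in shared memory, so the std::queue, std::thread, and condition_variable here are illustrative assumptions only, not the backend's implementation.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Toy blocking queue standing in for the shared-memory message queues.
struct ToyQueue {
  std::queue<int> q;
  std::mutex mu;
  std::condition_variable cv;

  void Push(int handle)
  {
    std::lock_guard<std::mutex> lk(mu);
    q.push(handle);
    cv.notify_one();
  }

  int Pop()
  {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [this] { return !q.empty(); });
    int handle = q.front();
    q.pop();
    return handle;
  }
};

int main()
{
  ToyQueue to_stub;    // parent -> stub (StubMessageQueue analogue)
  ToyQueue to_parent;  // stub -> parent (ParentMessageQueue analogue)

  std::thread stub([&] {
    int msg = to_stub.Pop();     // stub receives the request handle
    to_parent.Push(msg + 1000);  // ...and answers with a response handle
  });

  to_stub.Push(42);                // analogous to SendMessageToStub(...)
  int response = to_parent.Pop();  // analogous to ReceiveMessageFromStub(...)
  std::cout << "response handle: " << response << "\n";

  stub.join();
  return 0;
}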

src/response_sender.cc

Lines changed: 9 additions & 8 deletions
@@ -69,7 +69,7 @@ ResponseSender::ResponseSender(
 
 ResponseSender::~ResponseSender()
 {
-  // DeleteResponseFactory();
+  DeleteResponseFactory();
 }
 
 void
@@ -172,6 +172,10 @@ ResponseSender::Send(
 
   {
     bi::scoped_lock<bi::interprocess_mutex> guard{send_message_payload->mu};
+    // The server will destruct the response factory if the final flag is set.
+    if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
+      response_factory_deleted_.exchange(true);
+    }
     stub->SendIPCUtilsMessage(ipc_message);
     while (!send_message_payload->is_stub_turn) {
       send_message_payload->cv.wait(guard);
@@ -247,9 +251,6 @@ ResponseSender::Send(
       }
     }
   }
-
-  if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
-    // DeleteResponseFactory();
-  }
 }
 
 bool
@@ -270,10 +271,10 @@ ResponseSender::DeleteResponseFactory()
 {
   bool already_deleted = response_factory_deleted_.exchange(true);
   if (!already_deleted) {
-    // std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
-    // stub->EnqueueCleanupId(
-    //     reinterpret_cast<void*>(response_factory_address_),
-    //     PYTHONSTUB_DecoupledResponseFactoryCleanup);
+    std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
+    stub->EnqueueCleanupId(
+        reinterpret_cast<void*>(response_factory_address_),
+        PYTHONSTUB_DecoupledResponseFactoryCleanup);
  }
 }
 
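
The re-enabled cleanup relies on response_factory_deleted_ acting as a once-only latch: exchange(true) returns the previous value, so whichever of the destructor, the FINAL-flag path, or an explicit close runs first performs the cleanup, and every later call is a no-op. A minimal sketch of that guard follows, assuming response_factory_deleted_ is a std::atomic<bool> (as the calls suggest) and using a toy class rather than the real ResponseSender.

#include <atomic>
#include <iostream>

class ToySender {
 public:
  void MarkFactoryDeletedByServer()
  {
    // Mirrors Send() with TRITONSERVER_RESPONSE_COMPLETE_FINAL: the server
    // side owns the destruction, so only flip the flag here.
    response_factory_deleted_.exchange(true);
  }

  void DeleteResponseFactory()
  {
    // exchange(true) returns the prior value, so cleanup runs at most once.
    bool already_deleted = response_factory_deleted_.exchange(true);
    if (!already_deleted) {
      std::cout << "enqueue cleanup exactly once\n";
    }
  }

 private:
  std::atomic<bool> response_factory_deleted_{false};
};

int main()
{
  ToySender a;
  a.DeleteResponseFactory();  // prints: enqueue cleanup exactly once
  a.DeleteResponseFactory();  // no-op

  ToySender b;
  b.MarkFactoryDeletedByServer();  // FINAL flag already sent
  b.DeleteResponseFactory();       // no-op: server already cleaned up
  return 0;
}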
