@@ -1433,6 +1433,10 @@ struct server_queue {
1433
1433
} else {
1434
1434
queue_tasks.push_back (std::move (task));
1435
1435
}
1436
+ // if this is cancel task make sure to clean up pending tasks
1437
+ if (task.type == SERVER_TASK_TYPE_CANCEL) {
1438
+ cleanup_pending_task (task.id_target );
1439
+ }
1436
1440
condition_tasks.notify_one ();
1437
1441
return task.id ;
1438
1442
}
@@ -1450,6 +1454,10 @@ struct server_queue {
1450
1454
} else {
1451
1455
queue_tasks.push_back (std::move (task));
1452
1456
}
1457
+ // if this is cancel task make sure to clean up pending tasks
1458
+ if (task.type == SERVER_TASK_TYPE_CANCEL) {
1459
+ cleanup_pending_task (task.id_target );
1460
+ }
1453
1461
}
1454
1462
condition_tasks.notify_one ();
1455
1463
return 0 ;
@@ -1544,6 +1552,20 @@ struct server_queue {
1544
1552
}
1545
1553
}
1546
1554
}
1555
+
1556
+ private:
1557
+ void cleanup_pending_task (int id_task) {
1558
+ // no need lock because this is called exclusively by post()
1559
+ auto rm_func = [id_task](const server_task & task) {
1560
+ return task.id_target == id_task;
1561
+ };
1562
+ queue_tasks.erase (
1563
+ std::remove_if (queue_tasks.begin (), queue_tasks.end (), rm_func),
1564
+ queue_tasks.end ());
1565
+ queue_tasks_deferred.erase (
1566
+ std::remove_if (queue_tasks_deferred.begin (), queue_tasks_deferred.end (), rm_func),
1567
+ queue_tasks_deferred.end ());
1568
+ }
1547
1569
};
1548
1570
1549
1571
struct server_response {
@@ -1579,6 +1601,12 @@ struct server_response {
1579
1601
1580
1602
std::unique_lock<std::mutex> lock (mutex_results);
1581
1603
waiting_task_ids.erase (id_task);
1604
+ // make sure to clean up all pending results
1605
+ queue_results.erase (
1606
+ std::remove_if (queue_results.begin (), queue_results.end (), [id_task](const server_task_result_ptr & res) {
1607
+ return res->id == id_task;
1608
+ }),
1609
+ queue_results.end ());
1582
1610
}
1583
1611
1584
1612
void remove_waiting_task_ids (const std::unordered_set<int > & id_tasks) {
@@ -1598,7 +1626,7 @@ struct server_response {
1598
1626
return !queue_results.empty ();
1599
1627
});
1600
1628
1601
- for (int i = 0 ; i < ( int ) queue_results.size (); i++) {
1629
+ for (size_t i = 0 ; i < queue_results.size (); i++) {
1602
1630
if (id_tasks.find (queue_results[i]->id ) != id_tasks.end ()) {
1603
1631
server_task_result_ptr res = std::move (queue_results[i]);
1604
1632
queue_results.erase (queue_results.begin () + i);
@@ -1615,12 +1643,6 @@ struct server_response {
1615
1643
server_task_result_ptr recv_with_timeout (const std::unordered_set<int > & id_tasks, int timeout) {
1616
1644
while (true ) {
1617
1645
std::unique_lock<std::mutex> lock (mutex_results);
1618
- bool cr_res = condition_results.wait_for (lock, std::chrono::seconds (timeout), [&]{
1619
- return !queue_results.empty ();
1620
- });
1621
- if (!cr_res) {
1622
- return nullptr ;
1623
- }
1624
1646
1625
1647
for (int i = 0 ; i < (int ) queue_results.size (); i++) {
1626
1648
if (id_tasks.find (queue_results[i]->id ) != id_tasks.end ()) {
@@ -1629,6 +1651,11 @@ struct server_response {
1629
1651
return res;
1630
1652
}
1631
1653
}
1654
+
1655
+ std::cv_status cr_res = condition_results.wait_for (lock, std::chrono::seconds (timeout));
1656
+ if (cr_res == std::cv_status::timeout) {
1657
+ return nullptr ;
1658
+ }
1632
1659
}
1633
1660
1634
1661
// should never reach here
@@ -2376,8 +2403,8 @@ struct server_context {
2376
2403
2377
2404
server_task task (SERVER_TASK_TYPE_CANCEL);
2378
2405
task.id_target = id_task;
2379
- cancel_tasks.push_back (task);
2380
2406
queue_results.remove_waiting_task_id (id_task);
2407
+ cancel_tasks.push_back (task);
2381
2408
}
2382
2409
// push to beginning of the queue, so it has highest priority
2383
2410
queue_tasks.post (cancel_tasks, true );
0 commit comments