Skip to content

Commit 094eeca

Browse files
authored
Merge pull request #17 from akyrtzi/global-queue-for-unit-register
[Index] Use a single global serial queue for registration of units
2 parents 31ff487 + 6a51621 commit 094eeca

File tree

1 file changed

+71
-20
lines changed

1 file changed

+71
-20
lines changed

lib/Index/IndexDatastore.cpp

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,18 @@ static sys::TimePoint<> toTimePoint(timespec ts) {
6161

6262
const static dispatch_qos_class_t unitChangesQOS = QOS_CLASS_UTILITY;
6363

64+
/// Returns a global serial queue for unit processing.
65+
/// This is useful to avoid doing a lot of parallel CPU and I/O work when opening multiple workspaces.
66+
static dispatch_queue_t getGlobalQueueForUnitChanges() {
67+
static dispatch_queue_t queueForUnitChanges;
68+
static dispatch_once_t onceToken = 0;
69+
dispatch_once(&onceToken, ^{
70+
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, unitChangesQOS, 0);
71+
queueForUnitChanges = dispatch_queue_create("IndexStoreDB.store.unit.processing", qosAttribute);
72+
});
73+
return queueForUnitChanges;
74+
}
75+
6476
namespace {
6577

6678
class UnitMonitor;
@@ -86,8 +98,6 @@ class StoreUnitRepo : public std::enable_shared_from_this<StoreUnitRepo> {
8698
std::shared_ptr<DoneInitState> DoneInitializingPtr;
8799
dispatch_semaphore_t InitSemaphore;
88100

89-
dispatch_queue_t queueForUnitChanges;
90-
91101
mutable llvm::sys::Mutex StateMtx;
92102
std::unordered_map<IDCode, std::shared_ptr<UnitMonitor>> UnitMonitorsByCode;
93103

@@ -102,16 +112,11 @@ class StoreUnitRepo : public std::enable_shared_from_this<StoreUnitRepo> {
102112

103113
DoneInitializingPtr = std::make_shared<DoneInitState>();
104114
InitSemaphore = dispatch_semaphore_create(0);
105-
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, unitChangesQOS, 0);
106-
queueForUnitChanges = dispatch_queue_create("IndexStoreDB.store.unit.processing", qosAttribute);
107115
}
108116
~StoreUnitRepo() {
109117
dispatch_release(InitSemaphore);
110-
dispatch_release(queueForUnitChanges);
111118
}
112119

113-
dispatch_queue_t getQueueForUnitChanges() const { return queueForUnitChanges; }
114-
115120
void onFilesChange(std::vector<UnitEventInfo> evts,
116121
function_ref<void(unsigned)> ReportCompleted,
117122
function_ref<void()> DirectoryDeleted);
@@ -691,6 +696,56 @@ sys::TimePoint<> UnitMonitor::getModTimeForOutOfDateCheck(StringRef filePath) {
691696
// IndexDatastoreImpl
692697
//===----------------------------------------------------------------------===//
693698

699+
namespace {
700+
/// A thread-safe deque object for UnitEventInfo objects.
701+
class UnitEventInfoDeque {
702+
std::deque<UnitEventInfo> EventsDequeue;
703+
mutable llvm::sys::Mutex StateMtx;
704+
705+
public:
706+
void addEvents(ArrayRef<UnitEventInfo> evts) {
707+
sys::ScopedLock L(StateMtx);
708+
EventsDequeue.insert(EventsDequeue.end(), evts.begin(), evts.end());
709+
}
710+
711+
Optional<UnitEventInfo> popFront() {
712+
sys::ScopedLock L(StateMtx);
713+
if (EventsDequeue.empty())
714+
return None;
715+
UnitEventInfo evt = EventsDequeue.front();
716+
EventsDequeue.pop_front();
717+
return evt;
718+
}
719+
};
720+
}
721+
722+
/// Enqueues asynchronous processing of the unit events in an incremental fashion.
723+
/// Events are queued-up individually and the next event is enqueued only after
724+
/// the current one has been processed.
725+
static void processUnitEventsIncrementally(std::shared_ptr<UnitEventInfoDeque> evts,
726+
std::weak_ptr<StoreUnitRepo> weakUnitRepo,
727+
std::shared_ptr<IndexSystemDelegate> delegate,
728+
dispatch_queue_t queue) {
729+
Optional<UnitEventInfo> evtOpt = evts->popFront();
730+
if (!evtOpt.hasValue())
731+
return;
732+
auto UnitRepo = weakUnitRepo.lock();
733+
if (!UnitRepo)
734+
return;
735+
736+
UnitEventInfo evt = evtOpt.getValue();
737+
UnitRepo->onFilesChange({evt}, [&](unsigned NumCompleted){
738+
delegate->processingCompleted(NumCompleted);
739+
}, [&](){
740+
// FIXME: the database should recover.
741+
});
742+
743+
// Enqueue processing the rest of the events.
744+
dispatch_async(queue, ^{
745+
processUnitEventsIncrementally(evts, weakUnitRepo, delegate, queue);
746+
});
747+
}
748+
694749
bool IndexDatastoreImpl::init(IndexStoreRef idxStore,
695750
SymbolIndexRef SymIndex,
696751
std::shared_ptr<IndexSystemDelegate> Delegate,
@@ -706,28 +761,24 @@ bool IndexDatastoreImpl::init(IndexStoreRef idxStore,
706761

707762
auto UnitRepo = std::make_shared<StoreUnitRepo>(this->IdxStore, SymIndex, Delegate, CanonPathCache);
708763
std::weak_ptr<StoreUnitRepo> WeakUnitRepo = UnitRepo;
709-
auto OnUnitsChange = [WeakUnitRepo, Delegate](IndexStore::UnitEventNotification EventNote) {
710-
auto UnitRepo = WeakUnitRepo.lock();
711-
if (!UnitRepo)
712-
return;
713-
764+
auto eventsDeque = std::make_shared<UnitEventInfoDeque>();
765+
auto OnUnitsChange = [WeakUnitRepo, Delegate, eventsDeque](IndexStore::UnitEventNotification EventNote) {
714766
std::vector<UnitEventInfo> evts;
715767
for (size_t i = 0, e = EventNote.getEventsCount(); i != e; ++i) {
716768
auto evt = EventNote.getEvent(i);
717769
evts.push_back(UnitEventInfo{evt.getKind(), evt.getUnitName()});
718770
}
719771

772+
Delegate->processingAddedPending(evts.size());
773+
eventsDeque->addEvents(evts);
774+
720775
// Create the block with QoS explicitly to ensure that the QoS from the indexstore callback can't affect the onFilesChange priority. This call may do a lot of I/O and we don't want to wedge the system by running at elevated priority.
721776
dispatch_block_t onUnitChangeBlock = dispatch_block_create_with_qos_class(DISPATCH_BLOCK_INHERIT_QOS_CLASS, unitChangesQOS, 0, ^{
722-
Delegate->processingAddedPending(evts.size());
723-
UnitRepo->onFilesChange(std::move(evts), [&](unsigned NumCompleted){
724-
Delegate->processingCompleted(NumCompleted);
725-
}, [&](){
726-
// FIXME: the database should recover.
727-
});
777+
// Pass registration events to be processed incrementally by the global serial queue.
778+
// This allows intermixing processing of registration events from multiple workspaces.
779+
processUnitEventsIncrementally(eventsDeque, WeakUnitRepo, Delegate, getGlobalQueueForUnitChanges());
728780
});
729-
730-
dispatch_async(UnitRepo->getQueueForUnitChanges(), onUnitChangeBlock);
781+
dispatch_async(getGlobalQueueForUnitChanges(), onUnitChangeBlock);
731782
Block_release(onUnitChangeBlock);
732783
};
733784

0 commit comments

Comments
 (0)