Skip to content

[Index] Use a single global serial queue for registration of units #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 71 additions & 20 deletions lib/Index/IndexDatastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ static sys::TimePoint<> toTimePoint(timespec ts) {

const static dispatch_qos_class_t unitChangesQOS = QOS_CLASS_UTILITY;

/// Returns a global serial queue for unit processing.
/// This is useful to avoid doing a lot of parallel CPU and I/O work when opening multiple workspaces.
static dispatch_queue_t getGlobalQueueForUnitChanges() {
static dispatch_queue_t queueForUnitChanges;
static dispatch_once_t onceToken = 0;
dispatch_once(&onceToken, ^{
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, unitChangesQOS, 0);
queueForUnitChanges = dispatch_queue_create("IndexStoreDB.store.unit.processing", qosAttribute);
});
return queueForUnitChanges;
}

namespace {

class UnitMonitor;
Expand All @@ -86,8 +98,6 @@ class StoreUnitRepo : public std::enable_shared_from_this<StoreUnitRepo> {
std::shared_ptr<DoneInitState> DoneInitializingPtr;
dispatch_semaphore_t InitSemaphore;

dispatch_queue_t queueForUnitChanges;

mutable llvm::sys::Mutex StateMtx;
std::unordered_map<IDCode, std::shared_ptr<UnitMonitor>> UnitMonitorsByCode;

Expand All @@ -102,16 +112,11 @@ class StoreUnitRepo : public std::enable_shared_from_this<StoreUnitRepo> {

DoneInitializingPtr = std::make_shared<DoneInitState>();
InitSemaphore = dispatch_semaphore_create(0);
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, unitChangesQOS, 0);
queueForUnitChanges = dispatch_queue_create("IndexStoreDB.store.unit.processing", qosAttribute);
}
~StoreUnitRepo() {
dispatch_release(InitSemaphore);
dispatch_release(queueForUnitChanges);
}

dispatch_queue_t getQueueForUnitChanges() const { return queueForUnitChanges; }

void onFilesChange(std::vector<UnitEventInfo> evts,
function_ref<void(unsigned)> ReportCompleted,
function_ref<void()> DirectoryDeleted);
Expand Down Expand Up @@ -691,6 +696,56 @@ sys::TimePoint<> UnitMonitor::getModTimeForOutOfDateCheck(StringRef filePath) {
// IndexDatastoreImpl
//===----------------------------------------------------------------------===//

namespace {
/// A thread-safe deque object for UnitEventInfo objects.
class UnitEventInfoDeque {
std::deque<UnitEventInfo> EventsDequeue;
mutable llvm::sys::Mutex StateMtx;

public:
void addEvents(ArrayRef<UnitEventInfo> evts) {
sys::ScopedLock L(StateMtx);
EventsDequeue.insert(EventsDequeue.end(), evts.begin(), evts.end());
}

Optional<UnitEventInfo> popFront() {
sys::ScopedLock L(StateMtx);
if (EventsDequeue.empty())
return None;
UnitEventInfo evt = EventsDequeue.front();
EventsDequeue.pop_front();
return evt;
}
};
}

/// Enqueues asynchronous processing of the unit events in an incremental fashion.
/// Events are queued-up individually and the next event is enqueued only after
/// the current one has been processed.
static void processUnitEventsIncrementally(std::shared_ptr<UnitEventInfoDeque> evts,
std::weak_ptr<StoreUnitRepo> weakUnitRepo,
std::shared_ptr<IndexSystemDelegate> delegate,
dispatch_queue_t queue) {
Optional<UnitEventInfo> evtOpt = evts->popFront();
if (!evtOpt.hasValue())
return;
auto UnitRepo = weakUnitRepo.lock();
if (!UnitRepo)
return;

UnitEventInfo evt = evtOpt.getValue();
UnitRepo->onFilesChange({evt}, [&](unsigned NumCompleted){
delegate->processingCompleted(NumCompleted);
}, [&](){
// FIXME: the database should recover.
});

// Enqueue processing the rest of the events.
dispatch_async(queue, ^{
processUnitEventsIncrementally(evts, weakUnitRepo, delegate, queue);
});
}

bool IndexDatastoreImpl::init(IndexStoreRef idxStore,
SymbolIndexRef SymIndex,
std::shared_ptr<IndexSystemDelegate> Delegate,
Expand All @@ -706,28 +761,24 @@ bool IndexDatastoreImpl::init(IndexStoreRef idxStore,

auto UnitRepo = std::make_shared<StoreUnitRepo>(this->IdxStore, SymIndex, Delegate, CanonPathCache);
std::weak_ptr<StoreUnitRepo> WeakUnitRepo = UnitRepo;
auto OnUnitsChange = [WeakUnitRepo, Delegate](IndexStore::UnitEventNotification EventNote) {
auto UnitRepo = WeakUnitRepo.lock();
if (!UnitRepo)
return;

auto eventsDeque = std::make_shared<UnitEventInfoDeque>();
auto OnUnitsChange = [WeakUnitRepo, Delegate, eventsDeque](IndexStore::UnitEventNotification EventNote) {
std::vector<UnitEventInfo> evts;
for (size_t i = 0, e = EventNote.getEventsCount(); i != e; ++i) {
auto evt = EventNote.getEvent(i);
evts.push_back(UnitEventInfo{evt.getKind(), evt.getUnitName()});
}

Delegate->processingAddedPending(evts.size());
eventsDeque->addEvents(evts);

// Create the block with QoS explicitly to ensure that the QoS from the indexstore callback can't affect the onFilesChange priority. This call may do a lot of I/O and we don't want to wedge the system by running at elevated priority.
dispatch_block_t onUnitChangeBlock = dispatch_block_create_with_qos_class(DISPATCH_BLOCK_INHERIT_QOS_CLASS, unitChangesQOS, 0, ^{
Delegate->processingAddedPending(evts.size());
UnitRepo->onFilesChange(std::move(evts), [&](unsigned NumCompleted){
Delegate->processingCompleted(NumCompleted);
}, [&](){
// FIXME: the database should recover.
});
// Pass registration events to be processed incrementally by the global serial queue.
// This allows intermixing processing of registration events from multiple workspaces.
processUnitEventsIncrementally(eventsDeque, WeakUnitRepo, Delegate, getGlobalQueueForUnitChanges());
});

dispatch_async(UnitRepo->getQueueForUnitChanges(), onUnitChangeBlock);
dispatch_async(getGlobalQueueForUnitChanges(), onUnitChangeBlock);
Block_release(onUnitChangeBlock);
};

Expand Down