Skip to content

NFC prep work for sorting jobs by priority #40191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/swift/ABI/MetadataValues.h
Original file line number Diff line number Diff line change
Expand Up @@ -2022,6 +2022,12 @@ enum class JobPriority : size_t {
Unspecified = 0x00,
};

/// A tri-valued comparator which orders higher priorities first.
inline int descendingPriorityOrder(JobPriority lhs,
JobPriority rhs) {
return (lhs == rhs ? 0 : lhs > rhs ? -1 : 1);
}

/// Flags for task creation.
class TaskCreateFlags : public FlagSet<size_t> {
public:
Expand Down
93 changes: 93 additions & 0 deletions include/swift/Basic/ListMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,99 @@ class ListMerger {
setLastInsertionPoint(newNode, /*known last of equals*/ true);
}

/// Add a single node to this merger's current list.
///
/// The next reference of the node will be overwritten and does not
/// need to be meaningful.
///
/// The relative order of nodes in the current list will not change,
/// and if there are nodes in the current list which compare equal
/// to the new node, it will be inserted *before* them.
///
/// This is useful for the pattern where nodes are naturally encountered
/// in the opposite of their desired order in the final list and
/// need to be reversed. It generally doesn't make any sense to mix
/// this with calls to insert or merge on the same merger.
void insertAtFront(Node newNode) {
assert(newNode && "inserting a null node");

auto insertBetween = [newNode, this](Node prev, Node next) {
if (prev) {
assert(NodeTraits::getNext(prev) == next);
assert(NodeTraits::compare(prev, newNode) < 0);
NodeTraits::setNext(prev, newNode);
} else {
assert(root == next);
root = newNode;
}

assert(!next || NodeTraits::compare(newNode, next) <= 0);
NodeTraits::setNext(newNode, next);
setLastInsertionPoint(prev, /*known last of equals*/ true);
};

Node prev = Node();
Node cur = root;

// If we have a previous insertion point, check for the presumed-common
// case that we're inserting something that should immediately follow it.
if (auto lastIP = lastInsertionPoint) {
lastIP = findLastOfEqualsFromLastIP(lastIP);

// Compare against the next node after lastIP, if it exists.
if (Node nextAfterLastIP = NodeTraits::getNext(lastIP)) {
int comparison = NodeTraits::compare(nextAfterLastIP, newNode);

// If the new node compares equal to the next node, insert here.
if (comparison == 0) {
insertBetween(lastIP, nextAfterLastIP);
return;
}

// If the new node should follow the next node, start scanning
// after it.
if (comparison < 0) {
prev = nextAfterLastIP;
cur = NodeTraits::getNext(nextAfterLastIP);
}

// Otherwise, we'll need to scan from the beginning.

// If there is no next node, compare against the previous.
} else {
int comparison = NodeTraits::compare(lastIP, newNode);

// If the new node should follow the last node, we can
// insert here.
if (comparison < 0) {
insertBetween(lastIP, Node());
return;
}

// Otherwise, we'll need to scan from the beginning.
}
}

assert(!prev || NodeTraits::compare(prev, newNode) < 0);

// Scan forward, looking for a node which the new node must be
// inserted prior to.
// Invariant: prev < newNode, if prev exists
while (cur) {
// Compare the new node against the current IP.
int comparison = NodeTraits::compare(cur, newNode);

// If the new node isn't strictly greater than cur, insert here.
if (comparison >= 0) break;

// Otherwise, continue.
prev = cur;
cur = NodeTraits::getNext(prev);
}

insertBetween(prev, cur);
}

/// Add a sorted list of nodes to this merger's current list.
/// The list must be well-formed (i.e. appropriately terminated).
///
Expand Down
141 changes: 141 additions & 0 deletions stdlib/public/Concurrency/CooperativeGlobalExecutor.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
///===--- CooperativeGlobalExecutor.inc ---------------------*- C++ -*--===///
///
/// This source file is part of the Swift.org open source project
///
/// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
/// Licensed under Apache License v2.0 with Runtime Library Exception
///
/// See https:///swift.org/LICENSE.txt for license information
/// See https:///swift.org/CONTRIBUTORS.txt for the list of Swift project authors
///
///===------------------------------------------------------------------===///
///
/// The implementation of the cooperative global executor.
///
/// This file is included into GlobalExecutor.cpp only when
/// the cooperative global executor is enabled. It is expected to
/// declare the following functions:
/// swift_task_enqueueGlobalImpl
/// swift_task_enqueueGlobalWithDelayImpl
/// swift_task_enqueueMainExecutorImpl
/// as well as any cooperative-executor-specific functions in the runtime.
///
///===------------------------------------------------------------------===///

#include <chrono>
#include <thread>

static Job *JobQueue = nullptr;

class DelayedJob {
public:
Job *job;
unsigned long long when;
DelayedJob *next;

DelayedJob(Job *job, unsigned long long when) : job(job), when(when), next(nullptr) {}
};

static DelayedJob *DelayedJobQueue = nullptr;

/// Get the next-in-queue storage slot.
static Job *&nextInQueue(Job *cur) {
return reinterpret_cast<Job*&>(cur->SchedulerPrivate[Job::NextWaitingTaskIndex]);
}

/// Insert a job into the cooperative global queue.
SWIFT_CC(swift)
static void swift_task_enqueueGlobalImpl(Job *job) {
assert(job && "no job provided");

Job **position = &JobQueue;
while (auto cur = *position) {
// If we find a job with lower priority, insert here.
if (cur->getPriority() < newJob->getPriority()) {
nextInQueue(newJob) = cur;
*position = newJob;
return;
}

// Otherwise, keep advancing through the queue.
position = &nextInQueue(cur);
}
nextInQueue(newJob) = nullptr;
*position = newJob;
}

/// Enqueues a task on the main executor.
SWIFT_CC(swift)
static void swift_task_enqueueMainExecutorImpl(Job *job) {
// The cooperative executor does not distinguish between the main
// queue and the global queue.
swift_task_enqueueGlobalImpl(job);
}

static unsigned long long currentNanos() {
auto now = std::chrono::steady_clock::now();
auto nowNanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
auto value = std::chrono::duration_cast<std::chrono::nanoseconds>(nowNanos.time_since_epoch());
return value.count();
}

/// Insert a job into the cooperative global queue with a delay.
SWIFT_CC(swift)
static void swift_task_enqueueGlobalWithDelayImpl(unsigned long long delay,
Job *job) {
assert(job && "no job provided");

DelayedJob **position = &DelayedJobQueue;
DelayedJob *newJob = new DelayedJob(job, currentNanos() + delay);

while (auto cur = *position) {
// If we find a job with lower priority, insert here.
if (cur->when > newJob->when) {
newJob->next = cur;
*position = newJob;
return;
}

// Otherwise, keep advancing through the queue.
position = &cur->next;
}
*position = newJob;
}

/// Claim the next job from the cooperative global queue.
static Job *claimNextFromCooperativeGlobalQueue() {
// Check delayed jobs first
while (true) {
if (auto delayedJob = DelayedJobQueue) {
if (delayedJob->when < currentNanos()) {
DelayedJobQueue = delayedJob->next;
auto job = delayedJob->job;

delete delayedJob;

return job;
}
}
if (auto job = JobQueue) {
JobQueue = nextInQueue(job);
return job;
}
// there are only delayed jobs left, but they are not ready,
// so we sleep until the first one is
if (auto delayedJob = DelayedJobQueue) {
std::this_thread::sleep_for(std::chrono::nanoseconds(delayedJob->when - currentNanos()));
continue;
}
return nullptr;
}
}

void swift::
swift_task_donateThreadToGlobalExecutorUntil(bool (*condition)(void *),
void *conditionContext) {
while (!condition(conditionContext)) {
auto job = claimNextFromCooperativeGlobalQueue();
if (!job) return;
swift_job_run(job, ExecutorRef::generic());
}
}
Loading