
Run several single-threaded operators in parallel #850


Open · wants to merge 1 commit into base: master
ggml.c: 101 changes (94 additions, 7 deletions)
@@ -2739,9 +2739,10 @@ struct ggml_context_container {
//

enum ggml_task_type {
GGML_TASK_INIT = 0,
GGML_TASK_COMPUTE,
GGML_TASK_FINALIZE,
GGML_TASK_UNKNOWN = 0,
GGML_TASK_INIT = 1,
GGML_TASK_COMPUTE = 2,
GGML_TASK_FINALIZE = 4,
};

struct ggml_compute_params {
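The point of the enum change: moving from consecutive values to distinct power-of-two bits lets a single dispatch request carry more than one phase at once, which the reworked worker loop below relies on (it tests type with & rather than ==). A minimal, self-contained sketch of the flag technique, with illustrative names rather than the real ggml API:

#include <stdio.h>

enum task_type {
    TASK_UNKNOWN  = 0,
    TASK_INIT     = 1,  // power-of-two values: each type occupies one bit
    TASK_COMPUTE  = 2,
    TASK_FINALIZE = 4,
};

// Decode a possibly-combined request; with bit flags,
// (TASK_INIT | TASK_COMPUTE) carries both phases in a single int.
static void run_task(int type) {
    if (type & TASK_INIT)     printf("init phase\n");
    if (type & TASK_COMPUTE)  printf("compute phase\n");
    if (type & TASK_FINALIZE) printf("finalize phase\n");
}

int main(void) {
    run_task(TASK_INIT | TASK_COMPUTE); // prints both phases
    return 0;
}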
@@ -9291,14 +9292,26 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
break;
}

int type = state->params.type;
if (state->node) {
if (state->params.ith < state->params.nth) {
ggml_compute_forward(&state->params, state->node);

if (type & GGML_TASK_INIT)
{
state->params.type = GGML_TASK_INIT;
ggml_compute_forward(&state->params, state->node);
}

if (type & GGML_TASK_COMPUTE)
{
state->params.type = GGML_TASK_COMPUTE;
ggml_compute_forward(&state->params, state->node);
}
}

state->node = NULL;
} else {
break;
continue;
}
}
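
The swap from break to continue at the bottom of the worker loop matters once dispatch is batched: a round may assign nodes to only some workers, so a worker that finds state->node == NULL must keep polling for later rounds instead of exiting for good. Below is a self-contained sketch of that pattern; all names are hypothetical and the handshake is deliberately simpler than ggml's n_ready/has_work protocol:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

// Sketch only (not the ggml code): a worker that keeps polling
// across rounds instead of exiting when a round assigns it no work.
struct worker {
    atomic_int  round;   // main thread bumps this to start a round
    atomic_int  done;    // worker bumps this when its round is finished
    atomic_bool stop;    // shutdown flag
    int         node;    // 0 means "no node assigned this round"
};

static void * worker_loop(void * arg) {
    struct worker * w = arg;
    int seen = 0;
    for (;;) {
        if (atomic_load(&w->stop)) break;             // shutdown: really leave
        if (atomic_load(&w->round) == seen) continue; // poll for the next round
        seen = atomic_load(&w->round);
        if (w->node) {
            printf("round %d: computing node %d\n", seen, w->node);
            w->node = 0;
        }
        // the no-work case used to exit the loop; now the worker just
        // reports the round done and keeps polling
        atomic_fetch_add(&w->done, 1);
    }
    return NULL;
}

int main(void) {
    struct worker w = {0};
    pthread_t t;
    pthread_create(&t, NULL, worker_loop, &w);

    w.node = 42;                        // round 1: assign a node
    atomic_fetch_add(&w.round, 1);
    while (atomic_load(&w.done) < 1) {} // wait for the round to finish

    w.node = 0;                         // round 2: nothing for this worker
    atomic_fetch_add(&w.round, 1);
    while (atomic_load(&w.done) < 2) {} // still answers: it stayed alive

    atomic_store(&w.stop, true);
    pthread_join(t, NULL);
    return 0;
}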

@@ -9556,6 +9569,11 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)

struct ggml_tensor * node = cgraph->nodes[i];

// this node has already been calculated
if (node->n_tasks == 0) {
continue;
}

// TODO: this could be used to avoid unnecessary computations, but it needs to be improved
//if (node->grad == NULL && node->perf_runs > 0) {
// continue;
@@ -9575,6 +9593,8 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)

ggml_compute_forward(&params, node);

int dispatch_threads = 0;

// COMPUTE
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
@@ -9606,13 +9626,79 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
}

atomic_store(&state_shared.has_work, true);

} else {
int start = i;
int end = i + 1;
while (end < cgraph->n_nodes && dispatch_threads < n_threads && (end - start) < 4)
// review note (Collaborator, author): magic number 4 needs some tuning.

{
struct ggml_tensor * next = cgraph->nodes[end];
end++;

if (next->n_tasks != 1)
continue;

// check src dependency
bool is_dep = false;
for (int k = start; k < end; k++)
{
struct ggml_tensor * batched = cgraph->nodes[k]; // a node already in the window; renamed to avoid shadowing the outer node
if (next->src0 == batched || next->src1 == batched)
{
is_dep = true;
break;
}
}

if (is_dep)
continue;

workers[dispatch_threads].params = (struct ggml_compute_params) {
.type = GGML_TASK_COMPUTE | GGML_TASK_INIT,
.ith = 0,
.nth = 1,
.wsize = 0, // has to be 0 for a single-threaded op
.wdata = NULL, // has to be NULL for a single-threaded op
};
workers[dispatch_threads].node = next;

next->n_tasks = 0; // mark this node as already calculated
dispatch_threads++;
}

if (dispatch_threads > 0)
{
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}

while (atomic_load(&state_shared.has_work)) {
ggml_lock_lock (&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
}

for (int j = dispatch_threads; j < n_threads - 1; j++)
{
workers[j].node = NULL;
}

atomic_fetch_sub(&state_shared.n_ready, 1);

while (atomic_load(&state_shared.n_ready) > 0)
{
ggml_lock_lock(&state_shared.spin);
ggml_lock_unlock(&state_shared.spin);
}

atomic_store(&state_shared.has_work, true);
}
}

params.type = GGML_TASK_COMPUTE;
ggml_compute_forward(&params, node);

// wait for thread pool
if (node->n_tasks > 1) {
if (node->n_tasks > 1 || dispatch_threads > 0) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}
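
The scan in the else branch above collects up to four consecutive single-threaded nodes, skipping any candidate that consumes the output of a node already in the window (ggml tensors reference their inputs through src0 and src1). A standalone sketch of that dependency test, with illustrative names:

#include <stdbool.h>
#include <stddef.h>

// Minimal stand-in for the two fields the scan actually inspects.
struct tensor {
    struct tensor * src0;
    struct tensor * src1;
};

// True if next reads the output of any node already in the batch
// window nodes[start..end): such a node cannot join the same parallel
// round, because its inputs would not be ready yet.
static bool depends_on_window(struct tensor * const * nodes, int start, int end,
                              const struct tensor * next) {
    for (int k = start; k < end; k++) {
        if (next->src0 == nodes[k] || next->src1 == nodes[k]) {
            return true;
        }
    }
    return false;
}

Pointer identity makes the test cheap: O(window) per candidate, with the window capped at 4. If a tensor can reference inputs beyond src0 and src1, those edges would need the same check.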
@@ -9630,6 +9716,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
}
}

#if 0
// FINALIZE
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
@@ -9684,7 +9771,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
ggml_lock_unlock(&state_shared.spin);
}
}

#endif
// performance stats (node)
{
int64_t perf_cycles_cur = ggml_perf_cycles() - perf_node_start_cycles;