Skip to content

Stage and Source Scheduler and Grouped Execution

Wenlei Xie edited this page Jul 27, 2019 · 9 revisions

Background

When Presto executes a query, it does so by breaking up the execution into a hierarchy of stages. Stage models a particular part of a distributed query plan. Each stage reads from data source and writes to an output buffer, and there are two types of sources:

  • Table scan source (sometimes referred to as partitioned source in code)
  • Remote source (sometimes referred to as unpartitioned source in code)

The following figure shows part of a query plan with two stages:

Two Stages

See more details about stage at https://prestodb.github.io/docs/current/overview/concepts.html#stage

Stage Scheduler

StageScheduler is responsible for the following jobs:

  • Create tasks for this stage
  • Schedule table splits to tasks

This section will discuss about stage scheduler before grouped execution is introduced. ScaledWriterScheduler is not discussed here.

The StageScheduler interface is quite concise (https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/StageScheduler.java):

public interface StageScheduler
        extends Closeable
{
    /**
     * Schedules as much work as possible without blocking.
     * The schedule results is a hint to the query scheduler if and
     * when the stage scheduler should be invoked again.  It is
     * important to note that this is only a hint and the query
     * scheduler may call the schedule method at any time.
     */
    ScheduleResult schedule();

    @Override
    default void close() {}
}

ScheduleResult contains the following information (see https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ScheduleResult.java#L36-L40):

Set<RemoteTask> newTasks;
ListenableFuture<?> blocked;
Optional<BlockedReason> blockedReason;
boolean finished;
int splitsScheduled;

Prior to grouped execution (introduced in https://github.com/prestodb/presto/pull/8951), there are two block reasons:

In this section, we are going to introduce the three main stage schedulers prior to grouped execution (https://github.com/prestodb/presto/pull/8951).

FixedCountScheduler

This is used when stage has node partitioning, and all the split sources are remote. A typical case is performing join or aggregate over unbucketed table. This is the simplest stage scheduler, all tasks are created in one time and there will be no further split scheduling (since all splits are RemoteSplit): https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedCountScheduler.java#L55-L65

FixedSourcePartitionedScheduler

This is used when stage has node partitioning, and at least one split source is from table scan. This usually happens to stages reading from bucketed tables.

For each table scan split source, an SourcePartitionedScheduler is created to help schedule the splits. We denote these SourcePartitionedScheduler as s_1, s_2, …, s_k and the FixedSourcePartitionedScheduler as f. Note these s_i share the same SqlStageExecution with f -- so each s_i is not a “independent” stage scheduler, but somewhat used as a “source scheduler” for f.

s_i.schedule() are called in order (from builder to probe side). s_{i+1}.schedule() will be called only after s_i finished schedule (but not execution!), as demonstrated by the following simplified code: (the actual code can be found at https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L90-L105) :

while (!sourceSchedulers.isEmpty()) {
    ScheduleResult schedule = sourceSchedulers.peek().schedule();
    if (schedule.isDone()) {
        sourceSchedulers.remove();
    }
    else {
        break;
    }
}

All the tasks are eagerly created when first time schedule() is called. https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L78-L85

An interesting note is both the following cases will use FixedSourcePartitionedScheduler:

  • Colocated join (i.e. join two bucketed tables).
  • Bucketed table join unbucketed table. In this case, remote exchange will be added to the unbucketed table side to adopt the same bucketing.

SourcePartitionedScheduler

This is used when the stage partitioning is SOURCE_DISTRIBUTION. In other words:

  • It doesn’t have node partitioning.
  • It doesn’t contain partitioned remote source (replicated remote source is fine). Usually, this is used for the leaf stages contains table scan over unbucketed table.

The main workflow for SourcePartitionedScheduler#schedule (https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SourcePartitionedScheduler.java#L84-L147) is as follows:

  • Get the splits to be scheduled (stored in pendingSplits). splitSource#getNextBatch will be called if necessary.
  • Decide the node assignments of these splits (by calling splitPlacementPolicy#computeAssignments). There are two different split placement policy:
    • DynamicSplitPlacementPolicy: splits are from unbucketed table. It will be assigned based on worker load and locality.
    • FixedSplitPlacementPolicy (now renamed to BucketedSplitPlacementPolicy): splits are from bucketed table. It will be assigned based on NodePartitioningMap. Note for this case, SourcePartitionedScheduler is actually used as a “source scheduler” of a FixedSourcePartitionedScheduler -- since the when SourcePartitionedScheduler is used as a “stage scheduler”, the stage partitioning has to be SOURCE_DISTRIBUTION thus the input table cannot be bucketed.
  • Create new tasks if necessary, etc.
    • Note this only makes a difference when SourcePartitionedScheduler is used as a stage scheduler rather than source scheduler.
    • When SourcePartitionedScheduler is used as a source scheduler, newTasks returned in ScheduleResult will simply be ignored.

Discussion

Even before grouped execution is introduced, there are some hacks introduced, especially FixedSourcePartitionedScheduler can use SourcePartitionedScheduler as a “source scheduler”, and some of the code in SourcePartitionedScheduler are only useful as a stage scheduler, which is confusing.

Some latent bugs exist back to the old days, and only get fixed after amplified by grouped execution, such as https://github.com/prestodb/presto/issues/11253

To Be Continued....

Clone this wiki locally