Stage and Source Scheduler and Grouped Execution
When Presto executes a query, it breaks the execution up into a hierarchy of stages. A stage models a particular part of a distributed query plan. Each stage reads from data sources and writes to an output buffer. There are two types of sources:
- Table scan source (sometimes referred to as partitioned source in code)
- Remote source (sometimes referred to as unpartitioned source in code)
The following figure shows part of a query plan with two stages:
See more details about stages at https://prestodb.github.io/docs/current/overview/concepts.html#stage
StageScheduler is responsible for the following jobs:
- Create tasks for this stage
- Schedule table splits to tasks
This section discusses the stage schedulers as they worked before grouped execution was introduced. ScaledWriterScheduler is not discussed here.
The StageScheduler interface is quite concise (https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/StageScheduler.java):
package com.facebook.presto.execution.scheduler;

import java.io.Closeable;

public interface StageScheduler
        extends Closeable
{
    /**
     * Schedules as much work as possible without blocking.
     * The schedule results is a hint to the query scheduler if and
     * when the stage scheduler should be invoked again. It is
     * important to note that this is only a hint and the query
     * scheduler may call the schedule method at any time.
     */
    ScheduleResult schedule();

    @Override
    default void close() {}
}
ScheduleResult contains the following information (see https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ScheduleResult.java#L36-L40):
Set<RemoteTask> newTasks;
ListenableFuture<?> blocked;
Optional<BlockedReason> blockedReason;
boolean finished;
int splitsScheduled;
Prior to grouped execution (introduced in https://github.com/prestodb/presto/pull/8951), there are two blocked reasons:
- WAITING_FOR_SOURCE: Blocked on fetching splits from the connector (SplitSource#getNextBatch).
- SPLIT_QUEUES_FULL: Some splits are not assigned because the node already has enough work to do. This is enforced in NodeSelector#computeAssignments (e.g. https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SimpleNodeSelector.java#L130-L134).
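To make the contract concrete, the following is a minimal, self-contained sketch of how a caller might drive a StageScheduler by calling schedule() repeatedly. It is not Presto's code: ScheduleResult is trimmed to the fields listed above, Guava's ListenableFuture is swapped for java.util.concurrent.CompletableFuture, the task set is reduced to a count, and BatchedScheduler and QueryDriver.runToCompletion are hypothetical names used only for illustration.

```java
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the Presto types; CompletableFuture replaces Guava's ListenableFuture.
enum BlockedReason { WAITING_FOR_SOURCE, SPLIT_QUEUES_FULL }

final class ScheduleResult {
    final int newTaskCount;                      // stands in for Set<RemoteTask> newTasks
    final CompletableFuture<?> blocked;          // done => calling schedule() again may make progress
    final Optional<BlockedReason> blockedReason; // why the stage is blocked, if it is
    final boolean finished;                      // true once the stage has nothing left to schedule
    final int splitsScheduled;

    ScheduleResult(int newTaskCount, CompletableFuture<?> blocked,
            Optional<BlockedReason> blockedReason, boolean finished, int splitsScheduled) {
        this.newTaskCount = newTaskCount;
        this.blocked = blocked;
        this.blockedReason = blockedReason;
        this.finished = finished;
        this.splitsScheduled = splitsScheduled;
    }
}

interface StageScheduler extends Closeable {
    ScheduleResult schedule();

    @Override
    default void close() {}
}

// Toy scheduler (hypothetical): schedules at most two splits per call and never actually blocks.
final class BatchedScheduler implements StageScheduler {
    private int remainingSplits;

    BatchedScheduler(int totalSplits) {
        this.remainingSplits = totalSplits;
    }

    @Override
    public ScheduleResult schedule() {
        int batch = Math.min(2, remainingSplits);
        remainingSplits -= batch;
        return new ScheduleResult(0, CompletableFuture.completedFuture(null),
                Optional.empty(), remainingSplits == 0, batch);
    }
}

final class QueryDriver {
    // Calls schedule() until the stage reports finished, waiting on the blocked future between calls.
    static int runToCompletion(StageScheduler scheduler) {
        int totalSplits = 0;
        while (true) {
            ScheduleResult result = scheduler.schedule();
            totalSplits += result.splitsScheduled;
            if (result.finished) {
                return totalSplits;
            }
            result.blocked.join(); // the future is only a hint; in this toy it is already complete
        }
    }
}
```

The join() call marks the point where a real query scheduler would instead wait on the blocked future (possibly together with other stages' futures) before invoking schedule() again; a single-stage loop keeps the sketch short.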
In this section, we introduce the three main stage schedulers that existed prior to grouped execution (https://github.com/prestodb/presto/pull/8951).
FixedCountScheduler
This is used when the stage has node partitioning and all of its split sources are remote. A typical case is a join or aggregation over an unbucketed table. This is the simplest stage scheduler: all tasks are created at once, and there is no further split scheduling, since all splits are RemoteSplits (https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedCountScheduler.java#L55-L65).
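A minimal sketch of this behavior, assuming a simplified model in which a task is just a string and a hypothetical partitionToNode list maps each partition to a worker. It only illustrates the "create every task in one call, then finish" pattern and is not the linked implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the FixedCountScheduler idea (not the Presto source).
final class FixedCountSchedulerSketch {
    // Result of one schedule() call: the tasks created and whether scheduling is finished.
    static final class Result {
        final List<String> newTasks;
        final boolean finished;

        Result(List<String> newTasks, boolean finished) {
            this.newTasks = newTasks;
            this.finished = finished;
        }
    }

    private final List<String> partitionToNode; // worker that owns each partition
    private boolean scheduled;

    FixedCountSchedulerSketch(List<String> partitionToNode) {
        this.partitionToNode = List.copyOf(partitionToNode);
    }

    Result schedule() {
        if (scheduled) {
            // Nothing left to do: all tasks already exist and there are no table splits.
            return new Result(List.of(), true);
        }
        scheduled = true;
        List<String> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionToNode.size(); partition++) {
            // One task per partition, placed on the worker that owns that partition.
            tasks.add("task-" + partition + "@" + partitionToNode.get(partition));
        }
        // Every split of this stage is a remote split, so scheduling finishes in this single call.
        return new Result(tasks, true);
    }
}
```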
FixedSourcePartitionedScheduler
This is used when the stage has node partitioning and at least one split source is a table scan. This usually happens for stages reading from bucketed tables.
For each table scan split source, a SourcePartitionedScheduler is created to help schedule its splits. We denote these SourcePartitionedSchedulers as s_1, s_2, …, s_k and the FixedSourcePartitionedScheduler as f. Note that these s_i share the same SqlStageExecution with f, so each s_i is not an “independent” stage scheduler, but is rather used as a “source scheduler” for f.
The s_i.schedule() calls are made in order (from the build side to the probe side): s_{i+1}.schedule() is called only after s_i has finished scheduling (but not necessarily execution!), as demonstrated by the following simplified code (the actual code can be found at https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L90-L105):
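while (!sourceSchedulers.isEmpty()) {
    // Only the scheduler at the head of the queue (s_i) is asked to make progress
    ScheduleResult schedule = sourceSchedulers.peek().schedule();
    if (schedule.isFinished()) {
        // s_i has scheduled all of its splits; move on to s_{i+1}
        sourceSchedulers.remove();
    }
    else {
        // s_i still has splits to schedule, so s_{i+1} must wait
        break;
    }
}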
All tasks are created eagerly the first time schedule() is called (https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L78-L85).
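The sketch below combines the two behaviors described above: eager task creation on the first call, and strictly ordered source schedulers. SourceScheduler, scheduleStep(), and FixedSourcePartitionedSchedulerSketch are hypothetical simplifications (the real s_i are SourcePartitionedScheduler instances returning a ScheduleResult), and tasks are modeled as strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical interface for one per-table-scan source scheduler (s_i).
interface SourceScheduler {
    // Returns true once this source has scheduled all of its splits; execution may still be running.
    boolean scheduleStep();
}

// Hypothetical sketch of the FixedSourcePartitionedScheduler control flow (not the Presto source).
final class FixedSourcePartitionedSchedulerSketch {
    private final List<String> partitionToNode;            // worker for each partition
    private final Deque<SourceScheduler> sourceSchedulers; // s_1 (build side) ... s_k (probe side)
    private boolean tasksCreated;

    FixedSourcePartitionedSchedulerSketch(List<String> partitionToNode, List<SourceScheduler> buildToProbe) {
        this.partitionToNode = List.copyOf(partitionToNode);
        this.sourceSchedulers = new ArrayDeque<>(buildToProbe);
    }

    // Returns the tasks created by this call; only the first call creates any.
    List<String> schedule() {
        List<String> newTasks = new ArrayList<>();
        if (!tasksCreated) {
            // All tasks are created eagerly on the first call, one per partition.
            for (int partition = 0; partition < partitionToNode.size(); partition++) {
                newTasks.add("task-" + partition + "@" + partitionToNode.get(partition));
            }
            tasksCreated = true;
        }
        // s_{i+1} is only asked to schedule once s_i has finished scheduling.
        while (!sourceSchedulers.isEmpty()) {
            if (sourceSchedulers.peek().scheduleStep()) {
                sourceSchedulers.remove();
            }
            else {
                break;
            }
        }
        return newTasks;
    }

    boolean isFinished() {
        return sourceSchedulers.isEmpty();
    }
}
```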
Interestingly, both of the following cases use FixedSourcePartitionedScheduler:
- Colocated join (i.e. joining two bucketed tables).
- A bucketed table joined with an unbucketed table. In this case, a remote exchange is added on the unbucketed table side so that it adopts the same bucketing.
SourcePartitionedScheduler
This is used when the stage partitioning is SOURCE_DISTRIBUTION. In other words:
- It doesn’t have node partitioning.
- It doesn’t contain a partitioned remote source (a replicated remote source is fine). Usually, this is used for leaf stages that contain a table scan over an unbucketed table.
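Taken together, the conditions for the three schedulers can be summarized as a small decision function. This is a hedged restatement of the rules in this section only (ScaledWriterScheduler and grouped execution are ignored), and SchedulerKind and choose() are hypothetical names rather than Presto APIs.

```java
// Hypothetical summary of the pre-grouped-execution scheduler choice (ScaledWriterScheduler ignored).
enum SchedulerKind { FIXED_COUNT, FIXED_SOURCE_PARTITIONED, SOURCE_PARTITIONED }

final class SchedulerChoice {
    /**
     * @param hasNodePartitioning false means the stage uses SOURCE_DISTRIBUTION
     * @param hasTableScanSource  true if at least one split source is a table scan
     */
    static SchedulerKind choose(boolean hasNodePartitioning, boolean hasTableScanSource) {
        if (!hasNodePartitioning) {
            // SOURCE_DISTRIBUTION: typically a leaf stage scanning an unbucketed table.
            return SchedulerKind.SOURCE_PARTITIONED;
        }
        if (hasTableScanSource) {
            // Node-partitioned stage with a table scan, e.g. one reading bucketed tables.
            return SchedulerKind.FIXED_SOURCE_PARTITIONED;
        }
        // Node-partitioned stage whose sources are all remote: no split scheduling is needed.
        return SchedulerKind.FIXED_COUNT;
    }
}
```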
The main workflow for SourcePartitionedScheduler#schedule (https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SourcePartitionedScheduler.java#L84-L147) is as follows:
- Get the splits to be scheduled (stored in pendingSplits). splitSource#getNextBatch will be called if necessary.
- Decide the node assignments of these splits (by calling splitPlacementPolicy#computeAssignments). There are two different split placement policies:
  - DynamicSplitPlacementPolicy: splits are from an unbucketed table. They are assigned based on worker load and locality.
  - FixedSplitPlacementPolicy (now renamed to BucketedSplitPlacementPolicy): splits are from a bucketed table. They are assigned based on NodePartitioningMap. Note that in this case, SourcePartitionedScheduler is actually used as a “source scheduler” of a FixedSourcePartitionedScheduler, since when SourcePartitionedScheduler is used as a “stage scheduler”, the stage partitioning has to be SOURCE_DISTRIBUTION, and thus the input table cannot be bucketed.
- Create new tasks if necessary, etc.
  - Note this only makes a difference when SourcePartitionedScheduler is used as a stage scheduler rather than a source scheduler. When SourcePartitionedScheduler is used as a source scheduler, the newTasks returned in ScheduleResult will simply be ignored.
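The three steps above can be illustrated with a simplified, hypothetical sketch. It assumes the split source is an iterator of batches (one next() call standing in for SplitSource#getNextBatch), uses a made-up per-node queue limit to imitate the SPLIT_QUEUES_FULL behavior, and places each split on the least-loaded node with spare capacity as a rough stand-in for DynamicSplitPlacementPolicy (locality is ignored). None of the class or method names are Presto APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the SourcePartitionedScheduler#schedule workflow (not the Presto source).
final class SourcePartitionedSchedulerSketch {
    enum Blocked { NONE, WAITING_FOR_SOURCE, SPLIT_QUEUES_FULL }

    static final class Result {
        final List<String> newTasks;
        final int splitsScheduled;
        final Blocked blocked;
        final boolean finished;

        Result(List<String> newTasks, int splitsScheduled, Blocked blocked, boolean finished) {
            this.newTasks = newTasks;
            this.splitsScheduled = splitsScheduled;
            this.blocked = blocked;
            this.finished = finished;
        }
    }

    private final Iterator<List<String>> splitSource; // each next() plays the role of one batch fetch
    private final List<String> nodes;
    private final int maxQueuedSplitsPerNode;         // hypothetical per-node limit
    private final Deque<String> pendingSplits = new ArrayDeque<>();
    private final Map<String, Integer> queuedSplits = new HashMap<>(); // a key also means "node has a task"

    SourcePartitionedSchedulerSketch(Iterator<List<String>> splitSource, List<String> nodes, int maxQueuedSplitsPerNode) {
        this.splitSource = splitSource;
        this.nodes = List.copyOf(nodes);
        this.maxQueuedSplitsPerNode = maxQueuedSplitsPerNode;
    }

    // Simulates a worker finishing splits, which frees queue space on that node.
    void finishSplits(String node, int count) {
        queuedSplits.computeIfPresent(node, (n, queued) -> Math.max(0, queued - count));
    }

    Result schedule() {
        // Step 1: get the splits to schedule, fetching at most one new batch when none are pending.
        if (pendingSplits.isEmpty() && splitSource.hasNext()) {
            pendingSplits.addAll(splitSource.next());
        }
        List<String> newTasks = new ArrayList<>();
        int scheduled = 0;
        while (!pendingSplits.isEmpty()) {
            // Step 2: pick the least-loaded node that still has queue space (ties go to list order).
            String target = null;
            for (String node : nodes) {
                int load = queuedSplits.getOrDefault(node, 0);
                if (load < maxQueuedSplitsPerNode && (target == null || load < queuedSplits.getOrDefault(target, 0))) {
                    target = node;
                }
            }
            if (target == null) {
                break; // every node is full: the rest stay in pendingSplits
            }
            pendingSplits.poll();
            // Step 3: create a task the first time a node receives a split.
            if (!queuedSplits.containsKey(target)) {
                newTasks.add("task@" + target);
            }
            queuedSplits.merge(target, 1, Integer::sum);
            scheduled++;
        }
        boolean finished = pendingSplits.isEmpty() && !splitSource.hasNext();
        Blocked blocked = finished ? Blocked.NONE
                : (pendingSplits.isEmpty() ? Blocked.WAITING_FOR_SOURCE : Blocked.SPLIT_QUEUES_FULL);
        return new Result(newTasks, scheduled, blocked, finished);
    }
}
```

In this sketch, a call that leaves splits in pendingSplits reports SPLIT_QUEUES_FULL, while a call that drains pendingSplits before the source is exhausted reports WAITING_FOR_SOURCE, mirroring the two blocked reasons listed earlier.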
Even before grouped execution was introduced, there were some hacks. In particular, FixedSourcePartitionedScheduler can use SourcePartitionedScheduler as a “source scheduler”, and some of the code in SourcePartitionedScheduler is only useful when it acts as a stage scheduler, which is confusing.
Some latent bugs date back to those days and only got fixed after grouped execution amplified them, such as https://github.com/prestodb/presto/issues/11253
To be continued...