Stage and Source Scheduler and Grouped Execution
When Presto executes a query, it breaks the execution up into a hierarchy of stages. A stage models a particular part of a distributed query plan. Each stage reads from data sources and writes to an output buffer. There are two types of sources:
- Table scan source (sometimes referred to as partitioned source in code)
- Remote source (sometimes referred to as unpartitioned source in code)
The following figure shows part of a query plan with two stages:
See more details about stages at https://prestodb.github.io/docs/current/overview/concepts.html#stage
StageScheduler is responsible for the following jobs:
- Create tasks for this stage
- Schedule table splits to tasks
This section discusses stage schedulers as they worked before grouped execution was introduced. ScaledWriterScheduler is not discussed here.
The StageScheduler interface is quite concise (https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/StageScheduler.java):
```java
import java.io.Closeable;

public interface StageScheduler
        extends Closeable
{
    /**
     * Schedules as much work as possible without blocking.
     * The schedule results is a hint to the query scheduler if and
     * when the stage scheduler should be invoked again. It is
     * important to note that this is only a hint and the query
     * scheduler may call the schedule method at any time.
     */
    ScheduleResult schedule();

    @Override
    default void close() {}
}
```
ScheduleResult contains the following information (see https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ScheduleResult.java#L36-L40):

```java
Set<RemoteTask> newTasks;
ListenableFuture<?> blocked;
Optional<BlockedReason> blockedReason;
boolean finished;
int splitsScheduled;
```
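To make the contract concrete, here is a minimal sketch of how a caller might drive a StageScheduler using these fields. It is not Presto's code: SchedulerDriver and runToCompletion are hypothetical names, the types are simplified stand-ins (task handles are plain strings), and CompletableFuture replaces Guava's ListenableFuture so the sketch needs only the JDK.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins for Presto's types.
enum BlockedReason { WAITING_FOR_SOURCE, SPLIT_QUEUES_FULL }

record ScheduleResult(
        List<String> newTasks,                  // stand-in for Set<RemoteTask>
        CompletableFuture<?> blocked,           // hint: completes when another call is worthwhile
        Optional<BlockedReason> blockedReason,
        boolean finished,
        int splitsScheduled) {}

interface StageScheduler {
    ScheduleResult schedule();
}

final class SchedulerDriver {
    // Hypothetical driver loop: call schedule() until the stage reports finished,
    // waiting on the blocked future in between. Returns the total splits scheduled.
    static int runToCompletion(StageScheduler scheduler) {
        int total = 0;
        while (true) {
            ScheduleResult result = scheduler.schedule();
            total += result.splitsScheduled();
            if (result.finished()) {
                return total;
            }
            // The future is only a hint; a real query scheduler may call schedule() earlier.
            result.blocked().join();
        }
    }
}
```

Because the blocked future is only a hint, a real query scheduler is free to call schedule() before it completes; the sketch simply waits, which is the most conservative reading of the hint.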
Prior to grouped execution (introduced in https://github.com/prestodb/presto/pull/8951), there were two blocked reasons:
- WAITING_FOR_SOURCE: Blocked on fetching splits from the connector (SplitSource#getNextBatch).
- SPLIT_QUEUES_FULL: Some splits are not assigned because the nodes already have enough queued work. This is enforced in NodeSelector#computeAssignments (e.g. https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SimpleNodeSelector.java#L130-L134).
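The SPLIT_QUEUES_FULL case can be illustrated with a small sketch of a per-node cap on queued splits. The class SplitAssigner, the parameter maxSplitsPerNode, and the "least-loaded node first" policy are assumptions for illustration only; the real NodeSelector#computeAssignments linked above is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

enum BlockedReason { WAITING_FOR_SOURCE, SPLIT_QUEUES_FULL }

// Hypothetical result of one assignment round.
record Assignment(Map<String, List<String>> assigned, List<String> unassigned) {
    // Leftover splits mean every eligible node is at its cap.
    Optional<BlockedReason> blockedReason() {
        return unassigned.isEmpty() ? Optional.empty() : Optional.of(BlockedReason.SPLIT_QUEUES_FULL);
    }
}

final class SplitAssigner {
    // Assign each split to the node with the fewest queued splits, never exceeding
    // maxSplitsPerNode. Splits that do not fit stay unassigned for a later round.
    static Assignment assign(List<String> splits, Map<String, Integer> queuedPerNode, int maxSplitsPerNode) {
        Map<String, Integer> load = new HashMap<>(queuedPerNode);
        Map<String, List<String>> assigned = new HashMap<>();
        List<String> unassigned = new ArrayList<>();
        for (String split : splits) {
            String best = null;
            for (Map.Entry<String, Integer> e : load.entrySet()) {
                if (e.getValue() < maxSplitsPerNode && (best == null || e.getValue() < load.get(best))) {
                    best = e.getKey();
                }
            }
            if (best == null) {
                unassigned.add(split);   // all nodes are full
                continue;
            }
            load.merge(best, 1, Integer::sum);
            assigned.computeIfAbsent(best, k -> new ArrayList<>()).add(split);
        }
        return new Assignment(assigned, unassigned);
    }
}
```

For example, with nodes A (0 queued) and B (1 queued), a cap of 2 and four splits, A receives two splits, B receives one, and the remaining split is reported as SPLIT_QUEUES_FULL.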
In this section, we introduce the three main stage schedulers that existed prior to grouped execution (https://github.com/prestodb/presto/pull/8951).
FixedCountScheduler
This scheduler is used when the stage has node partitioning and all of its split sources are remote. A typical case is a join or aggregation over an unbucketed table. It is the simplest stage scheduler: all tasks are created at once, and there is no further split scheduling, since all splits are RemoteSplits (see https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedCountScheduler.java#L55-L65).
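The behavior described above can be sketched as follows, assuming one task per partition and a precomputed partition-to-node list. FixedCountSchedulerSketch and its task-naming scheme are hypothetical; the real scheduler creates RemoteTasks through SqlStageExecution and returns a ScheduleResult.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the FixedCountScheduler idea.
final class FixedCountSchedulerSketch {
    private final List<String> partitionToNode;   // one entry per partition of the stage
    private boolean scheduled;

    FixedCountSchedulerSketch(List<String> partitionToNode) {
        this.partitionToNode = List.copyOf(partitionToNode);
    }

    // All tasks are created on the first call. Later calls create nothing, because the
    // stage only reads RemoteSplits and there are no table splits left to schedule.
    List<String> schedule() {
        if (scheduled) {
            return List.of();
        }
        scheduled = true;
        List<String> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionToNode.size(); partition++) {
            tasks.add("task-" + partition + "@" + partitionToNode.get(partition));
        }
        return tasks;
    }

    boolean isFinished() {
        return scheduled;
    }
}
```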
FixedSourcePartitionedScheduler
This scheduler is used when the stage has node partitioning and at least one split source is a table scan. This usually happens for stages reading from bucketed tables.
For each table scan split source, a SourcePartitionedScheduler is created to help schedule its splits. We denote these SourcePartitionedSchedulers as s_1, s_2, …, s_k and the FixedSourcePartitionedScheduler as f. Note that these s_i share the same SqlStageExecution with f, so each s_i is not an “independent” stage scheduler; rather, it acts as a “source scheduler” for f.
The s_i.schedule() methods are called in order, from the build side to the probe side: s_{i+1}.schedule() is called only after s_i has finished scheduling (but not execution!), as demonstrated by the following simplified code (the actual code can be found at https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L90-L105):
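```java
// sourceSchedulers is a queue ordered from the build side to the probe side.
while (!sourceSchedulers.isEmpty()) {
    ScheduleResult schedule = sourceSchedulers.peek().schedule();
    if (schedule.isFinished()) {
        // This source has scheduled all of its splits; move on to the next one.
        sourceSchedulers.remove();
    }
    else {
        // Later sources must wait until this one finishes scheduling.
        break;
    }
}
```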
All tasks are eagerly created the first time schedule() is called: https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L78-L85
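The two properties described above, eager task creation and strictly ordered source scheduling, can be simulated together in a small, self-contained sketch. All class names here are hypothetical, and each source scheduler is reduced to "needs N calls before all of its splits are scheduled"; real source schedulers return a ScheduleResult rather than a boolean.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a SourcePartitionedScheduler (s_i).
final class SourceSchedulerSketch {
    private final String name;
    private final List<String> log;   // records the order in which sources are polled
    private int remainingCalls;

    SourceSchedulerSketch(String name, int callsToFinish, List<String> log) {
        this.name = name;
        this.remainingCalls = callsToFinish;
        this.log = log;
    }

    // Returns true once this source has finished *scheduling* (not executing) its splits.
    boolean schedule() {
        log.add(name);
        remainingCalls--;
        return remainingCalls <= 0;
    }
}

// Hypothetical stand-in for FixedSourcePartitionedScheduler (f).
final class FixedSourcePartitionedSketch {
    private final int partitionCount;
    private final Queue<SourceSchedulerSketch> sourceSchedulers;
    private boolean tasksCreated;
    final List<String> createdTasks = new ArrayList<>();

    // `sources` is ordered from the build side to the probe side.
    FixedSourcePartitionedSketch(int partitionCount, List<SourceSchedulerSketch> sources) {
        this.partitionCount = partitionCount;
        this.sourceSchedulers = new ArrayDeque<>(sources);
    }

    // Returns true when every source has finished scheduling its splits.
    boolean schedule() {
        if (!tasksCreated) {
            // Eager task creation: one task per partition on the very first call.
            for (int partition = 0; partition < partitionCount; partition++) {
                createdTasks.add("task-" + partition);
            }
            tasksCreated = true;
        }
        while (!sourceSchedulers.isEmpty()) {
            if (sourceSchedulers.peek().schedule()) {
                sourceSchedulers.remove();   // done scheduling; move to the next source
            }
            else {
                break;                       // later sources wait for this one
            }
        }
        return sourceSchedulers.isEmpty();
    }
}
```

With a build-side source that needs two calls and a probe-side source that needs one, the first call to f creates every task but polls only the build side; the probe side is polled only during the second call, after the build side has finished scheduling.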
Interestingly, both of the following cases use FixedSourcePartitionedScheduler:
- Colocated join (i.e. joining two bucketed tables).
- Bucketed table joined with an unbucketed table. In this case, a remote exchange is added on the unbucketed table side so that it adopts the same bucketing.
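Why the second case can share a scheduler with the first is easiest to see from the routing: both join inputs agree on a single bucket-to-node mapping. The sketch below is hypothetical. It assumes a bucket function of Math.floorMod over the key's hashCode, and assumes the bucketed table was written with that same function; in Presto the actual bucket function depends on the connector.

```java
import java.util.List;

// Hypothetical illustration: one bucket-to-node map shared by both join inputs.
final class BucketRouting {
    private final List<String> bucketToNode;

    BucketRouting(List<String> bucketToNode) {
        this.bucketToNode = List.copyOf(bucketToNode);
    }

    int bucketCount() {
        return bucketToNode.size();
    }

    // Bucketed side: each table split belongs to a known bucket, so it goes to that bucket's node.
    String nodeForBucketedSplit(int bucket) {
        return bucketToNode.get(bucket);
    }

    // Unbucketed side: the remote exchange hashes each row's join key with the same
    // (assumed) bucket function, so the row lands where the matching bucket lives.
    int bucketForKey(Object joinKey) {
        return Math.floorMod(joinKey.hashCode(), bucketCount());
    }

    String nodeForRow(Object joinKey) {
        return bucketToNode.get(bucketForKey(joinKey));
    }
}
```

Since rows from the unbucketed side are routed to the same nodes as the corresponding table splits, the stage can still be scheduled with a fixed partition-to-node layout, exactly as in the colocated case.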
To Be Continued....