TiDB 物理优化
physicalOptimize(WIP)
// physicalOptimize converts the logical plan tree rooted at logic into the
// cheapest physical plan it can find.
//
// It runs in three phases:
//  1. derive and cache statistics for every logical operator;
//  2. pre-compute, bottom-up, the orderings each operator could possibly
//     provide, so that hopeless branches of the physical search space are
//     pruned early;
//  3. search top-down for the best task via findBestTask.
//
// planCounter backs the nth_plan() hint: findBestTask consumes it while
// enumerating plans so that the planCounter-th plan found is forced. If it is
// still positive after the search, the requested number was out of range and
// a warning is appended to the statement context.
//
// It returns the physical plan and its cost. On any error, including a failure
// to resolve column indices, it returns a nil plan and zero cost.
func physicalOptimize(logic LogicalPlan, planCounter *PlanCounterTp) (PhysicalPlan, float64, error) {
	// Recursively compute and cache the statistics of every logical operator
	// (stored in basePlan.stats).
	if _, err := logic.recursiveDeriveStats(nil); err != nil {
		return nil, 0, err
	}
	// Collect the orderings each operator can provide. This lets the search
	// ignore properties that can never be satisfied and prune branches of the
	// physical-plan search space as early as possible.
	preparePossibleProperties(logic)
	// A PhysicalProperty is a requirement on the data an operator returns,
	// e.g. sorted on certain columns (SortItems) or produced by a given task
	// type (root/cop). The plan root must run as a root task and has no
	// ordering requirement.
	prop := &property.PhysicalProperty{
		TaskTp:      property.RootTaskType,
		ExpectedCnt: math.MaxFloat64,
	}
	logic.SCtx().GetSessionVars().StmtCtx.TaskMapBakTS = 0
	// findBestTask converts the logical plan into a physical one, recursing
	// from parent to children. Operators that can be implemented in several
	// ways return the cheapest candidate.
	t, _, err := logic.findBestTask(prop, planCounter)
	if err != nil {
		return nil, 0, err
	}
	if *planCounter > 0 {
		logic.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("The parameter of nth_plan() is out of range."))
	}
	if t.invalid() {
		return nil, 0, ErrInternal.GenWithStackByArgs("Can't find a proper physical plan for this query")
	}
	// Never hand back a plan whose column indices failed to resolve.
	if err = t.plan().ResolveIndices(); err != nil {
		return nil, 0, err
	}
	return t.plan(), t.cost(), nil
}
recursiveDeriveStats
// LogicalPlan (excerpt): only the statistics-derivation method is shown here.
type LogicalPlan interface {
	// recursiveDeriveStats derives statistic info between plans.
	// colGroups are the column groups requested by the parent. The base
	// implementation derives the children's stats first and then this node's.
	recursiveDeriveStats(colGroups [][]*expression.Column) (*property.StatsInfo, error)
}
// recursiveDeriveStats derives statistics bottom-up. Each child is processed
// first and receives the column groups returned by this node's
// ExtractColGroups. After all children succeed, this node's own DeriveStats is
// called with the children's stats and schemas. The first child error aborts
// the walk and is returned unchanged.
func (p *baseLogicalPlan) recursiveDeriveStats(colGroups [][]*expression.Column) (*property.StatsInfo, error) {
	kids := p.children
	statsOfKids := make([]*property.StatsInfo, len(kids))
	schemasOfKids := make([]*expression.Schema, len(kids))
	// NOTE(review): ExtractColGroups is not shown. It presumably returns the
	// parent's groups plus the column groups this operator needs, so that the
	// children keep statistics for them. Confirm.
	groupsForKids := p.self.ExtractColGroups(colGroups)
	for idx := range kids {
		kid := kids[idx]
		kidStats, err := kid.recursiveDeriveStats(groupsForKids)
		if err != nil {
			return nil, err
		}
		statsOfKids[idx] = kidStats
		schemasOfKids[idx] = kid.Schema()
	}
	// This node itself is given the parent's colGroups, not the extended set.
	return p.self.DeriveStats(statsOfKids, p.self.Schema(), schemasOfKids, colGroups)
}
DeriveStats
// LogicalPlan (excerpt): only DeriveStats is shown here.
type LogicalPlan interface {
	// DeriveStats derives statistic info for current plan node given child stats.
	// selfSchema and childSchema are passed explicitly so that this method can
	// also be used in the cascades planner, where a LogicalPlan might not
	// record its children or schema.
	// colGroups are the column groups requested by the parent.
	DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, colGroups [][]*expression.Column) (*property.StatsInfo, error)
}
preparePossibleProperties
引入预处理 property 函数的原因是为了减少一些没有必要考虑的 properties,从而尽可能早地裁剪掉物理计划搜索路径上的分支。
// preparePossibleProperties walks the plan tree in post-order. For each node it
// first gathers the orderings every child can provide, in child order. It then
// passes them to the node's PreparePossibleProperties. That method returns the
// orderings the node itself can provide, and some operators (join,
// aggregation) also cache what they can exploit.
func preparePossibleProperties(lp LogicalPlan) [][]*expression.Column {
	kids := lp.Children()
	perKid := make([][][]*expression.Column, len(kids))
	for i := range kids {
		perKid[i] = preparePossibleProperties(kids[i])
	}
	return lp.PreparePossibleProperties(lp.Schema(), perKid...)
}
PreparePossibleProperties
PreparePossibleProperties
仅用于连接和聚合。以 group by a,b,c 为例,LogicalAggregation
可以要求 (a,b,c) 有序,或 (b,a,c) 有序,或 (b,c,a) 有序等等,但 DataSource
中的有序索引是有限的,比如只有一个 (a,b,c) 的索引,那么只能 (a,b,c) 有序。因此可以通过 PreparePossibleProperties
来去掉其余不可能的 prop。
可以根据 childrenProperties 来进行预剪枝的 LogicalPlan 需要实现 PreparePossibleProperties,
未实现的会继承 *baseLogicalPlan.PreparePossibleProperties。
// LogicalPlan (excerpt): only PreparePossibleProperties is shown here.
type LogicalPlan interface {
	// PreparePossibleProperties is only used for join and aggregation. Like group by a,b,c, all permutation of (a,b,c) is
	// valid, but the ordered indices in leaf plan is limited. So we can get all possible order properties by a pre-walking.
	// childrenProperties[i] lists the orderings child i can provide; the
	// result lists the orderings this operator can provide to its parent.
	PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column
}
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// The default claims no ordering at all, whatever the children can provide.
// Operators that can propagate or produce orderings override it.
func (p *baseLogicalPlan) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	var noOrderings [][]*expression.Column
	return noOrderings
}
*LogicalSelection.PreparePossibleProperties
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// A selection has a single child and only filters its rows, so it can provide
// exactly the orderings that child provides.
func (p *LogicalSelection) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	onlyChild := childrenProperties[0]
	return onlyChild
}
*DataSource.PreparePossibleProperties
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// It lists the orderings the DataSource can deliver through its access paths:
//   - an int-handle path gives rows ordered by the integer primary-key column
//     (when getPKIsHandleCol finds one);
//   - a path with index columns, e.g. (a, b, c), gives rows ordered by
//     (a, b, c). For each of the first EqCondCount columns it also yields
//     the remaining suffix: (b, c), then (c). This stops before the suffix
//     would become empty. See https://github.com/pingcap/tidb/pull/10548.
//     NOTE(review): this assumes EqCondCount counts the leading index columns
//     pinned by equality conditions. With a = const the rows are ordered by
//     (b, c). Confirm against AccessPath.
//
// Paths with neither form contribute nothing. childrenProperties is not used.
func (ds *DataSource) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	cloneCols := func(cols []*expression.Column) []*expression.Column {
		dup := make([]*expression.Column, len(cols))
		copy(dup, cols)
		return dup
	}
	orderings := make([][]*expression.Column, 0, len(ds.possibleAccessPaths))
	for _, path := range ds.possibleAccessPaths {
		if path.IsIntHandlePath {
			if pk := ds.getPKIsHandleCol(); pk != nil {
				orderings = append(orderings, []*expression.Column{pk})
			}
			continue
		}
		if len(path.IdxCols) == 0 {
			continue
		}
		orderings = append(orderings, cloneCols(path.IdxCols))
		for skip := 1; skip <= path.EqCondCount && skip < len(path.IdxCols); skip++ {
			orderings = append(orderings, cloneCols(path.IdxCols[skip:]))
		}
	}
	return orderings
}
*LogicalAggregation.PreparePossibleProperties
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// A stream aggregation needs its (single) child ordered on the group-by
// columns. GetGroupByCols keeps only plain-column items: `group by a, b, c+d`
// gives [a, b]. A child ordering qualifies when its leading len(groupByCols)
// columns all appear among them, as checked by getMaxSortPrefix.
//
// For each qualifying ordering:
//   - that prefix is returned as an ordering this aggregation can provide;
//   - a deep clone of it is cached in la.possibleProperties for getStreamAggs.
//
// Without any GROUP BY item the cache holds a single empty ordering and nil
// is returned.
func (la *LogicalAggregation) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	fromChild := childrenProperties[0]
	if len(la.GroupByItems) == 0 {
		la.possibleProperties = [][]*expression.Column{nil}
		return nil
	}
	groupByCols := la.GetGroupByCols()
	want := len(groupByCols)
	provided := make([][]*expression.Column, 0, len(fromChild))
	cached := make([][]*expression.Column, 0, len(fromChild))
	for _, childOrder := range fromChild {
		if len(getMaxSortPrefix(childOrder, groupByCols)) != want {
			continue
		}
		prefix := childOrder[:want]
		provided = append(provided, prefix)
		// Clone before caching: otherwise an upper Projection may modify the
		// shared columns (see https://github.com/pingcap/tidb/pull/24204).
		snapshot := make([]*expression.Column, 0, want)
		for _, c := range prefix {
			snapshot = append(snapshot, c.Clone().(*expression.Column))
		}
		cached = append(cached, snapshot)
	}
	la.possibleProperties = cached
	return provided
}
*LogicalJoin.PreparePossibleProperties
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// It caches deep clones of both children's orderings in p.leftProperties and
// p.rightProperties. GetMergeJoin later uses them. The clones prevent an upper
// Projection from modifying the shared columns
// (see https://github.com/pingcap/tidb/pull/24204).
//
// The join reports that it can provide any ordering of either side, with two
// exceptions:
//   - LeftOuterJoin and LeftOuterSemiJoin drop the right side's orderings;
//   - RightOuterJoin drops the left side's orderings.
//
// NOTE(review): presumably the outer join's inner side can be NULL-padded, or
// its columns are absent from a semi-join's output, so its order is not kept.
// TODO: We should consider properties propagation.
func (p *LogicalJoin) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	lhs, rhs := childrenProperties[0], childrenProperties[1]
	p.leftProperties = clonePossibleProperties(lhs)
	p.rightProperties = clonePossibleProperties(rhs)
	switch p.JoinType {
	case LeftOuterJoin, LeftOuterSemiJoin:
		rhs = nil
	case RightOuterJoin:
		lhs = nil
	}
	// Left orderings come first, then right ones; every column slice is copied.
	out := make([][]*expression.Column, 0, len(lhs)+len(rhs))
	for _, side := range [][][]*expression.Column{lhs, rhs} {
		for _, cols := range side {
			dup := make([]*expression.Column, len(cols))
			copy(dup, cols)
			out = append(out, dup)
		}
	}
	return out
}
*LogicalProjection.PreparePossibleProperties
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// Child orderings are translated through the projection. A child column whose
// expression is a bare column (a pure pass-through) becomes the matching
// output column. Each child ordering is cut at its first column that is not
// passed through, because a computed expression gives no ordering guarantee.
// Orderings left empty are dropped.
//
// Unlike LogicalJoin and LogicalAggregation, nothing is cached on the node.
// LogicalProjection.exhaustPhysicalPlans maps the required property with
// TryToGetChildProp and never reads a cached list.
func (p *LogicalProjection) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	fromChild := childrenProperties[0]
	// inputs[k] is a child column that the projection outputs unchanged as outputs[k].
	inputs := make([]*expression.Column, 0, p.schema.Len())
	outputs := make([]*expression.Column, 0, p.schema.Len())
	for idx, e := range p.Exprs {
		c, isCol := e.(*expression.Column)
		if !isCol {
			continue
		}
		outputs = append(outputs, p.schema.Columns[idx])
		inputs = append(inputs, c)
	}
	lookup := expression.NewSchema(inputs...)
	translate := func(order []*expression.Column) []*expression.Column {
		mapped := make([]*expression.Column, 0, len(order))
		for _, c := range order {
			at := lookup.ColumnIndex(c)
			if at < 0 {
				break
			}
			mapped = append(mapped, outputs[at])
		}
		return mapped
	}
	result := make([][]*expression.Column, 0, len(fromChild))
	for _, order := range fromChild {
		if mapped := translate(order); len(mapped) > 0 {
			result = append(result, mapped)
		}
	}
	return result
}
findBestTask
// LogicalPlan (excerpt): only findBestTask is shown here.
type LogicalPlan interface {
	// findBestTask converts the logical plan to the physical plan. It's a new interface.
	// It is called recursively from the parent to the children to create the result physical plan.
	// Some logical plans will convert the children to the physical plans in different ways, and return the one
	// with the lowest cost and how many plans are found in this function.
	// planCounter is a counter for planner to force a plan (the nth_plan() hint).
	// If planCounter > 0, the planCounter-th plan generated in this function will be returned.
	// If planCounter = 0, the plan generated in this function will not be considered.
	// If planCounter = -1, then we will not force plan.
	findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp) (task, int64, error)
}
property.PhysicalProperty
// PhysicalProperty stands for the required physical property by parents.
// It contains the orders and the task types.
// Excerpt: further fields such as CanAddEnforcer, MPPPartitionCols and
// MPPPartitionTp (read by findBestTask) are elided below.
type PhysicalProperty struct {
	// SortItems contains the required sort attributes.
	SortItems []SortItem
	// TaskTp means the type of task that an operator requires.
	//
	// It needs to be specified because two different tasks can't be compared
	// by cost directly. E.g. if a copTask costs less than a rootTask, we can't
	// be sure the former is the better choice: the copTask must still be
	// finished later, which adds to its cost, and we can't know when that
	// happens. So the best way to make the comparison fair is to add the
	// TaskType to the required property.
	TaskTp TaskType
	// ExpectedCnt means this operator may be closed after fetching ExpectedCnt
	// records.
	ExpectedCnt float64
	// hashcode stores the hash code of a PhysicalProperty, will be lazily
	// calculated when function "HashCode()" being called.
	hashcode []byte
	...
}
// SortItem wraps the column and its order.
// Desc set to true requests descending order; false requests ascending.
type SortItem struct {
	Col  *expression.Column
	Desc bool
}
// TaskType values (excerpt: further values are elided below).
const (
	// RootTaskType stands for the tasks that executed in the TiDB layer.
	RootTaskType TaskType = iota
	// CopSingleReadTaskType stands for a TableScan or IndexScan task
	// executed in the coprocessor layer.
	CopSingleReadTaskType
	// CopDoubleReadTaskType stands for an IndexLookup task executed in the
	// coprocessor layer.
	CopDoubleReadTaskType
	...
)
*baseLogicalPlan.findBestTask(WIP)
// findBestTask implements LogicalPlan interface.
//
// It returns the cheapest task for this operator that satisfies prop, together
// with the number of candidate plans enumerated (cntPlan). Results are
// memoized per property through getTask/storeTask, so a repeated request with
// the same prop is answered from the cache.
//
// Candidates come from two rounds of exhaustPhysicalPlans:
//   - plansFitsProp: physical plans that satisfy prop by themselves;
//   - plansNeedEnforce: plans generated for a property without order/limit/MPP
//     requirements. They are costed against prop with enforcement enabled
//     (an enforced sort). This round runs only when prop allows an enforcer,
//     or when a hint fails under the non-empty required property.
func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp) (bestTask task, cntPlan int64, err error) {
	// If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
	// and set inner child prop nil, so here we do nothing.
	if prop == nil {
		return nil, 1, nil
	}
	// Look up the task with this prop in the task map.
	// It's used to reduce double counting.
	// Memoization: the best task found for a prop is stored at the end of this
	// function (storeTask). Per the original notes it is kept in
	// baseLogicalPlan.taskMap, keyed by the hash of prop. A repeated request
	// with the same prop returns the stored task directly.
	bestTask = p.getTask(prop)
	if bestTask != nil {
		planCounter.Dec(1)
		return bestTask, 1, nil
	}
	// Whether prop may be satisfied by adding an enforcer (a sort) on top of p.self.
	canAddEnforcer := prop.CanAddEnforcer
	// A task type other than root (and not a TiFlash property) would require
	// pushing this whole operator down to TiKV. The generic implementation
	// cannot do that, so it returns (and caches) an invalid task.
	if prop.TaskTp != property.RootTaskType && !prop.IsFlashProp() {
		// Currently all plan cannot totally push down to TiKV.
		p.storeTask(prop, invalidTask)
		return invalidTask, 0, nil
	}
	bestTask = invalidTask
	cntPlan = 0
	// prop should be read only because its cached hashcode might be not consistent
	// when it is changed. So we clone a new one for the temporary changes.
	newProp := prop.CloneEssentialFields()
	// plansFitsProp: physical plans that satisfy prop without an extra sort.
	// plansNeedEnforce: physical plans that satisfy prop only with a sort
	// enforced on top of them.
	var plansFitsProp, plansNeedEnforce []PhysicalPlan
	var hintWorksWithProp bool
	// Maybe the plan can satisfy the required property,
	// so we try to get the task without the enforced sort first.
	// This enumerates every physical implementation of p.self that satisfies newProp.
	plansFitsProp, hintWorksWithProp, err = p.self.exhaustPhysicalPlans(newProp)
	if err != nil {
		return nil, 0, err
	}
	// The hint failed under the required property. If that property is
	// non-empty (presumably: it requires some order), allow enforcement so
	// that the hint gets another chance.
	if !hintWorksWithProp && !newProp.IsEmpty() {
		// If there is a hint in the plan and the hint cannot satisfy the property,
		// we enforce this property and try to generate the PhysicalPlan again to
		// make sure the hint can work.
		canAddEnforcer = true
	}
	if canAddEnforcer {
		// Then, we use the empty property to get physicalPlans and
		// try to get the task with an enforced sort.
		// The enforced sort takes care of the original prop. p.self therefore
		// only has to satisfy a property with no order, no row-count limit and
		// no MPP partitioning.
		newProp.SortItems = []property.SortItem{}
		newProp.ExpectedCnt = math.MaxFloat64
		newProp.MPPPartitionCols = nil
		newProp.MPPPartitionTp = property.AnyType
		var hintCanWork bool
		// Enumerate every physical implementation of p.self for that relaxed property.
		plansNeedEnforce, hintCanWork, err = p.self.exhaustPhysicalPlans(newProp)
		if err != nil {
			return nil, 0, err
		}
		if hintCanWork && !hintWorksWithProp {
			// If the hint can work with the empty property, but cannot work with
			// the required property, we give up `plansFitProp` to make sure the hint
			// can work.
			plansFitsProp = nil
		}
		if !hintCanWork && !hintWorksWithProp && !prop.CanAddEnforcer {
			// If the original property is not enforced and hint cannot
			// work anyway, we give up `plansNeedEnforce` for efficiency,
			plansNeedEnforce = nil
		}
		// Both candidate lists are now costed against the original prop.
		newProp = prop
	}
	var cnt int64
	var curTask task
	// Cost the plans that satisfy the property directly.
	if bestTask, cnt, err = p.enumeratePhysicalPlans4Task(plansFitsProp, newProp, false, planCounter); err != nil {
		return nil, 0, err
	}
	cntPlan += cnt
	// NOTE(review): presumably Empty() means the nth_plan() target was reached; stop here.
	if planCounter.Empty() {
		goto END
	}
	// Cost the plans that need the property enforced on top of them.
	curTask, cnt, err = p.enumeratePhysicalPlans4Task(plansNeedEnforce, newProp, true, planCounter)
	if err != nil {
		return nil, 0, err
	}
	cntPlan += cnt
	if planCounter.Empty() {
		bestTask = curTask
		goto END
	}
	// Take curTask if it is cheaper, or if bestTask is invalid and curTask is not.
	if curTask.cost() < bestTask.cost() || (bestTask.invalid() && !curTask.invalid()) {
		bestTask = curTask
	}
END:
	// Memoize the result for this prop so that repeated requests hit the cache.
	p.storeTask(prop, bestTask)
	return bestTask, cntPlan, nil
}
exhaustPhysicalPlans
找到 LogicalPlan 这个逻辑算子,对应可以符合 *property.PhysicalProperty 的所有物理算子
// LogicalPlan (excerpt): only exhaustPhysicalPlans is shown here.
type LogicalPlan interface {
	// exhaustPhysicalPlans generates all possible plans that can match the required property.
	// It will return:
	// 1. All possible plans that can match the required property.
	// 2. Whether the SQL hint can work. Return true if there is no hint.
	exhaustPhysicalPlans(*property.PhysicalProperty) (physicalPlans []PhysicalPlan, hintCanWork bool, err error)
}
*LogicalProjection.exhaustPhysicalPlans
// exhaustPhysicalPlans implements LogicalPlan interface.
// It builds PhysicalProjection candidates. Each candidate's child requirement
// is prop rewritten onto the projection's input columns. If prop cannot be
// rewritten, no candidate is returned and hintCanWork stays true.
func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
	// TryToGetChildProp will check if this sort property can be pushed or not.
	// When a sort column will be replaced by scalar function, we refuse it.
	// When a sort column will be replaced by a constant, we just remove it.
	// In other words, prop's sort columns are mapped back to the columns the
	// projection reads. An ordering on a computed expression
	// (*expression.ScalarFunction) cannot be demanded from the child, because
	// nothing guarantees the child is ordered by it.
	newProp, ok := p.TryToGetChildProp(prop)
	if !ok {
		return nil, true, nil
	}
	newProps := []*property.PhysicalProperty{newProp}
	// generate a mpp task candidate if enforced mpp
	...
	ret := make([]PhysicalPlan, 0, len(newProps))
	// One PhysicalProjection per candidate child property.
	for _, newProp := range newProps {
		proj := PhysicalProjection{
			Exprs:                p.Exprs,
			CalculateNoDelay:     p.CalculateNoDelay,
			AvoidColumnEvaluator: p.AvoidColumnEvaluator,
		}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, newProp)
		proj.SetSchema(p.schema)
		ret = append(ret, proj)
	}
	return ret, true, nil
}
// Init initializes PhysicalProjection.
// It sets up the plan base, stores stats as the operator's statistics and
// records props as the properties required of the child. When called from
// LogicalProjection.exhaustPhysicalPlans, props is the parent's property
// rewritten by TryToGetChildProp onto the projection's input columns.
func (p PhysicalProjection) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalProjection {
	proj := &p
	proj.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeProj, proj, offset)
	proj.stats = stats
	proj.childrenReqProps = props
	return proj
}
*LogicalSelection.exhaustPhysicalPlans
// exhaustPhysicalPlans implements LogicalPlan interface.
// A selection has a single implementation, PhysicalSelection. Its child is
// asked for the same property (a clone of prop's essential fields). No hint
// applies, so the hint flag is always true.
func (p *LogicalSelection) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
	forChild := prop.CloneEssentialFields()
	scaled := p.stats.ScaleByExpectCnt(prop.ExpectedCnt)
	sel := PhysicalSelection{Conditions: p.Conditions}.Init(p.ctx, scaled, p.blockOffset, forChild)
	return []PhysicalPlan{sel}, true, nil
}
// Init initializes PhysicalSelection.
// It sets up the plan base, stores stats as the operator's statistics and
// records props as the properties required of the child. From
// LogicalSelection.exhaustPhysicalPlans that is the selection's own required
// property, passed through unchanged.
func (p PhysicalSelection) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalSelection {
	sel := &p
	sel.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSel, sel, offset)
	sel.stats = stats
	sel.childrenReqProps = props
	return sel
}
*LogicalAggregation.exhaustPhysicalPlans
// exhaustPhysicalPlans implements LogicalPlan interface.
// It collects hash- and stream-aggregation candidates for prop, honoring the
// AGG_TO_COP, HASH_AGG and STREAM_AGG hints.
// The returned bool reports whether the hints can work. It is true when the
// preferred kind produced candidates (only those are returned), or when no
// aggregation kind is preferred. Otherwise every candidate is returned with
// false.
func (la *LogicalAggregation) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
	// AGG_TO_COP asks to push the aggregation to the coprocessor
	// (https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#agg_to_cop).
	// If that is impossible, warn and drop the hint.
	if la.aggHints.preferAggToCop && !la.canPushToCop(kv.TiKV) {
		errMsg := "Optimizer Hint AGG_TO_COP is inapplicable"
		la.ctx.GetSessionVars().StmtCtx.AppendWarning(ErrInternal.GenWithStack(errMsg))
		la.aggHints.preferAggToCop = false
	}
	// At most one of these is true. Both are false without an aggregation-type
	// hint, or when HASH_AGG and STREAM_AGG conflict.
	preferHash, preferStream := la.ResetHintIfConflicted()
	hashAggs := la.getHashAggs(prop)
	if preferHash && hashAggs != nil {
		return hashAggs, true, nil
	}
	streamAggs := la.getStreamAggs(prop)
	if preferStream && streamAggs != nil {
		return streamAggs, true, nil
	}
	// No kind is preferred, or the preferred kind produced nothing: offer everything.
	aggs := append(hashAggs, streamAggs...)
	if preferStream && streamAggs == nil && !prop.IsEmpty() {
		errMsg := "Optimizer Hint STREAM_AGG is inapplicable"
		la.ctx.GetSessionVars().StmtCtx.AppendWarning(ErrInternal.GenWithStack(errMsg))
	}
	hintCanWork := !preferStream && !preferHash
	return aggs, hintCanWork, nil
}
// canPushToCop check if we might push this plan to a specific store.
// It first defers to baseLogicalPlan.canPushToCop. For TiKV that check only
// looks at the data source. For TiFlash it also checks operator support, and
// that check might be inaccurate. The aggregation is then refused anyway when
// la.noCopPushDown is set, which happens when it is in the outer child tree
// of an apply.
func (la *LogicalAggregation) canPushToCop(storeTp kv.StoreType) bool {
	if pushable := la.baseLogicalPlan.canPushToCop(storeTp); !pushable {
		return false
	}
	return !la.noCopPushDown
}
getHashAggs
// getHashAggs returns the hash-aggregation candidates for prop. There is one
// PhysicalHashAgg per feasible child task type, and the MPP task type
// contributes whatever tryToGetMppHashAggs returns. It returns nil when prop
// is not empty, because a hash aggregation cannot guarantee any output order.
// It also returns nil when an MPP task is required but the aggregation cannot
// be pushed down to MPP.
func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []PhysicalPlan {
	// Hash aggregation cannot guarantee that any column is ordered.
	if !prop.IsEmpty() {
		return nil
	}
	if prop.TaskTp == property.MppTaskType && !la.checkCanPushDownToMPP() {
		return nil
	}
	// GetAllPossibleChildTaskTypes enumerates the possible types of tasks for children.
	// CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType
	// Here it is only used to size the result slice.
	hashAggs := make([]PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes()))
	// Start from the two TiKV coprocessor task types.
	taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
	// With AllowBCJ, a TiFlash local-read child task is also a candidate.
	// NOTE(review): AllowBCJ presumably enables broadcast join on TiFlash — confirm.
	if la.ctx.GetSessionVars().AllowBCJ {
		taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
	}
	canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
	canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP()
	if la.HasDistinct() {
		// TODO: remove after the cost estimation of distinct pushdown is implemented.
		if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
			if la.ctx.GetSessionVars().StmtCtx.InExplainStmt {
				la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct"))
			}
			// A DISTINCT aggregate whose pushdown is not allowed can only run
			// in the root task.
			taskTypes = []property.TaskType{property.RootTaskType}
		}
	} else if !la.aggHints.preferAggToCop {
		// Without the AGG_TO_COP hint, running in the root task is also a
		// candidate, next to the coprocessor task types.
		taskTypes = append(taskTypes, property.RootTaskType)
	}
	if !la.canPushToCop(kv.TiKV) {
		// The aggregation cannot be pushed to TiKV. It runs in the root task,
		// plus a TiFlash local-read task when TiFlash can take it.
		taskTypes = []property.TaskType{property.RootTaskType}
		if canPushDownToTiFlash {
			taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
		}
	}
	if canPushDownToMPP {
		taskTypes = append(taskTypes, property.MppTaskType)
	}
	// For a TiFlash property (per IsFlashProp) only the requested task type is used.
	if prop.IsFlashProp() {
		taskTypes = []property.TaskType{prop.TaskTp}
	}
	// Build candidates for every task type collected above.
	for _, taskTp := range taskTypes {
		if taskTp == property.MppTaskType {
			mppAggs := la.tryToGetMppHashAggs(prop)
			if len(mppAggs) > 0 {
				hashAggs = append(hashAggs, mppAggs...)
			}
		} else {
			agg := NewPhysicalHashAgg(la,
				la.stats.ScaleByExpectCnt(prop.ExpectedCnt),
				// The child must run as taskTp; no order (SortItems) is required of it.
				&property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp})
			agg.SetSchema(la.schema.Clone())
			hashAggs = append(hashAggs, agg)
		}
	}
	return hashAggs
}
// NewPhysicalHashAgg creates a new PhysicalHashAgg from a LogicalAggregation.
// The physical operator shares la's group-by items and aggregate functions.
// newStats becomes its statistics and prop the property required of its child.
func NewPhysicalHashAgg(la *LogicalAggregation, newStats *property.StatsInfo, prop *property.PhysicalProperty) *PhysicalHashAgg {
	base := basePhysicalAgg{AggFuncs: la.AggFuncs, GroupByItems: la.GroupByItems}
	return base.initForHash(la.ctx, newStats, la.blockOffset, prop)
}
// initForHash wraps base into a PhysicalHashAgg, sets up its plan base, and
// records the output statistics and the properties required of the child.
func (base basePhysicalAgg) initForHash(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalHashAgg {
	hashAgg := &PhysicalHashAgg{base}
	hashAgg.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeHashAgg, hashAgg, offset)
	hashAgg.stats = stats
	hashAgg.childrenReqProps = props
	return hashAgg
}
getStreamAggs
// getStreamAggs returns the stream-aggregation candidates that can satisfy prop.
//
// A stream aggregation needs its child ordered on the group-by columns. The
// orderings cached in la.possibleProperties by PreparePossibleProperties are
// tried one by one. For each ordering that keeps prop satisfied, one
// PhysicalStreamAgg is built per feasible child task type. Its child is
// required to be sorted on that ordering, in prop's single direction. Without
// GROUP BY, the cache holds one empty ordering, giving candidates that need no
// order. With the STREAM_AGG hint, getEnforcedStreamAggs's candidates are
// appended too.
func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
	// TODO: support CopTiFlash task type in stream agg
	if prop.IsFlashProp() {
		return nil
	}
	// AllSameOrder checks if all the prop.SortItems have same order (desc or asc).
	all, desc := prop.AllSameOrder()
	// The child ordering is built below with a single desc flag, so a mix of
	// ascending and descending sort items cannot be satisfied.
	if !all {
		return nil
	}
	// No stream aggregation is offered if any aggregate function is in FinalMode.
	// NOTE(review): the reason is not visible here — confirm.
	for _, aggFunc := range la.AggFuncs {
		if aggFunc.Mode == aggregation.FinalMode {
			return nil
		}
	}
	// GetGroupByCols returns the columns that are group-by items.
	// For example, `group by a, b, c+d` will return [a, b].
	groupByCols := la.GetGroupByCols()
	if len(groupByCols) != len(la.GroupByItems) {
		// Some group-by item is not a plain column, so stream aggregation is not offered.
		return nil
	}
	// GetAllPossibleChildTaskTypes enumerates the possible types of tasks for children.
	// CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType
	allTaskTypes := prop.GetAllPossibleChildTaskTypes()
	streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(allTaskTypes)-1)+len(allTaskTypes))
	// Property requested from the child. ExpectedCnt is scaled by
	// inputCount/RowCount so that enough child rows are read for
	// prop.ExpectedCnt output rows. It never drops below prop.ExpectedCnt.
	// NOTE(review): assumes inputCount is the child's row count — confirm.
	// childProp is reused across iterations; each agg gets a shallow copy below.
	childProp := &property.PhysicalProperty{
		ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
	}
	// Each cached ordering starts with the group-by columns in some order the
	// child can provide.
	for _, possibleChildProperty := range la.possibleProperties {
		// Require the child to be sorted on this ordering's group-by prefix.
		// The stream aggregation then emits rows in that order.
		childProp.SortItems = property.SortItemsFromCols(possibleChildProperty[:len(groupByCols)], desc)
		// IsPrefix checks whether the order property is the prefix of another.
		if !prop.IsPrefix(childProp) {
			// The parent's required order (prop) is not a prefix of this
			// ordering, so this candidate cannot satisfy prop.
			continue
		}
		// The table read of "CopDoubleReadTaskType" can't promises the sort
		// property that the stream aggregation required, no need to consider.
		taskTypes := []property.TaskType{property.CopSingleReadTaskType}
		if la.HasDistinct() {
			// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
			// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
			if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
				// A DISTINCT aggregate whose pushdown is not allowed can only
				// run in the root task.
				taskTypes = []property.TaskType{property.RootTaskType}
			} else if !la.distinctArgsMeetsProperty() {
				// Pushed-down DISTINCT needs distinctArgsMeetsProperty(). It
				// takes no arguments, so the outcome is the same for every
				// ordering.
				continue
			}
		} else if !la.aggHints.preferAggToCop {
			// Without the AGG_TO_COP hint, the root task is also a candidate.
			taskTypes = append(taskTypes, property.RootTaskType)
		}
		if !la.canPushToCop(kv.TiKV) {
			// The aggregation cannot be pushed to TiKV; only the root task remains.
			taskTypes = []property.TaskType{property.RootTaskType}
		}
		for _, taskTp := range taskTypes {
			copiedChildProperty := new(property.PhysicalProperty)
			*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field.
			copiedChildProperty.TaskTp = taskTp
			agg := basePhysicalAgg{
				GroupByItems: la.GroupByItems,
				AggFuncs:     la.AggFuncs,
			}.initForStream(la.ctx, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), la.blockOffset, copiedChildProperty)
			agg.SetSchema(la.schema.Clone())
			streamAggs = append(streamAggs, agg)
		}
	}
	// If STREAM_AGG hint is existed, it should consider enforce stream aggregation,
	// because we can't trust possibleChildProperty completely.
	// NOTE(review): getEnforcedStreamAggs is not shown. It presumably builds
	// stream aggregations whose child order is enforced by a sort.
	if (la.aggHints.preferAggType & preferStreamAgg) > 0 {
		streamAggs = append(streamAggs, la.getEnforcedStreamAggs(prop)...)
	}
	return streamAggs
}
// initForStream wraps base into a PhysicalStreamAgg, sets up its plan base,
// and records the output statistics and the properties required of the child.
func (base basePhysicalAgg) initForStream(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalStreamAgg {
	streamAgg := &PhysicalStreamAgg{base}
	streamAgg.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeStreamAgg, streamAgg, offset)
	streamAgg.stats = stats
	streamAgg.childrenReqProps = props
	return streamAgg
}
*LogicalJoin.exhaustPhysicalPlans
// LogicalJoin can generate hash join, index join and sort merge join.
// Firstly we check the hint. If a hint is specified by the user, we force the
// corresponding physical plan.
// If the hinted plan cannot be built, the other candidates are returned and
// the hint is reported as not working, so the caller may retry with an
// enforced property.
// If there is no hint, we pick all candidates.
func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
	// Test hook: with the MockOnlyEnableIndexHashJoin failpoint, only index joins are returned.
	failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
		if val.(bool) {
			indexJoins, _ := p.tryToGetIndexJoin(prop)
			failpoint.Return(indexJoins, true, nil)
		}
	})
	// A required broadcast partitioning yields no join candidates, and the
	// hint is reported as not working.
	// NOTE(review): MPP / broadcast-join specific; the reason is not visible here.
	if prop.MPPPartitionTp == property.BroadcastType {
		return nil, false, nil
	}
	joins := make([]PhysicalPlan, 0, 8)
	canPushToTiFlash := p.canPushToCop(kv.TiFlash)
	if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
		...
	} else if p.ctx.GetSessionVars().AllowBCJ && canPushToTiFlash {
		...
	}
	// For a TiFlash property only the candidates collected above are returned.
	if prop.IsFlashProp() {
		return joins, true, nil
	}
	// merge join
	// With the merge-join hint and at least one merge join found, return only those.
	mergeJoins := p.GetMergeJoin(prop, p.schema, p.Stats(), p.children[0].statsInfo(), p.children[1].statsInfo())
	if (p.preferJoinType&preferMergeJoin) > 0 && len(mergeJoins) > 0 {
		return mergeJoins, true, nil
	}
	joins = append(joins, mergeJoins...)
	// index join
	// When forced is true, only the index joins are returned.
	// NOTE(review): forced presumably means an index-join hint applies — confirm.
	indexJoins, forced := p.tryToGetIndexJoin(prop)
	if forced {
		return indexJoins, true, nil
	}
	joins = append(joins, indexJoins...)
	// hash join
	// With the hash-join hint and at least one hash join found, return only those.
	hashJoins := p.getHashJoins(prop)
	if (p.preferJoinType&preferHashJoin) > 0 && len(hashJoins) > 0 {
		return hashJoins, true, nil
	}
	joins = append(joins, hashJoins...)
	if p.preferJoinType > 0 {
		// If we reach here, it means we have a hint that doesn't work.
		// It might be affected by the required property, so we enforce
		// this property and try the hint again.
		return joins, false, nil
	}
	return joins, true, nil
}
GetMergeJoin
// GetMergeJoin converts the logical join into physical merge joins based on the required physical property.
// For every ordering cached in p.leftProperties that keeps all left join keys sorted, it tries to build one
// PhysicalMergeJoin whose join keys are the longest prefix that some ordering in p.rightProperties also provides.
// statsInfo is the join's own statistics; leftStatsInfo and rightStatsInfo belong to the left and right children
// and are used to scale the children's expected row counts. It returns nil when any join key uses NullEQ.
// If the join carries a merge-join hint (preferMergeJoin), the plans from getEnforcedMergeJoin are appended too.
func (p *LogicalJoin) GetMergeJoin(prop *property.PhysicalProperty, schema *expression.Schema, statsInfo *property.StatsInfo, leftStatsInfo *property.StatsInfo, rightStatsInfo *property.StatsInfo) []PhysicalPlan {
joins := make([]PhysicalPlan, 0, len(p.leftProperties)+1)
// GetJoinKeys extracts join keys(columns) from EqualConditions. It returns left join keys, right
// join keys and an `isNullEQ` array which means the `joinKey[i]` is a `NullEQ` function. The `hasNullEQ`
// means whether there is a `NullEQ` of a join key.
// NullEQ is the null-safe equality comparison (`<=>`), under which NULL matches NULL.
leftJoinKeys, rightJoinKeys, isNullEQ, hasNullEQ := p.GetJoinKeys()
// EnumType/SetType Unsupported: merge join conflicts with index order.
// ref: https://github.com/pingcap/tidb/issues/24473, https://github.com/pingcap/tidb/issues/25669
// (The code handling this case is elided in these notes.)
...
// TODO: support null equal join keys for merge join
if hasNullEQ {
return nil
}
// The leftProperties caches all the possible properties that are provided by its children.
for _, lhsChildProperty := range p.leftProperties {
// getMaxSortPrefix walks lhsChildProperty until the first column that is not in leftJoinKeys
// (say the (n+1)-th), and returns the offsets in leftJoinKeys of the first n columns.
offsets := getMaxSortPrefix(lhsChildProperty, leftJoinKeys)
// If not all equal conditions hit properties. We ban merge join heuristically. Because in this case, merge join
// may get a very low performance. In executor, executes join results before other conditions filter it.
if len(offsets) < len(leftJoinKeys) {
// Assuming lhsChildProperty has no duplicate columns: if its first len(leftJoinKeys) columns
// are not all left join keys, this ordering cannot keep every left join key sorted.
continue
}
// From here on, lhsChildProperty[:len(offsets)] is an ordering over all the left join keys.
leftKeys := lhsChildProperty[:len(offsets)]
// Reorder rightJoinKeys by the same offsets so that leftKeys[i] and rightKeys[i] come from the
// same equal condition.
rightKeys := expression.NewSchema(rightJoinKeys...).ColumnsByIndices(offsets)
newIsNullEQ := make([]bool, 0, len(offsets))
for _, offset := range offsets {
newIsNullEQ = append(newIsNullEQ, isNullEQ[offset])
}
// findMaxPrefixLen compares every ordering in p.rightProperties with rightKeys and returns the
// longest common prefix length. A prefix shorter than len(rightJoinKeys) is acceptable: the merge
// join then merges only on that prefix, and the equal conditions outside it are moved into
// OtherConditions below (moveEqualToOtherConditions), so they are still evaluated.
prefixLen := findMaxPrefixLen(p.rightProperties, rightKeys)
if prefixLen == 0 {
continue
}
leftKeys = leftKeys[:prefixLen]
rightKeys = rightKeys[:prefixLen]
newIsNullEQ = newIsNullEQ[:prefixLen]
// NOTE(review): presumably rejects key pairs whose collations differ, because the two sides would
// then not be sorted consistently for comparison. Confirm in checkJoinKeyCollation.
if !p.checkJoinKeyCollation(leftKeys, rightKeys) {
continue
}
offsets = offsets[:prefixLen]
baseJoin := basePhysicalJoin{
JoinType: p.JoinType,
LeftConditions: p.LeftConditions,
RightConditions: p.RightConditions,
DefaultValues: p.DefaultValues,
LeftJoinKeys: leftKeys,
RightJoinKeys: rightKeys,
IsNullEQ: newIsNullEQ,
}
mergeJoin := PhysicalMergeJoin{basePhysicalJoin: baseJoin}.Init(p.ctx, statsInfo.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset)
mergeJoin.SetSchema(schema)
// prefixLen may be smaller than len(rightJoinKeys). The equal conditions not covered by
// leftKeys/rightKeys are therefore stored in mergeJoin.OtherConditions together with p.OtherConditions.
mergeJoin.OtherConditions = p.moveEqualToOtherConditions(offsets)
// NOTE(review): presumably prepares the per-key compare functions used while merging.
// Confirm in initCompareFuncs.
mergeJoin.initCompareFuncs()
// The parent's required prop can be satisfied only if it is a prefix of mergeJoin.LeftJoinKeys or
// mergeJoin.RightJoinKeys, because the merge join's output follows the order of its join keys.
// Only if the input required prop is the prefix of join keys, we can pass through this property.
if reqProps, ok := mergeJoin.tryToGetChildReqProp(prop); ok {
// Adjust expected count for children nodes.
// The parent needs only prop.ExpectedCnt of the join's statsInfo.RowCount output rows, so each
// child is asked for the same fraction of its own row count.
if prop.ExpectedCnt < statsInfo.RowCount {
expCntScale := prop.ExpectedCnt / statsInfo.RowCount
reqProps[0].ExpectedCnt = leftStatsInfo.RowCount * expCntScale
reqProps[1].ExpectedCnt = rightStatsInfo.RowCount * expCntScale
}
// Require the two children to be sorted on mergeJoin.LeftJoinKeys and
// mergeJoin.RightJoinKeys respectively.
mergeJoin.childrenReqProps = reqProps
// Merge in the same direction (asc/desc) as the required property.
_, desc := prop.AllSameOrder()
mergeJoin.Desc = desc
joins = append(joins, mergeJoin)
}
}
// NOTE(review): getEnforcedMergeJoin presumably builds merge joins whose children are sorted on the
// join keys explicitly, so the hint can still be honored when no cached ordering fits. Confirm.
// If TiDB_SMJ hint is existed, it should consider enforce merge join,
// because we can't trust lhsChildProperty completely.
if (p.preferJoinType & preferMergeJoin) > 0 {
joins = append(joins, p.getEnforcedMergeJoin(prop, schema, statsInfo)...)
}
return joins
}
// tryToGetChildReqProp returns the properties the merge join requires from its left and right
// children. The children must be sorted on LeftJoinKeys and RightJoinKeys respectively, in the
// direction of prop. A non-empty prop is accepted only when all of its items share one direction
// and it is a prefix of the join keys of a side whose order survives in the output. A left outer
// join cannot pass through a right-key prefix, and a right outer join cannot pass through a
// left-key prefix. Other join types accept a prefix of either side. The boolean result reports
// whether prop can be passed through.
func (p *PhysicalMergeJoin) tryToGetChildReqProp(prop *property.PhysicalProperty) ([]*property.PhysicalProperty, bool) {
	sameOrder, desc := prop.AllSameOrder()
	leftReq := property.NewPhysicalProperty(property.RootTaskType, p.LeftJoinKeys, desc, math.MaxFloat64, false)
	rightReq := property.NewPhysicalProperty(property.RootTaskType, p.RightJoinKeys, desc, math.MaxFloat64, false)
	if prop.IsEmpty() {
		return []*property.PhysicalProperty{leftReq, rightReq}, true
	}
	// Sort merge join fits the cases of massive ordered data, so desc scan is always expensive.
	if !sameOrder {
		return nil, false
	}
	onLeft, onRight := prop.IsPrefix(leftReq), prop.IsPrefix(rightReq)
	switch {
	case !onLeft && !onRight:
		// prop matches neither side's join keys.
		return nil, false
	case onRight && p.JoinType == LeftOuterJoin:
		// The outer (left) side drives the output order; a right-key prefix is not guaranteed.
		return nil, false
	case onLeft && p.JoinType == RightOuterJoin:
		// The outer (right) side drives the output order; a left-key prefix is not guaranteed.
		return nil, false
	}
	return []*property.PhysicalProperty{leftReq, rightReq}, true
}
getHashJoins
// getHashJoins enumerates the hash-join candidates for this logical join. A hash join does not
// promise any output order, so nothing is returned when prop requires an ordering.
//   - Semi-style joins: the right child is the inner side, and it is the side that is built.
//   - Left/right outer joins: the non-preserved child is the inner side. Building from the inner
//     side and building from the outer side are both offered; ForceUseOuterBuild4Test keeps only
//     the outer-build variant.
//   - Inner join: each child in turn is the inner side (right first); ForcedHashLeftJoin4Test
//     keeps only the right-inner variant.
func (p *LogicalJoin) getHashJoins(prop *property.PhysicalProperty) []PhysicalPlan {
	if !prop.IsEmpty() { // hash join doesn't promise any orders
		return nil
	}
	candidates := make([]PhysicalPlan, 0, 2)
	switch p.JoinType {
	case SemiJoin, AntiSemiJoin, LeftOuterSemiJoin, AntiLeftOuterSemiJoin:
		candidates = append(candidates, p.getHashJoin(prop, 1, false))
	case LeftOuterJoin, RightOuterJoin:
		innerIdx := 1
		if p.JoinType == RightOuterJoin {
			innerIdx = 0
		}
		if !ForceUseOuterBuild4Test {
			candidates = append(candidates, p.getHashJoin(prop, innerIdx, false))
		}
		candidates = append(candidates, p.getHashJoin(prop, innerIdx, true))
	case InnerJoin:
		candidates = append(candidates, p.getHashJoin(prop, 1, false))
		if !ForcedHashLeftJoin4Test {
			candidates = append(candidates, p.getHashJoin(prop, 0, false))
		}
	}
	return candidates
}
// getHashJoin builds one PhysicalHashJoin. Child innerIdx is the inner side and the other child is
// the outer side. Neither child is required to provide an ordering. When the parent expects fewer
// rows than the join produces (prop.ExpectedCnt < p.stats.RowCount), the outer child's expected row
// count is scaled down by the same ratio. The inner child keeps math.MaxFloat64, i.e. it is still
// read in full.
func (p *LogicalJoin) getHashJoin(prop *property.PhysicalProperty, innerIdx int, useOuterToBuild bool) *PhysicalHashJoin {
	outerIdx := 1 - innerIdx
	outerExpectedCnt := math.MaxFloat64
	if prop.ExpectedCnt < p.stats.RowCount {
		ratio := prop.ExpectedCnt / p.stats.RowCount
		outerExpectedCnt = p.children[outerIdx].statsInfo().RowCount * ratio
	}
	childProps := make([]*property.PhysicalProperty, 2)
	childProps[innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
	childProps[outerIdx] = &property.PhysicalProperty{ExpectedCnt: outerExpectedCnt}
	hashJoin := NewPhysicalHashJoin(p, innerIdx, useOuterToBuild, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProps...)
	hashJoin.SetSchema(p.schema)
	return hashJoin
}
// NewPhysicalHashJoin creates a new PhysicalHashJoin from LogicalJoin.
// innerIdx selects which child (0 = left, 1 = right) is the inner side, useOuterToBuild selects
// whether the outer side is the build side, newStats becomes the operator's statistics, and prop
// lists the properties required from each child. Join keys, conditions and default values are
// copied from p; concurrency comes from the session's hash-join concurrency setting.
func NewPhysicalHashJoin(p *LogicalJoin, innerIdx int, useOuterToBuild bool, newStats *property.StatsInfo, prop ...*property.PhysicalProperty) *PhysicalHashJoin {
	leftKeys, rightKeys, nullEQFlags, _ := p.GetJoinKeys()
	return PhysicalHashJoin{
		basePhysicalJoin: basePhysicalJoin{
			JoinType:        p.JoinType,
			InnerChildIdx:   innerIdx,
			LeftJoinKeys:    leftKeys,
			RightJoinKeys:   rightKeys,
			IsNullEQ:        nullEQFlags,
			LeftConditions:  p.LeftConditions,
			RightConditions: p.RightConditions,
			OtherConditions: p.OtherConditions,
			DefaultValues:   p.DefaultValues,
		},
		EqualConditions: p.EqualConditions,
		UseOuterToBuild: useOuterToBuild,
		Concurrency:     uint(p.ctx.GetSessionVars().HashJoinConcurrency()),
	}.Init(p.ctx, newStats, p.blockOffset, prop...)
}
tryToGetIndexJoin
// tryToGetIndexJoin will get index join by hints. If we can generate a valid index join by hint, the second return value
// will be true, which means we force to choose this index join. Otherwise we will select a join algorithm with min-cost.
//
// Candidates come from getIndexJoinByOuterIdx, with the left child (outerIdx 0) and/or the right child (outerIdx 1)
// as the outer side, depending on what p.JoinType allows. When no hint can be honored, the candidates are returned
// through filterIndexJoinBySessionVars with canForced == false. If an index-join hint was given but could not be
// applied and prop is empty, a warning is appended to the statement context.
func (p *LogicalJoin) tryToGetIndexJoin(prop *property.PhysicalProperty) (indexJoins []PhysicalPlan, canForced bool) {
// INL_JOIN / TIDB_INLJ hint: ask the optimizer to use Index Nested Loop Join for the given tables.
// https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#inl_joint1_name--tl_name-
// A hint naming the left child as the inner side makes the right child the outer side, and vice versa.
inljRightOuter := (p.preferJoinType & preferLeftAsINLJInner) > 0
inljLeftOuter := (p.preferJoinType & preferRightAsINLJInner) > 0
hasINLJHint := inljLeftOuter || inljRightOuter
// INL_HASH_JOIN hint: ask the optimizer to use Index Nested Loop Hash Join.
// https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#inl_hash_join
inlhjRightOuter := (p.preferJoinType & preferLeftAsINLHJInner) > 0
inlhjLeftOuter := (p.preferJoinType & preferRightAsINLHJInner) > 0
hasINLHJHint := inlhjLeftOuter || inlhjRightOuter
// INL_MERGE_JOIN hint: ask the optimizer to use Index Nested Loop Merge Join.
inlmjRightOuter := (p.preferJoinType & preferLeftAsINLMJInner) > 0
inlmjLeftOuter := (p.preferJoinType & preferRightAsINLMJInner) > 0
hasINLMJHint := inlmjLeftOuter || inlmjRightOuter
// forceLeftOuter / forceRightOuter: some hint requires the left / right child to be the outer side.
forceLeftOuter := inljLeftOuter || inlhjLeftOuter || inlmjLeftOuter
forceRightOuter := inljRightOuter || inlhjRightOuter || inlmjRightOuter
needForced := forceLeftOuter || forceRightOuter
defer func() {
// refine error message
// canForced is a named result, so this deferred function sees the value set by whichever return statement ran.
// If the required property is not empty, we will enforce it and try the hint again.
// So we only need to generate warning message when the property is empty.
if !canForced && needForced && prop.IsEmpty() {
// Construct warning message prefix.
var errMsg string
switch {
case hasINLJHint:
errMsg = "Optimizer Hint INL_JOIN or TIDB_INLJ is inapplicable"
case hasINLHJHint:
errMsg = "Optimizer Hint INL_HASH_JOIN is inapplicable"
case hasINLMJHint:
errMsg = "Optimizer Hint INL_MERGE_JOIN is inapplicable"
}
// When hint info is available, restate the hint with its table list instead of the generic prefix.
if p.hintInfo != nil {
t := p.hintInfo.indexNestedLoopJoinTables
switch {
case len(t.inljTables) != 0:
errMsg = fmt.Sprintf("Optimizer Hint %s or %s is inapplicable",
restore2JoinHint(HintINLJ, t.inljTables), restore2JoinHint(TiDBIndexNestedLoopJoin, t.inljTables))
case len(t.inlhjTables) != 0:
errMsg = fmt.Sprintf("Optimizer Hint %s is inapplicable", restore2JoinHint(HintINLHJ, t.inlhjTables))
case len(t.inlmjTables) != 0:
errMsg = fmt.Sprintf("Optimizer Hint %s is inapplicable", restore2JoinHint(HintINLMJ, t.inlmjTables))
}
}
// Append inapplicable reason.
if len(p.EqualConditions) == 0 {
errMsg += " without column equal ON condition"
}
// Generate warning message to client.
warning := ErrInternal.GenWithStack(errMsg)
p.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}()
// supportLeftOuter and supportRightOuter indicates whether this type of join
// supports the left side or right side to be the outer side.
var supportLeftOuter, supportRightOuter bool
switch p.JoinType {
case SemiJoin, AntiSemiJoin, LeftOuterSemiJoin, AntiLeftOuterSemiJoin, LeftOuterJoin:
// Semi joins and left outer joins: only the left child may be the outer side.
supportLeftOuter = true
case RightOuterJoin:
// Right outer join: only the right child may be the outer side.
supportRightOuter = true
case InnerJoin:
// Inner join: either child may be the outer side.
supportLeftOuter, supportRightOuter = true, true
}
var allLeftOuterJoins, allRightOuterJoins, forcedLeftOuterJoins, forcedRightOuterJoins []PhysicalPlan
if supportLeftOuter {
// Collect every PhysicalIndexJoin / PhysicalIndexHashJoin / PhysicalIndexMergeJoin that uses the
// left child as the outer side, and keep those matching a left-outer hint in forcedLeftOuterJoins.
allLeftOuterJoins = p.getIndexJoinByOuterIdx(prop, 0)
forcedLeftOuterJoins = make([]PhysicalPlan, 0, len(allLeftOuterJoins))
for _, j := range allLeftOuterJoins {
switch j.(type) {
case *PhysicalIndexJoin:
if inljLeftOuter {
forcedLeftOuterJoins = append(forcedLeftOuterJoins, j)
}
case *PhysicalIndexHashJoin:
if inlhjLeftOuter {
forcedLeftOuterJoins = append(forcedLeftOuterJoins, j)
}
case *PhysicalIndexMergeJoin:
if inlmjLeftOuter {
forcedLeftOuterJoins = append(forcedLeftOuterJoins, j)
}
}
}
// - No hinted left-outer plan and the right side can never be outer: return all candidates, not forced.
// - Hinted left-outer plans exist, and either the right side cannot be outer or only a left-outer
//   hint was given: force them.
switch {
case len(forcedLeftOuterJoins) == 0 && !supportRightOuter:
return filterIndexJoinBySessionVars(p.ctx, allLeftOuterJoins), false
case len(forcedLeftOuterJoins) != 0 && (!supportRightOuter || (forceLeftOuter && !forceRightOuter)):
return forcedLeftOuterJoins, true
}
}
if supportRightOuter {
// Same as above, with the right child as the outer side.
allRightOuterJoins = p.getIndexJoinByOuterIdx(prop, 1)
forcedRightOuterJoins = make([]PhysicalPlan, 0, len(allRightOuterJoins))
for _, j := range allRightOuterJoins {
switch j.(type) {
case *PhysicalIndexJoin:
if inljRightOuter {
forcedRightOuterJoins = append(forcedRightOuterJoins, j)
}
case *PhysicalIndexHashJoin:
if inlhjRightOuter {
forcedRightOuterJoins = append(forcedRightOuterJoins, j)
}
case *PhysicalIndexMergeJoin:
if inlmjRightOuter {
forcedRightOuterJoins = append(forcedRightOuterJoins, j)
}
}
}
switch {
case len(forcedRightOuterJoins) == 0 && !supportLeftOuter:
return filterIndexJoinBySessionVars(p.ctx, allRightOuterJoins), false
case len(forcedRightOuterJoins) != 0 && (!supportLeftOuter || (forceRightOuter && !forceLeftOuter)):
return forcedRightOuterJoins, true
}
}
// Reached when both sides may be outer (inner join) and no early return fired, or when the join type
// supports neither side. Force whichever hinted plans exist; otherwise return all candidates, not forced.
canForceLeft := len(forcedLeftOuterJoins) != 0 && forceLeftOuter
canForceRight := len(forcedRightOuterJoins) != 0 && forceRightOuter
canForced = canForceLeft || canForceRight
if canForced {
return append(forcedLeftOuterJoins, forcedRightOuterJoins...), true
}
return filterIndexJoinBySessionVars(p.ctx, append(allLeftOuterJoins, allRightOuterJoins...)), false
}
// getIndexJoinByOuterIdx generates index-join candidates that use child outerIdx as the outer side
// and the other child as the inner side. The join keys of p's equal conditions are split into outer
// and inner keys. The inner child must be a DataSource, or a LogicalUnionScan whose first child is a
// DataSource, and must not prefer TiFlash. An inner TableScan plan is tried first; if it yields
// nothing, an inner IndexScan plan is built instead.
func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, outerIdx int) (joins []PhysicalPlan) {
	outerChild, innerChild := p.children[outerIdx], p.children[1-outerIdx]

	// An index join emits rows in its outer child's order. It can only keep a required ordering whose
	// columns all come from the outer child and whose items all share one direction.
	sameOrder, _ := prop.AllSameOrder()
	if !prop.AllColsFromSchema(outerChild.Schema()) || !sameOrder {
		return nil
	}

	leftKeys, rightKeys, _, _ := p.GetJoinKeys()
	outerJoinKeys, innerJoinKeys := leftKeys, rightKeys
	if outerIdx != 0 {
		outerJoinKeys, innerJoinKeys = rightKeys, leftKeys
	}

	// The inner side must be a DataSource directly, or a union scan over one.
	var us *LogicalUnionScan
	ds, isDataSource := innerChild.(*DataSource)
	if isDataSource {
		if ds.preferStoreType&preferTiFlash != 0 {
			return nil
		}
	} else {
		var isUnionScan bool
		if us, isUnionScan = innerChild.(*LogicalUnionScan); !isUnionScan {
			return nil
		}
		// The child of union scan may be union all for partition table.
		if ds, isDataSource = us.Children()[0].(*DataSource); !isDataSource {
			return nil
		}
		// If one of the union scan children is a TiFlash table, then we can't choose index join.
		for _, child := range us.Children() {
			if childDS, ok := child.(*DataSource); ok && childDS.preferStoreType&preferTiFlash != 0 {
				return nil
			}
		}
	}

	// Presumably the average number of inner rows matched per outer row; stays 0 when the outer
	// side is estimated to be empty.
	avgInnerRowCnt := 0.0
	if outerRowCnt := outerChild.statsInfo().RowCount; outerRowCnt > 0 {
		avgInnerRowCnt = p.equalCondOutCnt / outerRowCnt
	}

	if joins = p.buildIndexJoinInner2TableScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt); joins != nil {
		return joins
	}
	return p.buildIndexJoinInner2IndexScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt)
}
// buildIndexJoinInner2TableScan builds a TableScan as the inner child for an
// IndexJoin if possible.
// If the inner side of a index join is a TableScan, only one tuple will be
// fetched from the inner side for every tuple from the outer side. This will be
// promised to be no worse than building IndexScan as the inner child.
//
// It returns nil when ds has no TiKV table path, or when the join keys cannot use the handle
// (common handle: no build helper; int handle: no primary-key column, or no join key equals it).
// Otherwise it returns an index join and an index hash join over the same inner task, plus an
// index merge join when there is no union scan on the inner side.
func (p *LogicalJoin) buildIndexJoinInner2TableScan(
prop *property.PhysicalProperty, ds *DataSource, innerJoinKeys, outerJoinKeys []*expression.Column,
outerIdx int, us *LogicalUnionScan, avgInnerRowCnt float64) (joins []PhysicalPlan) {
var tblPath *util.AccessPath
// To build the inner child as a TableScan we first need a table access path on TiKV.
// IsTablePath returns true if it's IntHandlePath or CommonHandlePath.
for _, path := range ds.possibleAccessPaths {
if path.IsTablePath() && path.StoreType == kv.TiKV {
tblPath = path
break
}
}
if tblPath == nil {
return nil
}
// keyOff2IdxOff[i] is the handle-column offset matched by join key i, or -1 when key i is not matched.
keyOff2IdxOff := make([]int, len(innerJoinKeys))
newOuterJoinKeys := make([]*expression.Column, 0)
var ranges []*ranger.Range
// innerTask feeds the index join and index hash join; innerTask2 (order-keeping) feeds the index merge join.
var innerTask, innerTask2 task
var helper *indexJoinBuildHelper
if ds.tableInfo.IsCommonHandle {
// Common-handle table: match the join keys against the common-handle path via the build helper.
helper, keyOff2IdxOff = p.getIndexJoinBuildHelper(ds, innerJoinKeys, func(path *util.AccessPath) bool { return path.IsCommonHandlePath }, outerJoinKeys)
if helper == nil {
return nil
}
innerTask = p.constructInnerTableScanTask(ds, nil, outerJoinKeys, us, false, false, avgInnerRowCnt)
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if us == nil {
innerTask2 = p.constructInnerTableScanTask(ds, nil, outerJoinKeys, us, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
}
ranges = helper.chosenRanges
} else {
// Otherwise the handle, if any, is the single int primary-key column returned by getPKIsHandleCol.
pkMatched := false
pkCol := ds.getPKIsHandleCol()
if pkCol == nil {
return nil
}
for i, key := range innerJoinKeys {
// Map each inner join key that equals the primary-key column to offset 0; mark the others -1.
if !key.Equal(nil, pkCol) {
keyOff2IdxOff[i] = -1
continue
}
pkMatched = true
keyOff2IdxOff[i] = 0
// Keep only the outer keys paired with the primary key; only these are passed to
// constructInnerTableScanTask below.
// Add to newOuterJoinKeys only if conditions contain inner primary key. For issue #14822.
newOuterJoinKeys = append(newOuterJoinKeys, outerJoinKeys[i])
}
// From here on, outerJoinKeys holds only the outer keys matched to the primary key.
outerJoinKeys = newOuterJoinKeys
if !pkMatched {
return nil
}
// innerTask: a TableScan on the inner table that does not need to keep order.
innerTask = p.constructInnerTableScanTask(ds, pkCol, outerJoinKeys, us, false, false, avgInnerRowCnt)
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if us == nil {
// For index merge join, innerTask2 is an inner TableScan that keeps order (the two bool
// arguments appear to be keepOrder and desc), descending when the required prop is descending.
// NOTE(review): with a single int primary key but several join keys, only the pk column is
// ordered; the other join keys are marked -1 in keyOff2IdxOff. Confirm that
// constructIndexMergeJoin relies only on the matched key.
innerTask2 = p.constructInnerTableScanTask(ds, pkCol, outerJoinKeys, us, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
}
}
var (
path *util.AccessPath
lastColMng *ColWithCmpFuncManager
)
// Only the common-handle branch sets helper; for the int-handle branch path and lastColMng stay nil.
if helper != nil {
path = helper.chosenPath
lastColMng = helper.lastColManager
}
joins = make([]PhysicalPlan, 0, 3)
// Test hook: when the MockOnlyEnableIndexHashJoin failpoint is enabled with true, return only the index hash join.
failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, nil, keyOff2IdxOff, path, lastColMng))
}
})
joins = append(joins, p.constructIndexJoin(prop, outerIdx, innerTask, ranges, keyOff2IdxOff, path, lastColMng, true)...)
// We can reuse the `innerTask` here since index nested loop hash join
// do not need the inner child to promise the order.
joins = append(joins, p.constructIndexHashJoin(prop, outerIdx, innerTask, ranges, keyOff2IdxOff, path, lastColMng)...)
if innerTask2 != nil {
joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, ranges, keyOff2IdxOff, path, lastColMng)...)
}
return joins
}
enumeratePhysicalPlans4Task
遍历 physicalPlans,以每一个 pp 作为根节点构造一个完整的执行计划(遍历 p.children,为每一个 child 调用 child.findBestTask(pp.GetChildReqProps(j), …),将 child 的 best task 作为 pp 的 child),最后从中选出代价最低的完整执行计划;同时返回 physicalPlans 中所有 pp 能产生的完整执行计划数量之和。
// enumeratePhysicalPlans4Task builds a complete task for each candidate physical plan pp in physicalPlans.
// For every child j it finds the best task under the property pp.GetChildReqProps(j), then attaches those
// child tasks to pp. It returns the cheapest resulting task and the total number of plans enumerated across
// all candidates. When planCounter forces a specific plan (nth_plan) and that plan is reached, that task is
// returned instead of the cheapest one. If addEnforcer is true, prop is enforced on top of each candidate task.
func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPlan, prop *property.PhysicalProperty, addEnforcer bool, planCounter *PlanCounterTp) (task, int64, error) {
var bestTask task = invalidTask
// curCntPlan: number of plans the current pp can produce.
// cntPlan: total number of plans produced by all pp in physicalPlans.
var curCntPlan, cntPlan int64
// childTasks: the best task of each child of the current pp.
childTasks := make([]task, 0, len(p.children))
// childCnts: how many plans each child of the current pp can produce.
childCnts := make([]int64, len(p.children))
cntPlan = 0
for _, pp := range physicalPlans {
// Find best child tasks firstly.
childTasks = childTasks[:0]
// The curCntPlan records the number of possible plans for pp
curCntPlan = 1
// Record the task-map logical timestamp and the current plan ID before exploring the children.
// Both are used below if a forced plan has to be rebuilt. NOTE(review): presumably this lets
// rebuildChildTasks reproduce the child search from this point; confirm.
TimeStampNow := p.GetLogicalTS4TaskMap()
savedPlanID := p.ctx.GetSessionVars().PlanID
for j, child := range p.children {
// GetChildReqProps gets the required property by child index.
// planCounter only counts at the top-level node, so children are searched with
// PlanCounterDisabled (-1).
childTask, cnt, err := child.findBestTask(pp.GetChildReqProps(j), &PlanCounterDisabled)
childCnts[j] = cnt
if err != nil {
return nil, 0, err
}
// If child j yields cnt plans, pp yields the product of all its children's counts.
curCntPlan = curCntPlan * cnt
if childTask != nil && childTask.invalid() {
break
}
childTasks = append(childTasks, childTask)
}
// This check makes sure that there is no invalid child task.
if len(childTasks) != len(p.children) {
continue
}
// planCounter is active and the requested plan is among this pp's plans.
// If the target plan can be found in this physicalPlan(pp), rebuild childTasks to build the corresponding combination.
if planCounter.IsForce() && int64(*planCounter) <= curCntPlan {
p.ctx.GetSessionVars().PlanID = savedPlanID
curCntPlan = int64(*planCounter)
// Replace childTasks with the child-task combination that corresponds to the *planCounter-th plan.
err := p.rebuildChildTasks(&childTasks, pp, childCnts, int64(*planCounter), TimeStampNow)
if err != nil {
return nil, 0, err
}
}
// Combine best child tasks with parent physical plan.
// attach2Task makes the current physical plan as the father of task's physicalPlan and updates the cost of
// current task. If the child's task is cop task, some operator may close this task and return a new rootTask.
curTask := pp.attach2Task(childTasks...)
if curTask.invalid() {
continue
}
// The parent requires a root task but pp produced a non-root task, so convert it.
// An optimal task could not satisfy the property, so it should be converted here.
if _, ok := curTask.(*rootTask); !ok && prop.TaskTp == property.RootTaskType {
curTask = curTask.convertToRootTask(p.ctx)
}
// Presumably places an operator (e.g. a PhysicalSort) on top so that curTask satisfies prop.
// Enforce curTask property
if addEnforcer {
curTask = enforceProperty(prop, curTask, p.basePlan.ctx)
}
// Only attempted when no ordering is required.
// Optimize by shuffle executor to running in parallel manner.
if prop.IsEmpty() {
// Currently, we do not regard shuffled plan as a new plan.
curTask = optimizeByShuffle(curTask, p.basePlan.ctx)
}
cntPlan += curCntPlan
// Consume this pp's plans from the counter; once it is exhausted, the forced plan has been reached.
planCounter.Dec(curCntPlan)
if planCounter.Empty() {
bestTask = curTask
break
}
// Get the most efficient one.
if curTask.cost() < bestTask.cost() || (bestTask.invalid() && !curTask.invalid()) {
bestTask = curTask
}
}
return bestTask, cntPlan, nil
}
attach2Task
将 PhysicalPlan 变成 task,并且把 children 的 tasks 接在自己身上
// PhysicalPlan is the interface of physical plan operators. It embeds Plan, so every PhysicalPlan is also a Plan.
// NOTE(review): this is an excerpt from the notes; the full interface presumably declares more methods.
type PhysicalPlan interface {
Plan
// attach2Task makes the current physical plan as the father of task's physicalPlan and updates the cost of
// current task. If the child's task is cop task, some operator may close this task and return a new rootTask.
attach2Task(...task) task
}