TiDB 物理优化

physicalOptimize(WIP)

https://img-blog.csdnimg.cn/f33aa3ed0bbd4d0384ea7d394b32f2a2.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

// physicalOptimize converts a logical plan into a physical plan and returns
// the chosen plan together with its cost.
//
// planCounter is handed to findBestTask unchanged. If it is still positive once
// the search is over, a warning is appended to the statement context.
func physicalOptimize(logic LogicalPlan, planCounter *PlanCounterTp) (PhysicalPlan, float64, error) {
	// Recursively derive the statistics of the logical plan tree.
	_, err := logic.recursiveDeriveStats(nil)
	if err != nil {
		return nil, 0, err
	}

	// Pre-walk the tree so that operators can record which properties their
	// children are able to provide. Per the article, this prunes properties
	// that are not worth considering as early as possible.
	preparePossibleProperties(logic)

	logic.SCtx().GetSessionVars().StmtCtx.TaskMapBakTS = 0

	// The property required from the root operator: a root task with no
	// limit on the expected row count and no sort requirement.
	rootProp := &property.PhysicalProperty{
		TaskTp:      property.RootTaskType,
		ExpectedCnt: math.MaxFloat64,
	}

	// findBestTask searches for the task that satisfies rootProp.
	bestTask, _, err := logic.findBestTask(rootProp, planCounter)
	if err != nil {
		return nil, 0, err
	}
	if *planCounter > 0 {
		logic.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("The parameter of nth_plan() is out of range."))
	}
	if bestTask.invalid() {
		return nil, 0, ErrInternal.GenWithStackByArgs("Can't find a proper physical plan for this query")
	}

	resolveErr := bestTask.plan().ResolveIndices()
	return bestTask.plan(), bestTask.cost(), resolveErr
}
type LogicalPlan interface {
	// recursiveDeriveStats derives statistic info between plans.
	// It takes the column groups requested by the caller and returns the
	// derived StatsInfo, or an error.
	recursiveDeriveStats(colGroups [][]*expression.Column) (*property.StatsInfo, error)
}

https://img-blog.csdnimg.cn/0d77de04329f4a42b0ebc94af64d27ec.png#pic_center

// recursiveDeriveStats derives the statistics of every child first and then
// asks p.self to derive its own statistics from the children's results.
//
// The column groups passed down to the children are whatever
// p.self.ExtractColGroups returns for colGroups. The first error from a child
// aborts the derivation.
func (p *baseLogicalPlan) recursiveDeriveStats(colGroups [][]*expression.Column) (*property.StatsInfo, error) {
	numChildren := len(p.children)
	statsOfChildren := make([]*property.StatsInfo, numChildren)
	schemasOfChildren := make([]*expression.Schema, numChildren)

	groupsForChildren := p.self.ExtractColGroups(colGroups)
	for idx := range p.children {
		child := p.children[idx]
		stats, err := child.recursiveDeriveStats(groupsForChildren)
		if err != nil {
			return nil, err
		}
		statsOfChildren[idx], schemasOfChildren[idx] = stats, child.Schema()
	}

	selfSchema := p.self.Schema()
	return p.self.DeriveStats(statsOfChildren, selfSchema, schemasOfChildren, colGroups)
}
type LogicalPlan interface {
	// DeriveStats derives statistic info for the current plan node given the child stats.
	// selfSchema and childSchema are passed explicitly so that this method can also be
	// used in the cascades planner, where a LogicalPlan might not record its children
	// or schema.
	DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, colGroups [][]*expression.Column) (*property.StatsInfo, error)
}

https://img-blog.csdnimg.cn/b699c550d32b439f94e3c01aee7ca5d9.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

引入预处理 property 函数的原因是为了减少一些没有必要考虑的 properties,从而尽可能早地裁剪掉物理计划搜索路径上的分支。

// preparePossibleProperties walks the plan tree in post-order: the properties
// of every child are collected first, then lp's own PreparePossibleProperties
// is called with them. The return value is whatever lp reports.
func preparePossibleProperties(lp LogicalPlan) [][]*expression.Column {
	// One slot per child, filled with the properties that child can produce.
	propsPerChild := make([][][]*expression.Column, len(lp.Children()))
	for idx, child := range lp.Children() {
		propsPerChild[idx] = preparePossibleProperties(child)
	}
	// lp only has the children's properties to work with. Its implementation
	// decides which of them it can use and which properties it provides itself.
	return lp.PreparePossibleProperties(lp.Schema(), propsPerChild...)
}

PreparePossibleProperties 仅用于连接和聚合。以 group by a,b,c 为例,LogicalAggregation 可以要求 (a,b,c) 有序,或 (b,a,c) 有序,或 (b,c,a) 有序等等,但 DataSource 中的有序索引是有限的,比如只有一个 (a,b,c) 的索引,那么只能保证 (a,b,c) 有序。因此可以通过 PreparePossibleProperties 去掉其余不可能满足的 prop。

可以根据 childrenProperties 来进行预剪枝的 LogicalPlan,需要实现 PreparePossibleProperties;未实现的会继承 *baseLogicalPlan.PreparePossibleProperties(直接返回 nil)。

type LogicalPlan interface {

	// PreparePossibleProperties is only used for join and aggregation. For example, with
	// `group by a,b,c` every permutation of (a,b,c) is valid, but the ordered indices in
	// the leaf plan are limited. So all possible order properties can be collected by a
	// pre-walk of the plan tree.
	PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column

}

// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// This base implementation reports no properties: it always returns nil.
func (p *baseLogicalPlan) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	var noProperties [][]*expression.Column
	return noProperties
}

https://img-blog.csdnimg.cn/9bb5e140f9e3467f9638e71bac0d7378.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// The properties of the first child are passed through unchanged; per the
// article, a selection has exactly one child and keeps whatever order that
// child guarantees.
func (p *LogicalSelection) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	onlyChildProps := childrenProperties[0]
	return onlyChildProps
}
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// It collects, for every possible access path of the data source, the column
// sequences that the path is treated as returning in order.
func (ds *DataSource) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	orderings := make([][]*expression.Column, 0, len(ds.possibleAccessPaths))

	// Walk every candidate way of reading the table.
	for _, path := range ds.possibleAccessPaths {
		// An int-handle path contributes the handle column alone, if there is one.
		if path.IsIntHandlePath {
			if handleCol := ds.getPKIsHandleCol(); handleCol != nil {
				orderings = append(orderings, []*expression.Column{handleCol})
			}
			continue
		}

		numIdxCols := len(path.IdxCols)
		if numIdxCols == 0 {
			continue
		}

		// A path with index columns, e.g. [a, b, c], contributes the full
		// column list (a, b, c) first.
		fullOrder := make([]*expression.Column, numIdxCols)
		copy(fullOrder, path.IdxCols)
		orderings = append(orderings, fullOrder)

		// For each of the first EqCondCount columns (while a column remains
		// after it), the suffix that follows is added as well.
		// NOTE(review): presumably the leading columns are fixed by equality
		// conditions, so the remaining suffix is ordered too; confirm against
		// https://github.com/pingcap/tidb/pull/10548.
		for i := 0; i < path.EqCondCount && i+1 < numIdxCols; i++ {
			suffix := make([]*expression.Column, numIdxCols-i-1)
			copy(suffix, path.IdxCols[i+1:])
			orderings = append(orderings, suffix)
		}
	}
	return orderings
}
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// It selects the child properties that cover all group-by columns, stores
// cloned copies of them in la.possibleProperties, and returns the uncloned
// ones as the properties this aggregation can provide.
func (la *LogicalAggregation) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	// Only the first child is consulted; an aggregation is expected to have one child.
	propsFromChild := childrenProperties[0]

	// If there's no group-by item, the stream aggregation could have no order
	// property, so a single empty property is recorded.
	if len(la.GroupByItems) == 0 {
		la.possibleProperties = [][]*expression.Column{nil}
		return nil
	}

	provided := make([][]*expression.Column, 0, len(propsFromChild))
	saved := make([][]*expression.Column, 0, len(propsFromChild))

	// GetGroupByCols returns the columns that are group-by items.
	// For example, `group by a, b, c+d` will return [a, b].
	groupByCols := la.GetGroupByCols()
	numGroupByCols := len(groupByCols)

	for _, candidate := range propsFromChild {
		matched := getMaxSortPrefix(candidate, groupByCols)
		// The candidate is usable only when the matched prefix has as many
		// entries as there are group-by columns. In that case its first
		// numGroupByCols columns are taken as the order for a stream aggregation.
		if len(matched) != numGroupByCols {
			continue
		}
		prefix := candidate[:numGroupByCols]
		provided = append(provided, prefix)

		// Clone the Columns in the property before saving them, otherwise the upper
		// Projection may modify them and lead to unexpected results.
		// See https://github.com/pingcap/tidb/pull/24204.
		clonedPrefix := make([]*expression.Column, len(prefix))
		for i := range prefix {
			clonedPrefix[i] = prefix[i].Clone().(*expression.Column)
		}
		saved = append(saved, clonedPrefix)
	}

	// The usable properties are kept on the aggregation itself.
	la.possibleProperties = saved
	return provided
}
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// It saves cloned copies of both children's properties on the join and
// returns the properties the join result can provide.
func (p *LogicalJoin) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	fromLeft, fromRight := childrenProperties[0], childrenProperties[1]

	// TODO: We should consider properties propagation.
	// Clone the Columns in the property before saving them, otherwise the upper Projection
	// may modify them and lead to unexpected results.
	// See https://github.com/pingcap/tidb/pull/24204.
	// Per the article, the saved copies are consumed later by GetMergeJoin.
	p.leftProperties = clonePossibleProperties(fromLeft)
	p.rightProperties = clonePossibleProperties(fromRight)

	// Per the article, the side of an outer join that may be padded with
	// NULLs cannot contribute an order to the join result.
	switch p.JoinType {
	case LeftOuterJoin, LeftOuterSemiJoin:
		fromRight = nil
	case RightOuterJoin:
		fromLeft = nil
	}

	// The join can provide every remaining left-side order followed by every
	// remaining right-side order. Each one is copied into a fresh slice.
	combined := make([][]*expression.Column, 0, len(fromLeft)+len(fromRight))
	for _, side := range [][][]*expression.Column{fromLeft, fromRight} {
		for _, cols := range side {
			copied := make([]*expression.Column, len(cols))
			copy(copied, cols)
			combined = append(combined, copied)
		}
	}
	return combined
}
// PreparePossibleProperties implements LogicalPlan PreparePossibleProperties interface.
// It rewrites each child property in terms of the projection's output
// columns, keeping only the leading columns that are projected as plain
// columns.
func (p *LogicalProjection) PreparePossibleProperties(schema *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	propsFromChild := childrenProperties[0]

	// Pair each expression that is a bare column (inputCols) with the output
	// column at the same position (outputCols). Other expressions are skipped.
	inputCols := make([]*expression.Column, 0, p.schema.Len())
	outputCols := make([]*expression.Column, 0, p.schema.Len())
	for i := range p.Exprs {
		inCol, isColumn := p.Exprs[i].(*expression.Column)
		if !isColumn {
			continue
		}
		outputCols = append(outputCols, p.schema.Columns[i])
		inputCols = append(inputCols, inCol)
	}
	inputSchema := expression.NewSchema(inputCols...)

	mappedProps := make([][]*expression.Column, 0, len(propsFromChild))
	for _, childProp := range propsFromChild {
		mapped := make([]*expression.Column, 0, len(childProp))
		for _, col := range childProp {
			pos := inputSchema.ColumnIndex(col)
			if pos < 0 {
				// This column is not projected as a plain column, so only the
				// prefix collected so far is kept.
				break
			}
			// A plainly projected column maps to its output column.
			mapped = append(mapped, outputCols[pos])
		}
		if len(mapped) != 0 {
			mappedProps = append(mappedProps, mapped)
		}
	}
	// Unlike LogicalJoin and LogicalAggregation, nothing is stored on the
	// projection itself; the mapped properties are only returned.
	return mappedProps
}
type LogicalPlan interface {
	// findBestTask converts the logical plan to the physical plan. It's a new interface.
	// It is called recursively from the parent to the children to create the result physical plan.
	// Some logical plans will convert the children to the physical plans in different ways, and
	// return the one with the lowest cost together with how many plans were found in this function.
	// planCounter is a counter for the planner to force a plan.
	// If planCounter > 0, the planCounter-th plan generated in this function will be returned.
	// If planCounter = 0, the plan generated in this function will not be considered.
	// If planCounter = -1, then we will not force plan.
	findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp) (task, int64, error)
}

https://img-blog.csdnimg.cn/0af49b27cff64d1c90c27e341dbffe64.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

// PhysicalProperty stands for the physical property required by parents.
// It contains the orders and the task types.
type PhysicalProperty struct {
	// SortItems contains the required sort attributes.
	SortItems []SortItem

	// TaskTp means the type of task that an operator requires.
	//
	// It needs to be specified because two different tasks can't be compared
	// by cost directly. E.g. if a copTask has a lower cost than a rootTask,
	// we can't be sure that we must choose the former one, because the copTask
	// must be finished at some point and that will increase its cost, but we
	// can't tell when it will be finished. So the best way to keep the
	// comparison fair is to add TaskType to the required property.
	TaskTp TaskType

	// ExpectedCnt means this operator may be closed after fetching ExpectedCnt
	// records.
	ExpectedCnt float64

	// hashcode stores the hash code of a PhysicalProperty. It is lazily
	// calculated when the function "HashCode()" is called.
	hashcode []byte

	...
}

// SortItem wraps the column and its order.
type SortItem struct {
	// Col is the column to sort by.
	Col  *expression.Column
	// Desc is the sort direction flag (presumably true for descending order).
	Desc bool
}

const (
	// RootTaskType stands for the tasks that are executed in the TiDB layer.
	RootTaskType TaskType = iota

	// CopSingleReadTaskType stands for the TableScan or IndexScan tasks
	// executed in the coprocessor layer.
	CopSingleReadTaskType

	// CopDoubleReadTaskType stands for the IndexLookup tasks executed in the
	// coprocessor layer.
	CopDoubleReadTaskType

	...
)
// findBestTask implements LogicalPlan interface.
//
// It looks for the best task of p that satisfies prop. Candidates are produced
// by p.self.exhaustPhysicalPlans, once with the required property and, when an
// enforcer is allowed, once more with an emptied property. Both candidate sets
// are then evaluated by enumeratePhysicalPlans4Task. The result is cached with
// storeTask before returning.
//
// It returns the best task, the number of plans counted, and an error.
func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp) (bestTask task, cntPlan int64, err error) {
	// If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
	// and set inner child prop nil, so here we do nothing.
	if prop == nil {
		return nil, 1, nil
	}
	// Look up the task with this prop in the task map.
	// It's used to reduce double counting.
	// This is a memoized search: the best task found for a prop is saved by
	// storeTask at the END label, so a later request with the same prop is
	// answered from the cache. Per the article, the cache is
	// baseLogicalPlan.taskMap, keyed by the hash of prop.
	bestTask = p.getTask(prop)
	if bestTask != nil {
		planCounter.Dec(1)
		return bestTask, 1, nil
	}

	// Whether the caller allows prop to be enforced by an extra operator on top
	// of p.self (a sort, according to the article).
	canAddEnforcer := prop.CanAddEnforcer
	// A prop that asks for a non-root task and is not a flash prop is answered
	// with invalidTask, which is cached as well.
	if prop.TaskTp != property.RootTaskType && !prop.IsFlashProp() {
		// Currently all plan cannot totally push down to TiKV.
		p.storeTask(prop, invalidTask)
		return invalidTask, 0, nil
	}

	bestTask = invalidTask
	cntPlan = 0
	// prop should be read only because its cached hashcode might be not consistent
	// when it is changed. So we clone a new one for the temporary changes.
	newProp := prop.CloneEssentialFields()
	// plansFitsProp: physical plans generated directly for the required property,
	// without an enforcer.
	// plansNeedEnforce: physical plans generated for the emptied property, which
	// rely on an enforcer to satisfy prop.
	var plansFitsProp, plansNeedEnforce []PhysicalPlan
	var hintWorksWithProp bool
	// Maybe the plan can satisfy the required property,
	// so we try to get the task without the enforced sort first.
	// Ask p.self for all physical operators that can satisfy newProp.
	plansFitsProp, hintWorksWithProp, err = p.self.exhaustPhysicalPlans(newProp)
	if err != nil {
		return nil, 0, err
	}
	// If the hint cannot work with newProp and newProp is not empty, allow the
	// enforcer so that the hint gets another chance with an empty property.
	if !hintWorksWithProp && !newProp.IsEmpty() {
		// If there is a hint in the plan and the hint cannot satisfy the property,
		// we enforce this property and try to generate the PhysicalPlan again to
		// make sure the hint can work.
		canAddEnforcer = true
	}

	if canAddEnforcer {
		// Then, we use the empty property to get physicalPlans and
		// try to get the task with an enforced sort.
		// The sort and MPP partition requirements are cleared on newProp before
		// asking p.self again, because the original prop is to be satisfied by
		// the enforcer instead.
		newProp.SortItems = []property.SortItem{}
		newProp.ExpectedCnt = math.MaxFloat64
		newProp.MPPPartitionCols = nil
		newProp.MPPPartitionTp = property.AnyType
		var hintCanWork bool
		// Ask p.self for all physical operators that match the emptied newProp.
		plansNeedEnforce, hintCanWork, err = p.self.exhaustPhysicalPlans(newProp)
		if err != nil {
			return nil, 0, err
		}

		if hintCanWork && !hintWorksWithProp {
			// If the hint can work with the empty property, but cannot work with
			// the required property, we give up `plansFitProp` to make sure the hint
			// can work.
			plansFitsProp = nil
		}

		if !hintCanWork && !hintWorksWithProp && !prop.CanAddEnforcer {
			// If the original property is not enforced and hint cannot
			// work anyway, we give up `plansNeedEnforce` for efficiency,
			plansNeedEnforce = nil
		}
		// From here on the original prop is used for enumeration.
		newProp = prop
	}

	var cnt int64
	var curTask task
	// Evaluate the plans that fit the property directly (third argument false).
	if bestTask, cnt, err = p.enumeratePhysicalPlans4Task(plansFitsProp, newProp, false, planCounter); err != nil {
		return nil, 0, err
	}
	cntPlan += cnt
	if planCounter.Empty() {
		goto END
	}

	// Evaluate the plans that need an enforcer (third argument true).
	curTask, cnt, err = p.enumeratePhysicalPlans4Task(plansNeedEnforce, newProp, true, planCounter)
	if err != nil {
		return nil, 0, err
	}
	cntPlan += cnt
	if planCounter.Empty() {
		bestTask = curTask
		goto END
	}
	// Prefer the enforced task when it is cheaper, or when it is valid and the
	// direct one is not.
	if curTask.cost() < bestTask.cost() || (bestTask.invalid() && !curTask.invalid()) {
		bestTask = curTask
	}

END:
	// Cache the result for this prop so that a later call with the same prop
	// returns it from getTask at the top of this function.
	p.storeTask(prop, bestTask)
	return bestTask, cntPlan, nil
}

exhaustPhysicalPlans:为 LogicalPlan 这个逻辑算子找出所有能够满足给定 *property.PhysicalProperty 的物理算子。

type LogicalPlan interface {
	// exhaustPhysicalPlans generates all possible plans that can match the required property.
	// It will return:
	// 1. All possible plans that can match the required property.
	// 2. Whether the SQL hint can work. Return true if there is no hint.
	// 3. An error, if generating the plans failed.
	exhaustPhysicalPlans(*property.PhysicalProperty) (physicalPlans []PhysicalPlan, hintCanWork bool, err error)
}

https://img-blog.csdnimg.cn/97e18017eca042a9ba1514712ef160e8.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

// exhaustPhysicalPlans implements LogicalPlan interface.
// It builds one PhysicalProjection per candidate child property. When the
// required property cannot be translated for the child, it returns no plans
// and reports the hint as workable.
func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {

	// TryToGetChildProp will check if this sort property can be pushed or not.
	// When a sort column will be replaced by scalar function, we refuse it.
	// When a sort column will be replaced by a constant, we just remove it.
	// In other words, the columns in prop are mapped back to the columns they
	// are projected from. If one of them comes from an
	// *expression.ScalarFunction rather than an *expression.Column, the
	// projection cannot accept prop, since the order of the scalar function's
	// result cannot be guaranteed.
	newProp, ok := p.TryToGetChildProp(prop)
	if !ok {
		return nil, true, nil
	}
	newProps := []*property.PhysicalProperty{newProp}

	// generate a mpp task candidate if enforced mpp
	...

	ret := make([]PhysicalPlan, 0, len(newProps))
	for _, newProp := range newProps {
		// The stats are scaled by the ExpectedCnt of the original prop, while the
		// translated newProp becomes the property required from the child.
		proj := PhysicalProjection{
			Exprs:                p.Exprs,
			CalculateNoDelay:     p.CalculateNoDelay,
			AvoidColumnEvaluator: p.AvoidColumnEvaluator,
		}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, newProp)
		proj.SetSchema(p.schema)
		ret = append(ret, proj)
	}
	return ret, true, nil
}

// Init initializes PhysicalProjection.
//
// It installs a new base physical plan of type plancodec.TypeProj, records
// props as the properties required from the children, stores stats, and
// returns a pointer to the initialized copy (the receiver is a value).
func (p PhysicalProjection) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalProjection {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeProj, &p, offset)
	// When called from LogicalProjection.exhaustPhysicalPlans, props is the
	// property the logical projection received, translated by
	// TryToGetChildProp to the columns that p.Exprs project from.
	p.childrenReqProps = props
	p.stats = stats
	return &p
}
// exhaustPhysicalPlans implements LogicalPlan interface.
// A selection always yields exactly one PhysicalSelection. The child is
// required to satisfy a clone of the essential fields of prop, and the hint
// is always reported as workable.
func (p *LogicalSelection) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
	propForChild := prop.CloneEssentialFields()
	physicalSel := PhysicalSelection{Conditions: p.Conditions}.Init(
		p.ctx,
		p.stats.ScaleByExpectCnt(prop.ExpectedCnt),
		p.blockOffset,
		propForChild,
	)
	plans := []PhysicalPlan{physicalSel}
	return plans, true, nil
}

// Init initializes PhysicalSelection.
//
// It installs a new base physical plan of type plancodec.TypeSel, records
// props as the properties required from the children, stores stats, and
// returns a pointer to the initialized copy (the receiver is a value).
func (p PhysicalSelection) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalSelection {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSel, &p, offset)
	// When called from LogicalSelection.exhaustPhysicalPlans, props is a clone
	// of the property the logical selection itself received.
	p.childrenReqProps = props
	p.stats = stats
	return &p
}
// exhaustPhysicalPlans implements LogicalPlan interface.
// It produces hash and stream aggregation candidates for prop, honouring the
// aggregation hints. The boolean result is true when the returned plans honour
// the hint or when no aggregation type is preferred.
func (la *LogicalAggregation) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
	// The AGG_TO_COP hint asks for the aggregation to be pushed down to the coprocessor,
	// see https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#agg_to_cop.
	// If the push-down is impossible, emit a warning and drop the preference.
	if la.aggHints.preferAggToCop && !la.canPushToCop(kv.TiKV) {
		warning := ErrInternal.GenWithStack("Optimizer Hint AGG_TO_COP is inapplicable")
		la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
		la.aggHints.preferAggToCop = false
	}

	// ResetHintIfConflicted reports which aggregation type the hint prefers.
	// NOTE(review): per the article both flags are false when the hint asks
	// for hash agg and stream agg at once, or when there is no hint; confirm
	// against its definition.
	preferHash, preferStream := la.ResetHintIfConflicted()

	hashAggs := la.getHashAggs(prop)
	if preferHash && hashAggs != nil {
		return hashAggs, true, nil
	}

	streamAggs := la.getStreamAggs(prop)
	if preferStream && streamAggs != nil {
		return streamAggs, true, nil
	}

	// Reaching here means either no aggregation type is preferred or the
	// preferred one produced no plan.
	if preferStream && streamAggs == nil && !prop.IsEmpty() {
		warning := ErrInternal.GenWithStack("Optimizer Hint STREAM_AGG is inapplicable")
		la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
	}

	candidates := append(hashAggs, streamAggs...)
	noPreference := !preferStream && !preferHash
	return candidates, noPreference, nil
}

// canPushToCop checks if we might push this plan to a specific store.
//
// The base plan's check runs first. The aggregation additionally requires
// that la.noCopPushDown is not set.
func (la *LogicalAggregation) canPushToCop(storeTp kv.StoreType) bool {
	// The base check decides whether the plan can be pushed to some store. For TiKV it
	// only checks the datasource. For TiFlash it checks whether the operator is
	// supported, but note that the check might be inaccurate.
	if !la.baseLogicalPlan.canPushToCop(storeTp) {
		return false
	}
	// LogicalAggregation.noCopPushDown indicates that the planner must not push this
	// agg down to the coprocessor. It is true when the agg is in the outer child tree
	// of apply.
	return !la.noCopPushDown
}
// getHashAggs builds the hash aggregation candidates for prop.
//
// It returns nil when prop is not empty, or when an MPP task is required but
// checkCanPushDownToMPP fails. Otherwise it decides which task types the child
// may use and creates one PhysicalHashAgg per non-MPP task type, plus whatever
// tryToGetMppHashAggs returns for the MPP task type.
func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []PhysicalPlan {
	// A hash aggregation cannot provide any ordering, so only an empty prop
	// is accepted.
	if !prop.IsEmpty() {
		return nil
	}
	if prop.TaskTp == property.MppTaskType && !la.checkCanPushDownToMPP() {
		return nil
	}
	// GetAllPossibleChildTaskTypes enumerates the possible types of tasks for children.
	// CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType
	hashAggs := make([]PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes()))

	// Start with the two coprocessor read task types.
	taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType}
	// When the AllowBCJ session variable is set, the TiFlash local read task
	// type is added (related to MPP / broadcast join, per the article).
	if la.ctx.GetSessionVars().AllowBCJ {
		taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
	}

	canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
	canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP()

	if la.HasDistinct() {
		// TODO: remove after the cost estimation of distinct pushdown is implemented.
		if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
			if la.ctx.GetSessionVars().StmtCtx.InExplainStmt {
				la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct"))
			}
			// The aggregation has functions with distinct and
			// AllowDistinctAggPushDown is off, so the root task type is the
			// only choice.
			taskTypes = []property.TaskType{property.RootTaskType}
		}
	} else if !la.aggHints.preferAggToCop {
		// Without a hint preferring push-down to the coprocessor, the root
		// task type is allowed in addition to the types chosen above.
		taskTypes = append(taskTypes, property.RootTaskType)
	}
	if !la.canPushToCop(kv.TiKV) {
		// The aggregation cannot be pushed to TiKV: fall back to the root task
		// type, plus the TiFlash local read type when TiFlash push-down is possible.
		taskTypes = []property.TaskType{property.RootTaskType}

		if canPushDownToTiFlash {
			taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
		}
	}

	if canPushDownToMPP {
		taskTypes = append(taskTypes, property.MppTaskType)
	}
	// A flash prop overrides everything above: only the requested task type is used.
	if prop.IsFlashProp() {
		taskTypes = []property.TaskType{prop.TaskTp}
	}

	// Generate candidates for every task type that survived.
	for _, taskTp := range taskTypes {
		if taskTp == property.MppTaskType {
			mppAggs := la.tryToGetMppHashAggs(prop)
			if len(mppAggs) > 0 {
				hashAggs = append(hashAggs, mppAggs...)
			}
		} else {
			agg := NewPhysicalHashAgg(la,
			la.stats.ScaleByExpectCnt(prop.ExpectedCnt),
			// The property required from the child carries taskTp as its task
			// type and leaves SortItems unset.
			&property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp})
			agg.SetSchema(la.schema.Clone())
			hashAggs = append(hashAggs, agg)
		}
	}
	return hashAggs
}

// NewPhysicalHashAgg creates a new PhysicalHashAgg from a LogicalAggregation.
// The group-by items and aggregate functions are taken from la. newStats
// becomes the stats of the new plan and prop is the property required from
// its child.
func NewPhysicalHashAgg(la *LogicalAggregation, newStats *property.StatsInfo, prop *property.PhysicalProperty) *PhysicalHashAgg {
	base := basePhysicalAgg{
		GroupByItems: la.GroupByItems,
		AggFuncs:     la.AggFuncs,
	}
	return base.initForHash(la.ctx, newStats, la.blockOffset, prop)
}

// initForHash wraps base in a PhysicalHashAgg and initializes it: a new base
// physical plan of type plancodec.TypeHashAgg is installed, props are recorded
// as the properties required from the children, and stats is stored.
func (base basePhysicalAgg) initForHash(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalHashAgg {
	p := &PhysicalHashAgg{base}
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeHashAgg, p, offset)
	p.childrenReqProps = props
	p.stats = stats
	return p
}
// getStreamAggs builds the stream aggregation candidates for prop.
//
// It returns nil for flash props, for props whose sort items mix ascending and
// descending order, when any aggregate function is in FinalMode, or when some
// group-by item is not a plain column. Otherwise it creates one
// PhysicalStreamAgg per usable entry of la.possibleProperties and allowed task
// type. With a STREAM_AGG hint, the result of getEnforcedStreamAggs is
// appended as well.
func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
	// TODO: support CopTiFlash task type in stream agg
	if prop.IsFlashProp() {
		return nil
	}

	// AllSameOrder checks if all the prop.SortItems have same order (desc or asc).
	all, desc := prop.AllSameOrder()
	// A stream aggregation cannot satisfy prop unless the sort items are all
	// ascending or all descending.
	if !all {
		return nil
	}

	// No stream aggregation is generated when any aggregate function is in FinalMode.
	// NOTE(review): the reason is not visible in this excerpt.
	for _, aggFunc := range la.AggFuncs {
		if aggFunc.Mode == aggregation.FinalMode {
			return nil
		}
	}

	// GetGroupByCols returns the columns that are group-by items.
	// For example, `group by a, b, c+d` will return [a, b].
	groupByCols := la.GetGroupByCols()
	if len(groupByCols) != len(la.GroupByItems) {
		// Some group-by item is not a plain column, so stream aggregation
		// cannot be used.
		return nil
	}

	// GetAllPossibleChildTaskTypes enumerates the possible types of tasks for children.
	// CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType
	allTaskTypes := prop.GetAllPossibleChildTaskTypes()
	streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(allTaskTypes)-1)+len(allTaskTypes))
	// The child's expected row count is prop.ExpectedCnt scaled by
	// la.inputCount/la.stats.RowCount, but never less than prop.ExpectedCnt itself.
	childProp := &property.PhysicalProperty{
		ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
	}

	// la.possibleProperties is filled in earlier by
	// LogicalAggregation.PreparePossibleProperties: the child orderings that
	// cover all group-by columns. Try each of them.
	for _, possibleChildProperty := range la.possibleProperties {
		// The first len(groupByCols) columns become the sort items required from
		// the child. The stream aggregation then runs in that column order.
		childProp.SortItems = property.SortItemsFromCols(possibleChildProperty[:len(groupByCols)], desc)
		// IsPrefix checks whether the order property is the prefix of another.
		if !prop.IsPrefix(childProp) {
			// prop is what the parent requires from this aggregation. If its sort
			// items are not a prefix of childProp's, this ordering cannot satisfy prop.
			continue
		}
		// The table read of "CopDoubleReadTaskType" can't promise the sort
		// property that the stream aggregation requires, no need to consider it.
		taskTypes := []property.TaskType{property.CopSingleReadTaskType}

		if la.HasDistinct() {
			// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
			// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
			if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
				// The aggregation has functions with distinct and
				// AllowDistinctAggPushDown is off, so the root task type is
				// the only choice.
				taskTypes = []property.TaskType{property.RootTaskType}
			// Otherwise this ordering is skipped when distinctArgsMeetsProperty
			// reports false (its exact rule is defined elsewhere).
			} else if !la.distinctArgsMeetsProperty() {
				continue
			}
		} else if !la.aggHints.preferAggToCop {
			// Without a hint preferring push-down to the coprocessor, the root
			// task type is allowed in addition to CopSingleReadTaskType.
			taskTypes = append(taskTypes, property.RootTaskType)
		}
		if !la.canPushToCop(kv.TiKV) {
			// The aggregation cannot be pushed to TiKV, so the root task type is
			// the only choice.
			taskTypes = []property.TaskType{property.RootTaskType}
		}
		for _, taskTp := range taskTypes {
			copiedChildProperty := new(property.PhysicalProperty)
			*copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field.
			copiedChildProperty.TaskTp = taskTp

			agg := basePhysicalAgg{
				GroupByItems: la.GroupByItems,
				AggFuncs:     la.AggFuncs,
			}.initForStream(la.ctx, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), la.blockOffset, copiedChildProperty)
			agg.SetSchema(la.schema.Clone())
			streamAggs = append(streamAggs, agg)
		}
	}

	// If STREAM_AGG hint is existed, it should consider enforce stream aggregation,
	// because we can't trust possibleChildProperty completely.
	if (la.aggHints.preferAggType & preferStreamAgg) > 0 {
		// Add the candidates produced by getEnforcedStreamAggs (defined elsewhere).
		streamAggs = append(streamAggs, la.getEnforcedStreamAggs(prop)...)
	}
	return streamAggs
}

// initForStream wraps base in a PhysicalStreamAgg and initializes it: a new
// base physical plan of type plancodec.TypeStreamAgg is installed, props are
// recorded as the properties required from the children, and stats is stored.
func (base basePhysicalAgg) initForStream(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalStreamAgg {
	p := &PhysicalStreamAgg{base}
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeStreamAgg, p, offset)
	p.childrenReqProps = props
	p.stats = stats
	return p
}
// LogicalJoin can generate hash join, index join and sort merge join.
// Firstly we check the hint: if a hint is specified by the user, we force the choice of
// the corresponding physical plan.
// If the hint is not matched, it will get other candidates.
// If no hint is specified, we will pick all candidates.
//
// The boolean result is false when prop asks for a broadcast partition type, or when a
// join hint is present but none of the hinted join types could be generated.
func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {

	// Failpoint for tests: when enabled with a true value, only the index joins are returned.
	failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
		if val.(bool) {
			indexJoins, _ := p.tryToGetIndexJoin(prop)
			failpoint.Return(indexJoins, true, nil)
		}
	})

	// A broadcast partition requirement yields no plan here (related to MPP /
	// broadcast join, per the article).
	if prop.MPPPartitionTp == property.BroadcastType {
		return nil, false, nil
	}
	joins := make([]PhysicalPlan, 0, 8)
	canPushToTiFlash := p.canPushToCop(kv.TiFlash)
	if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
		...
	} else if p.ctx.GetSessionVars().AllowBCJ && canPushToTiFlash {
		...
	}
	// For a flash prop, only the joins collected so far are returned.
	if prop.IsFlashProp() {
		return joins, true, nil
	}

	// Merge join: returned alone when hinted and available.
	mergeJoins := p.GetMergeJoin(prop, p.schema, p.Stats(), p.children[0].statsInfo(), p.children[1].statsInfo())
	if (p.preferJoinType&preferMergeJoin) > 0 && len(mergeJoins) > 0 {
		return mergeJoins, true, nil
	}
	joins = append(joins, mergeJoins...)

	// Index join: returned alone when tryToGetIndexJoin reports it as forced.
	indexJoins, forced := p.tryToGetIndexJoin(prop)
	if forced {
		return indexJoins, true, nil
	}
	joins = append(joins, indexJoins...)

	// Hash join: returned alone when hinted and available.
	hashJoins := p.getHashJoins(prop)
	if (p.preferJoinType&preferHashJoin) > 0 && len(hashJoins) > 0 {
		return hashJoins, true, nil
	}
	joins = append(joins, hashJoins...)

	if p.preferJoinType > 0 {
		// If we reach here, it means we have a hint that doesn't work.
		// It might be affected by the required property, so we enforce
		// this property and try the hint again.
		return joins, false, nil
	}
	return joins, true, nil
}
// GetMergeJoin converts the logical join to physical merge joins based on the physical property.
// For every ordering cached in p.leftProperties it tries to build one PhysicalMergeJoin whose
// join keys follow that ordering, and keeps it only if the required property prop can be
// passed through (see tryToGetChildReqProp). statsInfo is the join's own statistics;
// leftStatsInfo and rightStatsInfo are the children's and are used to scale the expected
// row count requested from each child. It returns nil when a join key uses NullEQ.
func (p *LogicalJoin) GetMergeJoin(prop *property.PhysicalProperty, schema *expression.Schema, statsInfo *property.StatsInfo, leftStatsInfo *property.StatsInfo, rightStatsInfo *property.StatsInfo) []PhysicalPlan {
	joins := make([]PhysicalPlan, 0, len(p.leftProperties)+1)
	
	// GetJoinKeys extracts join keys(columns) from EqualConditions. It returns left join keys, right
	// join keys and an `isNullEQ` array which means the `joinKey[i]` is a `NullEQ` function. The `hasNullEQ`
	// means whether there is a `NullEQ` of a join key.
	// NOTE(review): NullEQ presumably refers to the null-safe equality operator — confirm.
	leftJoinKeys, rightJoinKeys, isNullEQ, hasNullEQ := p.GetJoinKeys()

	// EnumType/SetType Unsupported: merge join conflicts with index order.
	// ref: https://github.com/pingcap/tidb/issues/24473, https://github.com/pingcap/tidb/issues/25669
	// (the check itself is elided in this excerpt)
	...
	
	// TODO: support null equal join keys for merge join
	if hasNullEQ {
		return nil
	}
	
	// The leftProperties caches all the possible properties that are provided by its children.
	for _, lhsChildProperty := range p.leftProperties {
		// Presumably: if column n+1 of lhsChildProperty is the first one missing from
		// leftJoinKeys, getMaxSortPrefix returns, for the first n columns, their offsets
		// inside leftJoinKeys — confirm against the helper.
		offsets := getMaxSortPrefix(lhsChildProperty, leftJoinKeys)
		// If not all equal conditions hit properties. We ban merge join heuristically. Because in this case, merge join
		// may get a very low performance. In executor, executes join results before other conditions filter it.
		if len(offsets) < len(leftJoinKeys) {
			// Assuming lhsChildProperty has no duplicate columns: fewer offsets than join
			// keys means the leading columns of lhsChildProperty do not cover every left
			// join key, so this ordering cannot keep leftJoinKeys sorted.
			continue
		}

		// Reaching here, lhsChildProperty[:len(offsets)] is an ordering made up of the
		// left join keys.
		leftKeys := lhsChildProperty[:len(offsets)]
		// Reorder rightJoinKeys by the same offsets so that leftKeys[i] pairs with rightKeys[i].
		rightKeys := expression.NewSchema(rightJoinKeys...).ColumnsByIndices(offsets)
		newIsNullEQ := make([]bool, 0, len(offsets))
		for _, offset := range offsets {
			newIsNullEQ = append(newIsNullEQ, isNullEQ[offset])
		}

		// Presumably findMaxPrefixLen returns the length of the longest common prefix between
		// rightKeys and any ordering in p.rightProperties — confirm against the helper.
		// prefixLen may be smaller than len(rightJoinKeys): only the first prefixLen key
		// pairs are kept as merge keys, and the remaining equal conditions are moved into
		// OtherConditions below.
		prefixLen := findMaxPrefixLen(p.rightProperties, rightKeys)
		if prefixLen == 0 {
			continue
		}

		leftKeys = leftKeys[:prefixLen]
		rightKeys = rightKeys[:prefixLen]
		newIsNullEQ = newIsNullEQ[:prefixLen]
		// NOTE(review): presumably rejects key pairs whose collations are incompatible — confirm.
		if !p.checkJoinKeyCollation(leftKeys, rightKeys) {
			continue
		}
		offsets = offsets[:prefixLen]
		baseJoin := basePhysicalJoin{
			JoinType:        p.JoinType,
			LeftConditions:  p.LeftConditions,
			RightConditions: p.RightConditions,
			DefaultValues:   p.DefaultValues,
			LeftJoinKeys:    leftKeys,
			RightJoinKeys:   rightKeys,
			IsNullEQ:        newIsNullEQ,
		}
		mergeJoin := PhysicalMergeJoin{basePhysicalJoin: baseJoin}.Init(p.ctx, statsInfo.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset)
		mergeJoin.SetSchema(schema)
		// Because prefixLen may be smaller than len(rightJoinKeys), the equal conditions
		// not covered by the kept offsets are presumably merged with p.OtherConditions by
		// moveEqualToOtherConditions and stored as the merge join's OtherConditions.
		mergeJoin.OtherConditions = p.moveEqualToOtherConditions(offsets)
		// NOTE(review): presumably prepares the per-key comparison functions used to merge — confirm.
		mergeJoin.initCompareFuncs()
		// Only if the input required prop is the prefix of join keys, we can pass through this property.
		// The merge join keeps its left/right join keys ordered, so that is the only
		// ordering it can offer to its parent.
		if reqProps, ok := mergeJoin.tryToGetChildReqProp(prop); ok {
			// Adjust expected count for children nodes.
			// If the parent wants fewer rows than the join is estimated to produce, each
			// child's expected count is scaled down by the same ratio.
			if prop.ExpectedCnt < statsInfo.RowCount {
				expCntScale := prop.ExpectedCnt / statsInfo.RowCount
				reqProps[0].ExpectedCnt = leftStatsInfo.RowCount * expCntScale
				reqProps[1].ExpectedCnt = rightStatsInfo.RowCount * expCntScale
			}
			// The properties required from the two children are orderings on
			// mergeJoin.LeftJoinKeys and mergeJoin.RightJoinKeys respectively.
			mergeJoin.childrenReqProps = reqProps
			_, desc := prop.AllSameOrder()
			mergeJoin.Desc = desc
			joins = append(joins, mergeJoin)
		}
	}
	// If TiDB_SMJ hint is existed, it should consider enforce merge join,
	// because we can't trust lhsChildProperty completely.
	if (p.preferJoinType & preferMergeJoin) > 0 {
		joins = append(joins, p.getEnforcedMergeJoin(prop, schema, statsInfo)...)
	}

	return joins
}

// tryToGetChildReqProp builds the properties this merge join requires from its
// two children: an ordering on LeftJoinKeys for the left child and on
// RightJoinKeys for the right child, both in the direction asked for by prop.
// The boolean result is false when a non-empty prop cannot be passed through:
// only if the required prop is a prefix of the join keys can it be satisfied.
func (p *PhysicalMergeJoin) tryToGetChildReqProp(prop *property.PhysicalProperty) ([]*property.PhysicalProperty, bool) {
	sameOrder, desc := prop.AllSameOrder()
	leftReq := property.NewPhysicalProperty(property.RootTaskType, p.LeftJoinKeys, desc, math.MaxFloat64, false)
	rightReq := property.NewPhysicalProperty(property.RootTaskType, p.RightJoinKeys, desc, math.MaxFloat64, false)
	childReqs := []*property.PhysicalProperty{leftReq, rightReq}

	// Nothing is required by the parent, so there is nothing to validate.
	if prop.IsEmpty() {
		return childReqs, true
	}
	// sort merge join fits the cases of massive ordered data, so desc scan is always expensive.
	if !sameOrder {
		return nil, false
	}

	matchesLeft := prop.IsPrefix(leftReq)
	matchesRight := prop.IsPrefix(rightReq)
	switch {
	case !matchesLeft && !matchesRight:
		// prop is a prefix of neither side's join keys, so it cannot be satisfied.
		return nil, false
	case matchesRight && p.JoinType == LeftOuterJoin:
		// For a left outer join the left child is the outer side; an ordering that
		// is a prefix of the right join keys is rejected.
		return nil, false
	case matchesLeft && p.JoinType == RightOuterJoin:
		// For a right outer join the right child is the outer side; an ordering
		// that is a prefix of the left join keys is rejected.
		return nil, false
	}
	// NOTE(review): semi joins get no special handling here — confirm that is intended.
	return childReqs, true
}
// getHashJoins enumerates the hash-join candidates for this logical join.
// A hash join promises no output order, so a non-empty prop yields nil.
// Which child is the inner side (innerIdx) and whether the outer side may be
// used to build depends on the join type; the two *4Test switches restrict
// the candidates to a single variant.
func (p *LogicalJoin) getHashJoins(prop *property.PhysicalProperty) []PhysicalPlan {
	if !prop.IsEmpty() { // hash join doesn't promise any orders
		return nil
	}
	candidates := make([]PhysicalPlan, 0, 2)
	switch p.JoinType {
	case SemiJoin, AntiSemiJoin, LeftOuterSemiJoin, AntiLeftOuterSemiJoin:
		// Semi joins: the right child is the inner side and the outer side is
		// never used to build.
		candidates = append(candidates, p.getHashJoin(prop, 1, false))
	case LeftOuterJoin:
		// Left outer join: the right child is the inner side. Both build choices
		// are offered unless the test switch keeps only the outer-build variant.
		if !ForceUseOuterBuild4Test {
			candidates = append(candidates, p.getHashJoin(prop, 1, false))
		}
		candidates = append(candidates, p.getHashJoin(prop, 1, true))
	case RightOuterJoin:
		// Right outer join: mirror image of the case above with the left child
		// as the inner side.
		if !ForceUseOuterBuild4Test {
			candidates = append(candidates, p.getHashJoin(prop, 0, false))
		}
		candidates = append(candidates, p.getHashJoin(prop, 0, true))
	case InnerJoin:
		// Inner join: first a plan with the right child as inner, then (unless
		// the test switch disables it) one with the left child as inner.
		candidates = append(candidates, p.getHashJoin(prop, 1, false))
		if !ForcedHashLeftJoin4Test {
			candidates = append(candidates, p.getHashJoin(prop, 0, false))
		}
	}
	return candidates
}

// getHashJoin builds one PhysicalHashJoin whose inner child is
// p.children[innerIdx]. Neither child is asked for any ordering. When the
// parent expects fewer rows than the join is estimated to produce, the
// expected count requested from the outer child is scaled down by the same
// ratio; the inner child's expected count stays at math.MaxFloat64.
func (p *LogicalJoin) getHashJoin(prop *property.PhysicalProperty, innerIdx int, useOuterToBuild bool) *PhysicalHashJoin {
	outerIdx := 1 - innerIdx
	childProps := make([]*property.PhysicalProperty, 2)
	childProps[innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
	childProps[outerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
	if joinRows := p.stats.RowCount; prop.ExpectedCnt < joinRows {
		ratio := prop.ExpectedCnt / joinRows
		childProps[outerIdx].ExpectedCnt = p.children[outerIdx].statsInfo().RowCount * ratio
	}
	scaledStats := p.stats.ScaleByExpectCnt(prop.ExpectedCnt)
	join := NewPhysicalHashJoin(p, innerIdx, useOuterToBuild, scaledStats, childProps...)
	join.SetSchema(p.schema)
	return join
}

// NewPhysicalHashJoin creates a new PhysicalHashJoin from LogicalJoin.
// The join keys come from p.GetJoinKeys(), the conditions, join type and
// default values are copied from p, and innerIdx / useOuterToBuild record which
// child is the inner side and whether the outer side is used to build.
// newStats and prop are handed to Init together with p's context and block offset.
func NewPhysicalHashJoin(p *LogicalJoin, innerIdx int, useOuterToBuild bool, newStats *property.StatsInfo, prop ...*property.PhysicalProperty) *PhysicalHashJoin {
	lKeys, rKeys, nullEQFlags, _ := p.GetJoinKeys()
	join := PhysicalHashJoin{
		basePhysicalJoin: basePhysicalJoin{
			JoinType:        p.JoinType,
			InnerChildIdx:   innerIdx,
			LeftJoinKeys:    lKeys,
			RightJoinKeys:   rKeys,
			IsNullEQ:        nullEQFlags,
			LeftConditions:  p.LeftConditions,
			RightConditions: p.RightConditions,
			OtherConditions: p.OtherConditions,
			DefaultValues:   p.DefaultValues,
		},
		EqualConditions: p.EqualConditions,
		Concurrency:     uint(p.ctx.GetSessionVars().HashJoinConcurrency()),
		UseOuterToBuild: useOuterToBuild,
	}
	return join.Init(p.ctx, newStats, p.blockOffset, prop...)
}
// tryToGetIndexJoin will get index join by hints. If we can generate a valid index join by hint, the second return value
// will be true, which means we force to choose this index join. Otherwise we will select a join algorithm with min-cost.
// When no hinted candidate can be built, all index-join candidates (filtered by
// filterIndexJoinBySessionVars) are returned with canForced == false. If a hint was given but
// could not be honored and prop is empty, a warning is appended to the statement context.
func (p *LogicalJoin) tryToGetIndexJoin(prop *property.PhysicalProperty) (indexJoins []PhysicalPlan, canForced bool) {
	// Hint asking for the Index Nested Loop Join algorithm on the given table(s).
	// https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#inl_joint1_name--tl_name-
	// "Left as inner" means the right child is the outer side, and vice versa.
	inljRightOuter := (p.preferJoinType & preferLeftAsINLJInner) > 0
	inljLeftOuter := (p.preferJoinType & preferRightAsINLJInner) > 0
	hasINLJHint := inljLeftOuter || inljRightOuter

	// Hint asking for the Index Nested Loop Hash Join algorithm.
	// https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#inl_hash_join
	inlhjRightOuter := (p.preferJoinType & preferLeftAsINLHJInner) > 0
	inlhjLeftOuter := (p.preferJoinType & preferRightAsINLHJInner) > 0
	hasINLHJHint := inlhjLeftOuter || inlhjRightOuter

	// Hint asking for the Index Nested Loop Merge Join algorithm.
	inlmjRightOuter := (p.preferJoinType & preferLeftAsINLMJInner) > 0
	inlmjLeftOuter := (p.preferJoinType & preferRightAsINLMJInner) > 0
	hasINLMJHint := inlmjLeftOuter || inlmjRightOuter

	// Whether any of the three hint families forces a particular outer side.
	forceLeftOuter := inljLeftOuter || inlhjLeftOuter || inlmjLeftOuter
	forceRightOuter := inljRightOuter || inlhjRightOuter || inlmjRightOuter
	needForced := forceLeftOuter || forceRightOuter

	// The deferred function reads the named result canForced, so it sees the value
	// set by whichever return statement ends the function.
	defer func() {
		// refine error message
		// If the required property is not empty, we will enforce it and try the hint again.
		// So we only need to generate warning message when the property is empty.
		if !canForced && needForced && prop.IsEmpty() {
			// Construct warning message prefix.
			var errMsg string
			switch {
			case hasINLJHint:
				errMsg = "Optimizer Hint INL_JOIN or TIDB_INLJ is inapplicable"
			case hasINLHJHint:
				errMsg = "Optimizer Hint INL_HASH_JOIN is inapplicable"
			case hasINLMJHint:
				errMsg = "Optimizer Hint INL_MERGE_JOIN is inapplicable"
			}
			// When the hint's table list is available, rebuild the message so that it
			// names the hinted tables.
			if p.hintInfo != nil {
				t := p.hintInfo.indexNestedLoopJoinTables
				switch {
				case len(t.inljTables) != 0:
					errMsg = fmt.Sprintf("Optimizer Hint %s or %s is inapplicable",
						restore2JoinHint(HintINLJ, t.inljTables), restore2JoinHint(TiDBIndexNestedLoopJoin, t.inljTables))
				case len(t.inlhjTables) != 0:
					errMsg = fmt.Sprintf("Optimizer Hint %s is inapplicable", restore2JoinHint(HintINLHJ, t.inlhjTables))
				case len(t.inlmjTables) != 0:
					errMsg = fmt.Sprintf("Optimizer Hint %s is inapplicable", restore2JoinHint(HintINLMJ, t.inlmjTables))
				}
			}

			// Append inapplicable reason.
			if len(p.EqualConditions) == 0 {
				errMsg += " without column equal ON condition"
			}

			// Generate warning message to client.
			warning := ErrInternal.GenWithStack(errMsg)
			p.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
		}
	}()

	// supportLeftOuter and supportRightOuter indicates whether this type of join
	// supports the left side or right side to be the outer side.
	var supportLeftOuter, supportRightOuter bool
	switch p.JoinType {
	case SemiJoin, AntiSemiJoin, LeftOuterSemiJoin, AntiLeftOuterSemiJoin, LeftOuterJoin:
		// Semi joins and left outer join: the left child may be the outer side.
		supportLeftOuter = true
	case RightOuterJoin:
		// Right outer join: the right child may be the outer side.
		supportRightOuter = true
	case InnerJoin:
		// Inner join: either child may be the outer side.
		supportLeftOuter, supportRightOuter = true, true
	}

	var allLeftOuterJoins, allRightOuterJoins, forcedLeftOuterJoins, forcedRightOuterJoins []PhysicalPlan
	if supportLeftOuter {
		// Collect every index join (PhysicalIndexJoin, PhysicalIndexHashJoin or
		// PhysicalIndexMergeJoin) that uses the left child as the outer side, then pick
		// out the ones whose concrete type matches a left-outer hint.
		allLeftOuterJoins = p.getIndexJoinByOuterIdx(prop, 0)
		forcedLeftOuterJoins = make([]PhysicalPlan, 0, len(allLeftOuterJoins))
		for _, j := range allLeftOuterJoins {
			switch j.(type) {
			case *PhysicalIndexJoin:
				if inljLeftOuter {
					forcedLeftOuterJoins = append(forcedLeftOuterJoins, j)
				}
			case *PhysicalIndexHashJoin:
				if inlhjLeftOuter {
					forcedLeftOuterJoins = append(forcedLeftOuterJoins, j)
				}
			case *PhysicalIndexMergeJoin:
				if inlmjLeftOuter {
					forcedLeftOuterJoins = append(forcedLeftOuterJoins, j)
				}
			}
		}
		switch {
		// Nothing hinted matched and the right side cannot be outer: return all
		// left-outer candidates, not forced.
		case len(forcedLeftOuterJoins) == 0 && !supportRightOuter:
			return filterIndexJoinBySessionVars(p.ctx, allLeftOuterJoins), false
		// Hinted candidates exist and the right side is either unsupported or not
		// hinted: force the left-outer candidates.
		case len(forcedLeftOuterJoins) != 0 && (!supportRightOuter || (forceLeftOuter && !forceRightOuter)):
			return forcedLeftOuterJoins, true
		}
	}
	if supportRightOuter {
		// Same procedure with the right child as the outer side.
		allRightOuterJoins = p.getIndexJoinByOuterIdx(prop, 1)
		forcedRightOuterJoins = make([]PhysicalPlan, 0, len(allRightOuterJoins))
		for _, j := range allRightOuterJoins {
			switch j.(type) {
			case *PhysicalIndexJoin:
				if inljRightOuter {
					forcedRightOuterJoins = append(forcedRightOuterJoins, j)
				}
			case *PhysicalIndexHashJoin:
				if inlhjRightOuter {
					forcedRightOuterJoins = append(forcedRightOuterJoins, j)
				}
			case *PhysicalIndexMergeJoin:
				if inlmjRightOuter {
					forcedRightOuterJoins = append(forcedRightOuterJoins, j)
				}
			}
		}
		switch {
		case len(forcedRightOuterJoins) == 0 && !supportLeftOuter:
			return filterIndexJoinBySessionVars(p.ctx, allRightOuterJoins), false
		case len(forcedRightOuterJoins) != 0 && (!supportLeftOuter || (forceRightOuter && !forceLeftOuter)):
			return forcedRightOuterJoins, true
		}
	}

	// Both sides are supported and no early return applied: force the hinted candidates
	// if either side produced any, otherwise fall back to all candidates.
	canForceLeft := len(forcedLeftOuterJoins) != 0 && forceLeftOuter
	canForceRight := len(forcedRightOuterJoins) != 0 && forceRightOuter
	canForced = canForceLeft || canForceRight
	if canForced {
		return append(forcedLeftOuterJoins, forcedRightOuterJoins...), true
	}
	return filterIndexJoinBySessionVars(p.ctx, append(allLeftOuterJoins, allRightOuterJoins...)), false
}

// getIndexJoinByOuterIdx generates index joins that use p.children[outerIdx]
// as the outer child. The inner child must be a DataSource, or a
// LogicalUnionScan whose first child is a DataSource, and must not prefer
// TiFlash. A table-scan based inner side is tried first; only when that
// yields nothing is an index-scan based inner side built.
func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, outerIdx int) (joins []PhysicalPlan) {
	outer, inner := p.children[outerIdx], p.children[1-outerIdx]

	// If the order by columns are not all from outer child, index join cannot promise the order.
	// The same holds when the required sort items do not all share one direction.
	sameOrder, _ := prop.AllSameOrder()
	if !(prop.AllColsFromSchema(outer.Schema()) && sameOrder) {
		return nil
	}

	// GetJoinKeys returns the left keys first; map them to outer/inner by outerIdx.
	leftKeys, rightKeys, _, _ := p.GetJoinKeys()
	outerJoinKeys, innerJoinKeys := leftKeys, rightKeys
	if outerIdx != 0 {
		outerJoinKeys, innerJoinKeys = rightKeys, leftKeys
	}

	var (
		ds *DataSource
		us *LogicalUnionScan
	)
	switch node := inner.(type) {
	case *DataSource:
		if node.preferStoreType&preferTiFlash != 0 {
			return nil
		}
		ds = node
	case *LogicalUnionScan:
		us = node
		// The child of union scan may be union all for partition table.
		firstChild, ok := node.Children()[0].(*DataSource)
		if !ok {
			return nil
		}
		ds = firstChild
		// If one of the union scan children is a TiFlash table, then we can't choose index join.
		for _, child := range node.Children() {
			if childDS, isDS := child.(*DataSource); isDS && childDS.preferStoreType&preferTiFlash != 0 {
				return nil
			}
		}
	default:
		return nil
	}

	// Average number of inner rows matched per outer row; stays zero when the
	// outer row count is not positive.
	var avgInnerRowCnt float64
	if outerRows := outer.statsInfo().RowCount; outerRows > 0 {
		avgInnerRowCnt = p.equalCondOutCnt / outerRows
	}

	if byTableScan := p.buildIndexJoinInner2TableScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt); byTableScan != nil {
		return byTableScan
	}
	return p.buildIndexJoinInner2IndexScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt)
}

// buildIndexJoinInner2TableScan builds a TableScan as the inner child for an
// IndexJoin if possible.
// If the inner side of a index join is a TableScan, only one tuple will be
// fetched from the inner side for every tuple from the outer side. This will be
// promised to be no worse than building IndexScan as the inner child.
// It returns nil when ds has no TiKV table path, when no usable common-handle
// helper is found, or when no inner join key is the integer primary key.
// Otherwise it returns index join, index hash join and (when there is no union
// scan) index merge join candidates.
func (p *LogicalJoin) buildIndexJoinInner2TableScan(
	prop *property.PhysicalProperty, ds *DataSource, innerJoinKeys, outerJoinKeys []*expression.Column,
	outerIdx int, us *LogicalUnionScan, avgInnerRowCnt float64) (joins []PhysicalPlan) {
	var tblPath *util.AccessPath
	// The inner child is to be built as a TableScan, so a table path is needed first.
	// IsTablePath returns true if it's IntHandlePath or CommonHandlePath.
	for _, path := range ds.possibleAccessPaths {
		if path.IsTablePath() && path.StoreType == kv.TiKV {
			tblPath = path
			break
		}
	}
	if tblPath == nil {
		return nil
	}
	// keyOff2IdxOff has one entry per inner join key; in the int-handle branch
	// below it is 0 for a key equal to the primary key column and -1 otherwise.
	keyOff2IdxOff := make([]int, len(innerJoinKeys))
	newOuterJoinKeys := make([]*expression.Column, 0)
	var ranges []*ranger.Range
	var innerTask, innerTask2 task
	var helper *indexJoinBuildHelper
	if ds.tableInfo.IsCommonHandle {
		// Common-handle table: let the build helper choose among common-handle paths.
		helper, keyOff2IdxOff = p.getIndexJoinBuildHelper(ds, innerJoinKeys, func(path *util.AccessPath) bool { return path.IsCommonHandlePath }, outerJoinKeys)
		if helper == nil {
			return nil
		}
		innerTask = p.constructInnerTableScanTask(ds, nil, outerJoinKeys, us, false, false, avgInnerRowCnt)
		// The index merge join's inner plan is different from index join, so we
		// should construct another inner plan for it.
		// Because we can't keep order for union scan, if there is a union scan in inner task,
		// we can't construct index merge join.
		if us == nil {
			innerTask2 = p.constructInnerTableScanTask(ds, nil, outerJoinKeys, us, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
		}
		ranges = helper.chosenRanges
	} else {
		// Otherwise the inner table's handle is a single integer primary key column.
		pkMatched := false
		pkCol := ds.getPKIsHandleCol()
		if pkCol == nil {
			return nil
		}
		for i, key := range innerJoinKeys {
			// Walk innerJoinKeys looking for the key(s) equal to the primary key column.
			if !key.Equal(nil, pkCol) {
				keyOff2IdxOff[i] = -1
				continue
			}
			pkMatched = true
			keyOff2IdxOff[i] = 0
			// Add to newOuterJoinKeys only if conditions contain inner primary key. For issue #14822.
			newOuterJoinKeys = append(newOuterJoinKeys, outerJoinKeys[i])
		}
		// From here on outerJoinKeys holds only the outer keys paired with the inner primary key.
		outerJoinKeys = newOuterJoinKeys
		
		if !pkMatched {
			return nil
		}
		// innerTask is a TableScan over the inner table built with the first boolean
		// argument false (no keep-order requested, judging by the comments here).
		innerTask = p.constructInnerTableScanTask(ds, pkCol, outerJoinKeys, us, false, false, avgInnerRowCnt)
		// The index merge join's inner plan is different from index join, so we
		// should construct another inner plan for it.
		// Because we can't keep order for union scan, if there is a union scan in inner task,
		// we can't construct index merge join.
		if us == nil {
			// innerTask2 is the keep-order variant of the inner TableScan, built for the
			// index merge join.
			// NOTE(review): with an integer primary key only that column is ordered; when
			// innerJoinKeys has several columns, only the keys paired with the primary key
			// remain in outerJoinKeys at this point — confirm how the other equal
			// conditions are evaluated.
			innerTask2 = p.constructInnerTableScanTask(ds, pkCol, outerJoinKeys, us, true, !prop.IsEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
		}
	}
	// path and lastColMng stay nil unless the common-handle helper was used.
	var (
		path       *util.AccessPath
		lastColMng *ColWithCmpFuncManager
	)
	if helper != nil {
		path = helper.chosenPath
		lastColMng = helper.lastColManager
	}
	joins = make([]PhysicalPlan, 0, 3)
	// Test hook: when the failpoint value is true, return only the index hash join
	// candidates (built with nil ranges).
	failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
		if val.(bool) {
			failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, nil, keyOff2IdxOff, path, lastColMng))
		}
	})
	joins = append(joins, p.constructIndexJoin(prop, outerIdx, innerTask, ranges, keyOff2IdxOff, path, lastColMng, true)...)
	// We can reuse the `innerTask` here since index nested loop hash join
	// do not need the inner child to promise the order.
	joins = append(joins, p.constructIndexHashJoin(prop, outerIdx, innerTask, ranges, keyOff2IdxOff, path, lastColMng)...)
	if innerTask2 != nil {
		joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, ranges, keyOff2IdxOff, path, lastColMng)...)
	}
	return joins
}

enumeratePhysicalPlans4Task:遍历 physicalPlans,把每一个 pp 当作根节点构造一个完整的执行计划(遍历 p.children,对第 j 个 child 调用 child.findBestTask(pp.GetChildReqProps(j), …),把得到的 best task 作为 pp 的孩子),最后从中选出 cost 最低的完整执行计划,并返回所有 pp 可以产生的完整执行计划数量之和。

// enumeratePhysicalPlans4Task turns each candidate in physicalPlans into a complete task:
// it finds the best task of every child under the property that candidate requires from
// it, attaches those child tasks to the candidate, and keeps the cheapest resulting task.
// It returns that task together with the total number of plans counted across all
// candidates. When planCounter is in force mode, the task selected by the counter is
// returned instead of the cheapest one. addEnforcer asks for prop to be enforced on top
// of each candidate task.
func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPlan, prop *property.PhysicalProperty, addEnforcer bool, planCounter *PlanCounterTp) (task, int64, error) {
	var bestTask task = invalidTask
	// curCntPlan is the number of plans the current pp can produce;
	// cntPlan is the sum of curCntPlan over all pp in physicalPlans.
	var curCntPlan, cntPlan int64
	// childTasks holds, for the current pp, the best task of each child.
	childTasks := make([]task, 0, len(p.children))
	// childCnts holds, for the current pp, how many tasks each child reported.
	childCnts := make([]int64, len(p.children))
	cntPlan = 0
	for _, pp := range physicalPlans {
		// Find best child tasks firstly.
		childTasks = childTasks[:0]
		// The curCntPlan records the number of possible plans for pp
		curCntPlan = 1
		// The timestamp and plan ID are captured before searching the children; they
		// are used only if the children have to be rebuilt for a forced plan below.
		TimeStampNow := p.GetLogicalTS4TaskMap()
		savedPlanID := p.ctx.GetSessionVars().PlanID
		for j, child := range p.children {
			// GetChildReqProps gets the required property by child index.
			// Children are searched with &PlanCounterDisabled rather than planCounter,
			// so the caller's counter is only consumed at this level.
			childTask, cnt, err := child.findBestTask(pp.GetChildReqProps(j), &PlanCounterDisabled)
			childCnts[j] = cnt
			if err != nil {
				return nil, 0, err
			}
			// Each child contributes cnt alternatives, so the number of plans rooted at
			// pp is the product of the children's counts.
			curCntPlan = curCntPlan * cnt
			if childTask != nil && childTask.invalid() {
				break
			}
			childTasks = append(childTasks, childTask)
		}

		// This check makes sure that there is no invalid child task.
		if len(childTasks) != len(p.children) {
			continue
		}

		// Plan counting is forced and the requested plan falls within this pp's range.
		// If the target plan can be found in this physicalPlan(pp), rebuild childTasks to build the corresponding combination.
		if planCounter.IsForce() && int64(*planCounter) <= curCntPlan {
			p.ctx.GetSessionVars().PlanID = savedPlanID
			curCntPlan = int64(*planCounter)
			// NOTE(review): rebuildChildTasks presumably re-runs the child search so that
			// childTasks matches the combination selected by planCounter — confirm.
			err := p.rebuildChildTasks(&childTasks, pp, childCnts, int64(*planCounter), TimeStampNow)
			if err != nil {
				return nil, 0, err
			}
		}


		// Combine best child tasks with parent physical plan.
		// attach2Task makes the current physical plan as the father of task's physicalPlan and updates the cost of
		// current task. If the child's task is cop task, some operator may close this task and return a new rootTask.
		curTask := pp.attach2Task(childTasks...)

		if curTask.invalid() {
			continue
		}

		// An optimal task could not satisfy the property, so it should be converted here.
		// That is: a root task is required but curTask is not one yet.
		if _, ok := curTask.(*rootTask); !ok && prop.TaskTp == property.RootTaskType {
			curTask = curTask.convertToRootTask(p.ctx)
		}

		// Enforce curTask property
		// NOTE(review): presumably this puts a PhysicalSort on top of the task — confirm.
		if addEnforcer {
			curTask = enforceProperty(prop, curTask, p.basePlan.ctx)
		}

		// Optimize by shuffle executor to running in parallel manner.
		// Only attempted when no property is required from this plan.
		if prop.IsEmpty() {
			// Currently, we do not regard shuffled plan as a new plan.
			curTask = optimizeByShuffle(curTask, p.basePlan.ctx)
		}

		cntPlan += curCntPlan
		planCounter.Dec(curCntPlan)

		// The counter has been used up: this task is the requested plan, stop searching.
		if planCounter.Empty() {
			bestTask = curTask
			break
		}
		// Get the most efficient one.
		if curTask.cost() < bestTask.cost() || (bestTask.invalid() && !curTask.invalid()) {
			bestTask = curTask
		}
	}
	return bestTask, cntPlan, nil
}

attach2Task:将 PhysicalPlan 变成 task,并把各个 child 的 task 接到自己下面

// PhysicalPlan embeds Plan and, in this excerpt, declares only attach2Task,
// the method that combines a physical operator with the tasks of its children.
type PhysicalPlan interface {
	Plan

	// attach2Task makes the current physical plan as the father of task's physicalPlan and updates the cost of
	// current task. If the child's task is cop task, some operator may close this task and return a new rootTask.
	attach2Task(...task) task
}

https://img-blog.csdnimg.cn/765f51990b2644349f3faa38fc66eeaa.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center