Kubernetes源码阅读笔记——Scheduler(之二)

上一篇文章主要侧重于Scheduler在正式执行调度任务之前的准备工作。下面,我们将分析Scheduler的核心方法之一:scheduleOne,来学习Scheduler的具体执行方式。

一、scheduleOne

pkg/scheduler/scheduler.go

func (sched *Scheduler) scheduleOne() {
    ...
    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    if pod == nil {
        return
    }
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    scheduleResult, err := sched.schedule(pod)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                    " No preemption is performed.")
            } else {
                preemptionStartTime := time.Now()
                sched.preempt(pod, fitError)
                metrics.PreemptionAttempts.Inc()
                metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(...)
                metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(...)
            }
            metrics.PodScheduleFailures.Inc()
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    assumedPod := pod.DeepCopy()
    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming volumes: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }

    ...

    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming pod: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                klog.Errorf("error binding volumes: %v", err)
                metrics.PodScheduleErrors.Inc()
                return
            }
        }

        ...

        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })
        ...
    }()
}

执行过程分为以下几步:

(1)从队列中取出下一个Pod。

这一步通过调用sched.config.NextPod方法实现,从podQueue队列中取出下一个Pod。如果这个Pod正在删除,则跳过。

(2)调度这个Pod。

这一步通过调用sched.schedule方法实现。schedule方法比较简单,就是调用sche.config.Algorithm.Schedule方法,如果出现错误的话进行记录。

还有一点需要注意:当schedule方法不成功,即pod不适合任何一个主机时,会触发抢占调度逻辑,即调用preempt方法(前提是scheduler的preemption功能开启了)。

Schedule方法位于pkg/scheduler/core/generic_scheduler.go中,我们来详细看一下:

pkg/scheduler/core/generic_scheduler.go

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
    trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }

    nodes, err := nodeLister.List()
    if err != nil {
        return result, err
    }
    if len(nodes) == 0 {
        return result, ErrNoNodesAvailable
    }

    if err := g.snapshot(); err != nil {
        return result, err
    }

    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return result, err
    }

    if len(filteredNodes) == 0 {
        return result, &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }
    ...

    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(failedPredicateMap),
            FeasibleNodes:  1,
        }, nil
    }

    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return result, err
    }
    ...

    trace.Step("Selecting host")

    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

这里用到了一个trace工具,用于跟进调度的过程。

调度的流程是:首先获取Node列表,通过调用findNodesThatFit方法过滤掉不符合要求的Node,然后通过调用PrioritizeNodes方法对Node进行优选打分,最后调用selectHost进行选择,并返回调度结果。

关于findNodesThatFit、PrioritizeNodes和selectHost方法的具体实现,后面详细分析。

(3)预绑定Pod到节点上。

这一步通过调用sched.assumeVolumes方法预绑定Pod和Volume,然后运行相关插件,最后调用sched.assume方法预绑定Pod和主机。预绑定过程不涉及与API Server的交互,仅在scheduler的缓存中更新pod和volume的状态。

(4)执行绑定。

与前面的预绑定过程类似,正式的绑定也是先调用bingVolumes方法绑定Pod和Volume,然后运行插件,最后调用bind方法,实现正式绑定。正式绑定会与API Server交互,向API Server提交pod状态的更新。

二、findNodesThatFit

findNodesThatFit的作用是选出所有可运行Pod的节点。

pkg/scheduler/core/generic_scheduler.go

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
          allNodes := int32(g.cache.NodeTree().NumNodes())
          numNodesToFind := g.numFeasibleNodesToFind(allNodes)
        ...

        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            } else {
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }

        // Stops searching for more nodes once the configured number of feasible nodes
        // are found.
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
        }
    }

    if len(filtered) > 0 && len(g.extenders) != 0 {
        ...
    }
    return filtered, failedPredicateMap, nil
}

这个方法首先获取全部节点的数量,并基于这个数量调用numFeasibleNodesToFind方法,取得寻找合适节点的最大值。超过这个最大值后,调度器就不再继续寻找可调度的节点了。

其次,方法定义了一个checkNode函数,采用多线程方式运行它(通过调用ParallelizeUntil方法,最多16个线程)。这个checkNode函数会调用podFitsOnNode函数,检测Pod可不可以调度到Node上,并将可调度的Node列表存储到filtered数组中。后面extenders是用来处理外部影响调度的情况的,略去。

podFitsOnNode方法检测指定的pod能不能被调度到指定的node上,原理是依次用pkg/scheduler/algorithm/predicates/predicates.go里的predicatesOrdering列表中的元素对node进行预选检测。值得注意的是,podFitsOnNode运行了一个执行两次的循环,分别在pod调度与不调度的情况下进行两次预选检测。只有两次都通过检测后才算是预选成功。

pkg/scheduler/algorithm/predicates/predicates.go包中定义了所有的检测方法,包括volume是否冲突,节点资源是否充足等。只有通过所有检测的节点,才会加入filtered数组中。这些检测方法通过pkg/scheduler/algorithmprovider/defaults/register_predicates.go中的init方法进行注册。

三、PrioritizeNodes

找到所有可调度节点后,就进入下一阶段,即优选阶段,通过调用PrioritizeNodes方法实现。

pkg/scheduler/core/generic_scheduler.go

func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )
    appendError := func(err error) {
        mu.Lock()
        defer mu.Unlock()
        errs = append(errs, err)
    }

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

        ...
        workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
                nodeInfo := nodeNameToInfo[nodes[index].Name]
                for i := range priorityConfigs {
                    if priorityConfigs[i].Function != nil {
                        continue
                   }

                      var err error
                      results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
                      if err != nil {
                          appendError(err)
                          results[i][index].Host = nodes[index].Name
                      }
                }
         })


    for i := range priorityConfigs {
        if priorityConfigs[i].Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
            if klog.V(10) {
                for _, hostPriority := range results[index] {
                    klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
                }
            }
        }(i)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    if len(extenders) != 0 && nodes != nil {
        ...
    }

    if klog.V(10) {
        for i := range result {
            klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
        }
    }
    return result, nil
}

第一步先判断有没有优先级的配置,如果没有则对所有节点赋予相同的权重,返回一个均等的node列表(即HostPriority的Score字段均设置为1)。

第二步通过Map-Reduce方法多线程计算每个Node的分数。我们看到,Map-Reduce操作会遍历一个priorityConfigs数组,这个数组存放的就是PriorityConfig结构体的集合:

pkg/scheduler/algorithm/types.go

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
    Name   string
    Map    PriorityMapFunction
    Reduce PriorityReduceFunction
    // TODO: Remove it after migrating all functions to
    // Map-Reduce pattern.
    Function PriorityFunction
    Weight   int
}

而PriorityConfig正是存储优选算法相关信息的结构体。每个优选算法都会有自己的Name、Map(Map函数)、Reduce(Reduce函数)和Weight字段。所有的Map和Reduce函数都在pkg/scheduler/algorithm/priorities包中定义。这些方法通过pkg/scheduler/algorithmprovider/defaults/register_priorities.go中的init方法注册。

第三步汇总打的分数,将每个算法计算出的节点分数乘以算法的权重,存入result中并返回。

四、selectHost

selectHost方法很直观,就是从前面打过分的节点列表中选出分数最高的节点,将其host返回。

pkg/scheduler/core/generic_scheduler.go

// selectHost takes a prioritized list of nodes and then picks one
// in a round-robin manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }

    maxScores := findMaxScores(priorityList)
    ix := int(g.lastNodeIndex % uint64(len(maxScores)))
    g.lastNodeIndex++

    return priorityList[maxScores[ix]].Host, nil
}

五、总结

总体而言,Scheduler的具体调度过程大致包含取出下一个Pod、选出目标Node、预绑定、实际绑定四个步骤。Scheduler并不实际运行Pod,它所做的只是向API-Server提交Pod调度后更新的信息(在sched.bind中实现)。

原文地址:https://www.cnblogs.com/00986014w/p/10320844.html