Kubernetes Source Code Reading Notes: Scheduler (Part 1)

The Scheduler is a key component on the cluster's master node. Its job is to place Pods onto appropriate nodes in the Kubernetes cluster, based on each Pod's resource requests, affinity rules, and other criteria.

1. The Entry Function

The entry function has the same structure as Controller Manager's: it likewise uses the cobra package to register a kube-scheduler command on the command line.

cmd/kube-scheduler/scheduler.go

func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewSchedulerCommand()

    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)

    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

The key method here is still NewSchedulerCommand. It lives in app/server.go and is structured almost identically to Controller Manager's equivalent, so I won't paste it in full. The core part is again the Run field of the cobra.Command struct, which calls the runCommand method.
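For readers without the Controller Manager notes at hand, here is a condensed sketch of its shape. This is a paraphrase, not the verbatim source; flag registration and option validation are omitted, and opts stands for the parsed options object:

// Condensed, paraphrased sketch of NewSchedulerCommand (app/server.go).
cmd := &cobra.Command{
    Use: "kube-scheduler",
    Run: func(cmd *cobra.Command, args []string) {
        if err := runCommand(cmd, args, opts); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    },
}
return cmd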

runCommand assembles the Config for the Scheduler and ends by calling the Run method, which actually starts the Scheduler.

One line in the middle of runCommand is worth pausing on:

func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {

    ...

    algorithmprovider.ApplyFeatureGates()

    ...
    return Run(cc, stopCh)
}

This line calls ApplyFeatureGates, which, based on the feature gate configuration, registers or removes the corresponding predicates.

Stepping into ApplyFeatureGates, we find the method is a single line, and it is the only method in the whole package:

pkg/scheduler/algorithmprovider/plugin.go

package algorithmprovider

import "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"

// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
    defaults.ApplyFeatureGates()
}

In fact, pkg/scheduler/algorithmprovider/defaults/defaults.go contains an init function:

pkg/scheduler/algorithmprovider/defaults/defaults.go

func init() {
    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}

So merely importing the defaults package already runs registerAlgorithmProvider, registering a set of predicates and priorities. ApplyFeatureGates then builds on this: depending on which Kubernetes features are enabled, it adds or removes certain predicates and priorities. The features themselves are defined in pkg/features/kube_features.go.
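To make this init-plus-feature-gate pattern concrete, here is a self-contained toy model of it. The map-based registry and the names are my own simplification, not the real factory API; as one real example (from memory), the TaintNodesByCondition gate removes the CheckNode* predicates in favor of taint-based checks:

package main

import "fmt"

// Toy registry standing in for pkg/scheduler/factory's package-level
// predicate registry; the real one maps names to predicate factories.
var fitPredicates = map[string]bool{}

func registerFitPredicate(name string) { fitPredicates[name] = true }
func removeFitPredicate(name string)   { delete(fitPredicates, name) }

// init runs automatically when the package is imported, just as
// defaults.go registers the default predicates and priorities on import.
func init() {
    registerFitPredicate("PodFitsResources")
    registerFitPredicate("CheckNodeCondition")
}

// applyFeatureGates mirrors defaults.ApplyFeatureGates: consult a feature
// gate and adjust the registry accordingly.
func applyFeatureGates(taintNodesByCondition bool) {
    if taintNodesByCondition {
        // Node conditions are surfaced as taints under this gate, so the
        // dedicated condition predicate becomes redundant and is removed.
        removeFitPredicate("CheckNodeCondition")
    }
}

func main() {
    applyFeatureGates(true)
    fmt.Println(fitPredicates) // map[PodFitsResources:true]
}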

The detailed definitions of the predicates and priorities live in pkg/scheduler/algorithm and pkg/scheduler/algorithmprovider; I won't expand on them here.

2. Run

Let's look at the Run method (still in app/server.go):

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        cc.PodInformer,
        cc.InformerFactory.Core().V1().PersistentVolumes(),
        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
        cc.InformerFactory.Core().V1().ReplicationControllers(),
        cc.InformerFactory.Apps().V1().ReplicaSets(),
        cc.InformerFactory.Apps().V1().StatefulSets(),
        cc.InformerFactory.Core().V1().Services(),
        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        cc.InformerFactory.Storage().V1().StorageClasses(),
        cc.Recorder,
        cc.ComponentConfig.AlgorithmSource,
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
    if err != nil {
        return err
    }

    // Prepare the event broadcaster.
    ...

    // Setup healthz checks.
    ...

    // Start all informers.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    ...

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

Run does mainly the following things:

(1) Create the Scheduler.

The first few lines of Run call New to create a Scheduler object. New lives in pkg/scheduler/scheduler.go:

pkg/scheduler/scheduler.go

func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {

    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    // Set up the configurator which can create schedulers from configs.
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  options.schedulerName,
        Client:                         client,
        NodeInformer:                   nodeInformer,
        PodInformer:                    podInformer,
        PvInformer:                     pvInformer,
        PvcInformer:                    pvcInformer,
        ReplicationControllerInformer:  replicationControllerInformer,
        ReplicaSetInformer:             replicaSetInformer,
        StatefulSetInformer:            statefulSetInformer,
        ServiceInformer:                serviceInformer,
        PdbInformer:                    pdbInformer,
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
        DisablePreemption:              options.disablePreemption,
        PercentageOfNodesToScore:       options.percentageOfNodesToScore,
        BindTimeoutSeconds:             options.bindTimeoutSeconds,
    })
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        ...
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        ...
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = recorder
    config.DisablePreemption = options.disablePreemption
    config.StopEverything = stopCh
    // Create the scheduler.
    sched := NewFromConfig(config)
    return sched, nil
}

The logic of New is fairly clear: from the informers, algorithm source, and other arguments passed in, it instantiates a Config, then calls NewFromConfig to create a scheduler instance from that Config and return it. Notice that the scheduler, too, relies on a large number of informers, nodeInformer and podInformer among them, because it needs to learn about resource changes promptly in order to adjust its scheduling decisions.

The switch statement in the middle checks whether the config's algorithm source is a user-specified policy or a named provider. With the default provider, the predicates and priorities registered earlier are loaded in.
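For reference, the Provider branch elided above boils down to asking the configurator to build a Config from the named provider's registered predicate and priority keys. Roughly (paraphrased, not the verbatim source):

    case source.Provider != nil:
        // Create the config from a named algorithm provider (paraphrased).
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, err
        }
        config = sc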

NewConfigFactory, which creates the config, is in pkg/scheduler/factory/factory.go. Stepping into it:

pkg/scheduler/factory/factory.go

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
   stopEverything := args.StopCh
   if stopEverything == nil {
      stopEverything = wait.NeverStop
   }
   schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

   // storageClassInformer is only enabled through VolumeScheduling feature gate
   var storageClassLister storagelisters.StorageClassLister
   if args.StorageClassInformer != nil {
      storageClassLister = args.StorageClassInformer.Lister()
   }
   c := &configFactory{
      client:                         args.Client,
      podLister:                      schedulerCache,
      podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
      nodeLister:                     args.NodeInformer.Lister(),
      pVLister:                       args.PvInformer.Lister(),
      pVCLister:                      args.PvcInformer.Lister(),
      serviceLister:                  args.ServiceInformer.Lister(),
      controllerLister:               args.ReplicationControllerInformer.Lister(),
      replicaSetLister:               args.ReplicaSetInformer.Lister(),
      statefulSetLister:              args.StatefulSetInformer.Lister(),
      pdbLister:                      args.PdbInformer.Lister(),
      storageClassLister:             storageClassLister,
      schedulerCache:                 schedulerCache,
      StopEverything:                 stopEverything,
      schedulerName:                  args.SchedulerName,
      hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
      disablePreemption:              args.DisablePreemption,
      percentageOfNodesToScore:       args.PercentageOfNodesToScore,
   }

   c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
   // scheduled pod cache
   args.PodInformer.Informer().AddEventHandler(
      cache.FilteringResourceEventHandler{
         FilterFunc: func(obj interface{}) bool {
            switch t := obj.(type) {
            case *v1.Pod:
               return assignedPod(t)
            case cache.DeletedFinalStateUnknown:
               if pod, ok := t.Obj.(*v1.Pod); ok {
                  return assignedPod(pod)
               }
               runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
               return false
            default:
               runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
               return false
            }
         },
         Handler: cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addPodToCache,
            UpdateFunc: c.updatePodInCache,
            DeleteFunc: c.deletePodFromCache,
         },
      },
   )
   // unscheduled pod queue
   args.PodInformer.Informer().AddEventHandler(
      cache.FilteringResourceEventHandler{
         FilterFunc: func(obj interface{}) bool {
            switch t := obj.(type) {
            case *v1.Pod:
               return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
            case cache.DeletedFinalStateUnknown:
               if pod, ok := t.Obj.(*v1.Pod); ok {
                  return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
               }
               runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
               return false
            default:
               runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
               return false
            }
         },
         Handler: cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addPodToSchedulingQueue,
            UpdateFunc: c.updatePodInSchedulingQueue,
            DeleteFunc: c.deletePodFromSchedulingQueue,
         },
      },
   )
   // ScheduledPodLister is something we provide to plug-in functions that
   // they may need to call.
   c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}

   args.NodeInformer.Informer().AddEventHandler(
      cache.ResourceEventHandlerFuncs{
         AddFunc:    c.addNodeToCache,
         UpdateFunc: c.updateNodeInCache,
         DeleteFunc: c.deleteNodeFromCache,
      },
   )

   args.PvInformer.Informer().AddEventHandler(
      cache.ResourceEventHandlerFuncs{
         // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
         AddFunc:    c.onPvAdd,
         UpdateFunc: c.onPvUpdate,
      },
   )

   // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
   args.PvcInformer.Informer().AddEventHandler(
      cache.ResourceEventHandlerFuncs{
         AddFunc:    c.onPvcAdd,
         UpdateFunc: c.onPvcUpdate,
      },
   )

   // This is for ServiceAffinity: affected by the selector of the service is updated.
   args.ServiceInformer.Informer().AddEventHandler(
      cache.ResourceEventHandlerFuncs{
         AddFunc:    c.onServiceAdd,
         UpdateFunc: c.onServiceUpdate,
         DeleteFunc: c.onServiceDelete,
      },
   )

   // Setup volume binder
   c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)

   args.StorageClassInformer.Informer().AddEventHandler(
      cache.ResourceEventHandlerFuncs{
         AddFunc: c.onStorageClassAdd,
      },
   )

   // Setup cache debugger
   ...

   go func() {
      <-c.StopEverything
      c.podQueue.Close()
   }()

   return c
}

This method sets up callbacks for a whole series of informers. The most important are the two callbacks on the PodInformer.

As you can see, AddEventHandler is called twice, both times with a filter: the first handler processes only pods that have already been scheduled, the second only pods that have not. Each defines add, update, and delete callbacks for its kind of pod, updating scheduled pods in the cache and unscheduled pods in the queue. This is what keeps scheduled and unscheduled pods apart.
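The two filter helpers are tiny. Paraphrased from factory.go (the real code may differ in small details): a pod counts as assigned once spec.nodeName is set, and a scheduler instance is only responsible for pods whose spec.schedulerName names it.

func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}

func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
    return schedulerName == pod.Spec.SchedulerName
}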

Of the callbacks registered later for the other informers, the NodeInformer's update node information in the cache; virtually all the rest end up calling MoveAllToActiveQueue, putting pods awaiting scheduling back into the active queue, as the simplified example below shows.
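As a simplified example (the real handlers also invalidate various caches first), a PV-add callback essentially reduces to:

func (c *configFactory) onPvAdd(obj interface{}) {
    // A new PV may satisfy a previously unschedulable pod's volume
    // requirements, so give every pending pod another chance.
    c.podQueue.MoveAllToActiveQueue()
}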

Note also the podQueue field in configFactory, which maintains the queue of pods waiting to be scheduled.
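The queue's interface is essentially a blocking producer/consumer. Below is a minimal self-contained model of that shape; it is my simplification (string pod keys instead of *v1.Pod), whereas the real internalqueue.SchedulingQueue also orders pods by priority and keeps a separate unschedulable sub-queue serviced by MoveAllToActiveQueue:

package main

import (
    "fmt"
    "sync"
)

// podQueue models the scheduler's pod queue: Add enqueues an unscheduled
// pod, Pop blocks until one is available.
type podQueue struct {
    mu    sync.Mutex
    cond  *sync.Cond
    items []string // pod keys; the real queue stores *v1.Pod
}

func newPodQueue() *podQueue {
    q := &podQueue{}
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *podQueue) Add(pod string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.items = append(q.items, pod)
    q.cond.Signal()
}

func (q *podQueue) Pop() string {
    q.mu.Lock()
    defer q.mu.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait()
    }
    pod := q.items[0]
    q.items = q.items[1:]
    return pod
}

func main() {
    q := newPodQueue()
    go q.Add("default/nginx-0")
    fmt.Println(q.Pop()) // blocks until the pod is enqueued
}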

(2) Run the event broadcaster and health checks.

A few lines in the middle configure the event broadcaster and health checks for the Scheduler, much as in Controller Manager, so I'll skip them.

(3) Start the informers.

It is worth noting that the Scheduler keeps its PodInformer separate from all the other informers, since scheduling pods is the Scheduler's core business.
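This is also why Run starts it separately: cc.InformerFactory.Start only starts informers created through the shared factory, so the standalone PodInformer needs its own goroutine. In my reading of this release the PodInformer is built via factory.NewPodInformer (an assumption worth verifying against your checkout), whose watch excludes pods already in a terminal phase:

// Assumption: factory.NewPodInformer from pkg/scheduler/factory, which
// watches only pods whose status.phase is neither Succeeded nor Failed.
c.PodInformer = factory.NewPodInformer(client, 0)

// Started separately in Run, since InformerFactory.Start won't do it:
go cc.PodInformer.Informer().Run(stopCh)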

(4) Run the Scheduler.

This is the core of the whole method: calling the Scheduler's Run method sets the Scheduler in motion.

Stepping into Run, we find it strikingly concise, doing just two things:

pkg/scheduler/scheduler.go

func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

The first is to wait for the informers' caches to sync; the second is to call scheduleOne, which carries out the scheduling of pods. wait.Until runs sched.scheduleOne once per given interval until sched.config.StopEverything is closed. The interval here is 0, so scheduleOne is invoked again and again, back to back.
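wait.Until is a widely used helper from k8s.io/apimachinery/pkg/util/wait. A tiny runnable demonstration of the "period 0, stop channel" behavior:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})
    go func() {
        time.Sleep(10 * time.Millisecond)
        close(stop) // plays the role of sched.config.StopEverything
    }()

    n := 0
    // Period 0 means the function is re-invoked immediately after it
    // returns, which is exactly how scheduleOne is driven.
    wait.Until(func() { n++ }, 0, stop)
    fmt.Println("scheduleOne stand-in ran", n, "times before stop")
}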

We'll pick up the detailed logic of scheduleOne in the next post: https://www.cnblogs.com/00986014w/p/10320844.html

3. Summary

To summarize the Scheduler's logic: a kube-scheduler command is registered via cobra and run. When it runs, it first applies the given scheduling algorithms, then creates a Scheduler instance based on the ConfigFactory, starts the relevant informers, and begins scheduling.

Original post: https://www.cnblogs.com/00986014w/p/10305425.html