kube-batch ——pod 和task

pkg/scheduler/scheduler.go:67

相应的资源 Informer 开始 Iist-Watch 监听事件变化

// Run  starts the schedulerCache
func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
    go sc.pdbInformer.Informer().Run(stopCh)
    go sc.podInformer.Informer().Run(stopCh)
    go sc.nodeInformer.Informer().Run(stopCh)
    go sc.podGroupInformerv1alpha1.Informer().Run(stopCh)
    go sc.podGroupInformerv1alpha2.Informer().Run(stopCh)
    go sc.pvInformer.Informer().Run(stopCh)
    go sc.pvcInformer.Informer().Run(stopCh)
    go sc.scInformer.Informer().Run(stopCh)
    go sc.queueInformerv1alpha1.Informer().Run(stopCh)
    go sc.queueInformerv1alpha2.Informer().Run(stopCh)

    if options.ServerOpts.EnablePriorityClass {
        go sc.pcInformer.Informer().Run(stopCh)
    }

    // Re-sync error tasks.
    go wait.Until(sc.processResyncTask, 0, stopCh)

    // Cleanup jobs.
    go wait.Until(sc.processCleanupJob, 0, stopCh)
}

sc.podInformer

        // create informer for pod information
        sc.podInformer = informerFactory.Core().V1().Pods()
        sc.podInformer.Informer().AddEventHandler(
                cache.FilteringResourceEventHandler{
                        FilterFunc: func(obj interface{}) bool {
                                switch obj.(type) {
                                case *v1.Pod:
                                        pod := obj.(*v1.Pod)
                                        if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
                                                return true
                                        }
                                        return pod.Status.Phase != v1.PodPending
                                default:
                                        return false
                                }
                        },
                        Handler: cache.ResourceEventHandlerFuncs{
                                AddFunc:    sc.AddPod,
                                UpdateFunc: sc.UpdatePod,
                                DeleteFunc: sc.DeletePod,
                        },
                })

这里可以看到,kube-batch只关心需要自己调度,并且Pending的Pod;以及Running的Pod。

kube-batchpkgschedulercacheevent_handlers.go

func (sc *SchedulerCache) AddPod(obj interface{}) {
    sc.Mutex.Lock()
    defer sc.Mutex.Unlock()

    err := sc.addPod(pod)
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
    pi := arbapi.NewTaskInfo(pod)

    return sc.addTask(pi)
}

全局一把锁,以后会是性能瓶颈。这里我们看到kube-batch会将Pod转换成TaskInfo缓存起来。

kube-batchpkgschedulerapijob_info.go

func NewTaskInfo(pod *v1.Pod) *TaskInfo {
   req := EmptyResource()

   // TODO(k82cn): also includes initContainers' resource.
   for _, c := range pod.Spec.Containers {
      req.Add(NewResource(c.Resources.Requests))
   }

   ti := &TaskInfo{
      UID:       TaskID(pod.UID),
      Job:       getJobID(pod),
      Name:      pod.Name,
      Namespace: pod.Namespace,
      NodeName:  pod.Spec.NodeName,
      Status:    getTaskStatus(pod),
      Priority:  1,

      Pod:    pod,
      Resreq: req,
   }

   if pod.Spec.Priority != nil {
      ti.Priority = *pod.Spec.Priority
   }

   return ti
}

转换过程比较简单,注意两点:

  • 需要统计资源请求量
  • JobID通过pod.Annotations[arbcorev1.GroupNameAnnotationKey]或者所属的controller

kube-batchpkgschedulercacheevent_handlers.go

func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
   if len(pi.Job) != 0 {
      if _, found := sc.Jobs[pi.Job]; !found {
         sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job)
      }

      sc.Jobs[pi.Job].AddTaskInfo(pi)
   }
}

kube-batchpkgschedulerapijob_info.go

func NewJobInfo(uid JobID) *JobInfo {
   return &JobInfo{
      UID: uid,

      MinAvailable: 0,
      NodeSelector: make(map[string]string),

      Allocated:    EmptyResource(),
      TotalRequest: EmptyResource(),

      TaskStatusIndex: map[TaskStatus]tasksMap{},
      Tasks:           tasksMap{},
   }
}

func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
    ji.Tasks[ti.UID] = ti
    ji.addTaskIndex(ti)

    ji.TotalRequest.Add(ti.Resreq)
}

func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
    if _, found := ji.TaskStatusIndex[ti.Status]; !found {
        ji.TaskStatusIndex[ti.Status] = tasksMap{}
    }

    ji.TaskStatusIndex[ti.Status][ti.UID] = ti
}

最终task会归于一个job,job主要保存tasks,资源请求总量等信息。

kbinformer.Scheduling().V1alpha2().PodGroups

        // create informer for PodGroup(v1alpha2) information
        sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
        sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddPodGroupAlpha2,
                UpdateFunc: sc.UpdatePodGroupAlpha2,
                DeleteFunc: sc.DeletePodGroupAlpha2,
        })
原文地址:https://www.cnblogs.com/dream397/p/14985317.html