pkg/scheduler/scheduler.go:67
相应的资源 Informer 开始 List-Watch 监听事件变化
// Run starts the SchedulerCache: it launches one goroutine per resource
// informer to begin list-watching events from the API server, plus two
// background maintenance loops. All goroutines stop when stopCh is closed.
func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
	go sc.pdbInformer.Informer().Run(stopCh)
	go sc.podInformer.Informer().Run(stopCh)
	go sc.nodeInformer.Informer().Run(stopCh)
	go sc.podGroupInformerv1alpha1.Informer().Run(stopCh)
	go sc.podGroupInformerv1alpha2.Informer().Run(stopCh)
	go sc.pvInformer.Informer().Run(stopCh)
	go sc.pvcInformer.Informer().Run(stopCh)
	go sc.scInformer.Informer().Run(stopCh)
	go sc.queueInformerv1alpha1.Informer().Run(stopCh)
	go sc.queueInformerv1alpha2.Informer().Run(stopCh)

	// PriorityClass watching is optional, gated by a server option.
	if options.ServerOpts.EnablePriorityClass {
		go sc.pcInformer.Informer().Run(stopCh)
	}

	// Re-sync error tasks.
	go wait.Until(sc.processResyncTask, 0, stopCh)

	// Cleanup jobs.
	go wait.Until(sc.processCleanupJob, 0, stopCh)
}
sc.podInformer
// create informer for pod information sc.podInformer = informerFactory.Core().V1().Pods() sc.podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch obj.(type) { case *v1.Pod: pod := obj.(*v1.Pod) if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending { return true } return pod.Status.Phase != v1.PodPending default: return false } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sc.AddPod, UpdateFunc: sc.UpdatePod, DeleteFunc: sc.DeletePod, }, })
这里可以看到,kube-batch 只关心两类 Pod:需要自己调度且处于 Pending 的 Pod,以及已经离开 Pending 阶段(例如 Running)的 Pod——注意后者并不限定调度器,任何非 Pending 的 Pod 都会通过过滤。
kube-batch/pkg/scheduler/cache/event_handlers.go
func (sc *SchedulerCache) AddPod(obj interface{}) { sc.Mutex.Lock() defer sc.Mutex.Unlock() err := sc.addPod(pod) } // Assumes that lock is already acquired. func (sc *SchedulerCache) addPod(pod *v1.Pod) error { pi := arbapi.NewTaskInfo(pod) return sc.addTask(pi) }
全局一把锁,以后会是性能瓶颈。这里我们看到kube-batch
会将Pod转换成TaskInfo缓存起来。
kube-batch/pkg/scheduler/api/job_info.go
func NewTaskInfo(pod *v1.Pod) *TaskInfo { req := EmptyResource() // TODO(k82cn): also includes initContainers' resource. for _, c := range pod.Spec.Containers { req.Add(NewResource(c.Resources.Requests)) } ti := &TaskInfo{ UID: TaskID(pod.UID), Job: getJobID(pod), Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName, Status: getTaskStatus(pod), Priority: 1, Pod: pod, Resreq: req, } if pod.Spec.Priority != nil { ti.Priority = *pod.Spec.Priority } return ti }
转换过程比较简单,注意两点:
- 需要统计资源请求量
- JobID通过
pod.Annotations[arbcorev1.GroupNameAnnotationKey]
或者所属的controller
kube-batch/pkg/scheduler/cache/event_handlers.go
func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error { if len(pi.Job) != 0 { if _, found := sc.Jobs[pi.Job]; !found { sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job) } sc.Jobs[pi.Job].AddTaskInfo(pi) } }
kube-batch/pkg/scheduler/api/job_info.go
func NewJobInfo(uid JobID) *JobInfo { return &JobInfo{ UID: uid, MinAvailable: 0, NodeSelector: make(map[string]string), Allocated: EmptyResource(), TotalRequest: EmptyResource(), TaskStatusIndex: map[TaskStatus]tasksMap{}, Tasks: tasksMap{}, } } func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) { ji.Tasks[ti.UID] = ti ji.addTaskIndex(ti) ji.TotalRequest.Add(ti.Resreq) } func (ji *JobInfo) addTaskIndex(ti *TaskInfo) { if _, found := ji.TaskStatusIndex[ti.Status]; !found { ji.TaskStatusIndex[ti.Status] = tasksMap{} } ji.TaskStatusIndex[ti.Status][ti.UID] = ti }
最终task会归于一个job,job主要保存tasks,资源请求总量等信息。
kbinformer.Scheduling().V1alpha2().PodGroups
// create informer for PodGroup(v1alpha2) information.
// Add/update/delete events are routed to the cache's v1alpha2 handlers.
sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPodGroupAlpha2,
	UpdateFunc: sc.UpdatePodGroupAlpha2,
	DeleteFunc: sc.DeletePodGroupAlpha2,
})