Kubernetes源码阅读笔记——Controller Manager(之一)

Controller Manager是Kubernetes的核心组件之一。我们知道,Kubernetes对集群的管理采用的是控制器模式,即针对各种资源运行多个controller(控制器)。控制器的逻辑是运行永不结束的循环,通过apiserver组件时刻获取集群某种资源的状态,并确保资源的当前状态与期望的状态相符合。

下面我们就来通过阅读源码,看一下Kubernetes中Controller Manager的具体实现。

Kubernetes中与Controller Manager相关的包有2个,分别是cmd/cotroller-manager和cmd/kube-controller-manager(暂时不明白为什么要分成两部分)。

启动函数是kube-controller-manager下的controller-manager.go。下面我们先从启动函数入手。

一、启动函数controller-manager.go

启动函数很短:

func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewControllerManagerCommand()

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
    fmt.Fprintf(os.Stderr, "%v
", err)
    os.Exit(1)
    }
}

其中,第一行是生成随机数的代码,log相关的两行是处理日志,最核心的内容在于command。

进入NewControllerManagerCommand方法,我们发现这一方法位于cmd/kube-controller-manager/app/controllermanager.go中。这个方法的返回值是一个cobra.Command类型的指针。

这里稍微提一下cobra。cobra是一个Go语言的开源项目,用于在命令行中注册新命令。可参考https://github.com/spf13/cobra。cobra的基本结构就是注册一个cobra.Command类型的指针,然后调用Execute命令执行。可以看到main函数就遵循了这样的结构。

二、NewControllerManagerCommand方法

cobra.Command是一个结构体,我们看到NewControllerManagerCommand方法里定义了最核心的Use、Long、Run三个字段:

cmd/kube-controller-manager/app/controllermanager.go

func NewControllerManagerCommand(){
    cmd := &cobra.Command{
        Use: "kube-controller-manager"
        Long: `...`
        Run: func(cmd *cobra.Command, args []string) {
            verflag.PrintAndExitIfRequested()
        utilflag.PrintFlags(cmd.Flags())

        c, err := s.Config(KnownControllers(),ControllersDisabledByDefault.List())
        if err != nil {
            fmt.Fprintf(os.Stderr, "%v
", err)
            os.Exit(1)
        }

        if err := Run(c.Complete(), wait.NeverStop); err != nil {
            fmt.Fprintf(os.Stderr, "%v
", err)
            os.Exit(1)
        }
        },
    }
}

Use是命令本身,即在命令行中输入kube-controller-manager,即可运行。Long是对命令的详细说明,而Run则是命令的具体执行内容,也是核心。

Run后面有一段,是为kube-controller-manager命令配置flag的。

我们来仔细解读一下Run。

前两行PrintAndExitIfRequested和PrintFlags是处理打印版本和可用flag的,不重要。重点在后面的Config方法和Run方法。

在创建cmd之前,NewControllerManagerCommand方法其实还有一行代码:

cmd/kube-controller-manager/app/controllermanager.go

func NewControllerManagerCommand(){
    s, err := options.NewKubeControllerManagerOptions()
}

进入这个NewKubeControllerManagerOptions方法看,发现方法位于cmd/kube-controller-manager/app/options/options.go内,作用是创建了一个使用默认配置的KubeControllerManagerOptions结构体,包含了DeploymentController、ReplicationController等多个controller的配置。

利用这里创建的KubeControllerManagerOptions结构体,在Command的Run字段中执行了Config和Run两个操作。

Config方法是配置集群的kubeconfig等基础配置,第一个参数KnownControllers()值得关注。

三、KnownControllers方法

KnownControllers是同一个go文件下的方法,作用是将NewControllerInitializers方法中返回的Map的键生成一个list。

进入同文件下的NewControllerInitializers方法,我们发现:

cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    ...
    ...
    return controllers
}

这一方法,将controller-manager中的所有controller都注册了进来。每个controller都以名字为键,启动函数为值,存储在Map中。因此可以说,这个NewControllerInitializers方法维护了controller-manager的元数据,是controller-manager的重要方法之一。

将这些controller加载上配置后,就是下面核心的Run方法了。

四、Run方法

Run方法也在cmd/kube-controller-manager/app/controllermanager.go中,接收2个参数。第一个参数调用Config的Complete方法,对config再进行一次包装,第二个参数是一个单向channel,用于使方法阻塞,从而保持运行状态。

cmd/kube-controller-manager/app/controllermanager.go

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    ...

    // Setup any healthz checks we will want to use.
    ...

    // Start the controller manager HTTP server
    // unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
    ...

   run := func(ctx context.Context) {
       rootClientBuilder := controller.SimpleControllerClientBuilder{
          ClientConfig: c.Kubeconfig,
       }
       var clientBuilder controller.ControllerClientBuilder
       if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
          if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
             // It'c possible another controller process is creating the tokens for us.
             // If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
             klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
          }
          clientBuilder = controller.SAControllerClientBuilder{
             ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
             CoreClient:           c.Client.CoreV1(),
             AuthenticationClient: c.Client.AuthenticationV1(),
             Namespace:            "kube-system",
          }
       } else {
          clientBuilder = rootClientBuilder
       }
       controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
       if err != nil {
          klog.Fatalf("error building controller context: %v", err)
       }
       saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

       if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
          klog.Fatalf("error starting controllers: %v", err)
       }

       controllerContext.InformerFactory.Start(controllerContext.Stop)
       close(controllerContext.InformersStarted)

       select {}
    }

    if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
       run(context.TODO())
       panic("unreachable")
    }
}

前面几段分别是处理日志、健康检查、以及启动Controller Manager的HTTP server,不提。重点在后面的run。

run中,首先调用CreateControllerContext方法,为controller的运行准备环境。方法创建了多个client,确保Controller Manager能连接上API Server,并返回一个ControllerContext结构体,为下面的controller使用。

其次,调用StartControllers方法,开始正式运行controller。

StartControllers方法位于同一文件中,执行逻辑很直观,就是将之前保存在NewControllerInitializers中的controller全部运行起来(除了特殊的ServiceAccountTokenController,它在前面的环境准备中先运行起来),方法是分别调用这些controller的启动函数。关于它们的启动函数,将在下一篇文章中分析。

在所有的controller中,ServiceAccount Token Controller 最先启动,因为后面所有的controller运行都需要使用SATokenController创建的token。

这样,controller-manager模块就算是正式启动了。

run的后面还有一段,是处理高可用controller-manager的节点选举相关的,暂时不提。

五、Informer

Informer是Kubernetes中一个重要的概念。它本质上是一个client-go中的一个工具包,会将资源变化的信息通知缓存和listener。可参考https://blog.csdn.net/weixin_42663840/article/details/81980022

每个controller都会启动自己的informer。Run方法最后controllerContext.InformerFactory.Start(controllerContext.Stop)这一行代码就是将controller的informerfactory启动起来。

这个informerfactory,顾名思义是创建informer的工厂,在CreateControllerContext中初始化。事实上,这个factory目前还没有生成任何informer,因为在CreateControllerContext方法中,在创建这个工厂实例时并没有为这个实例配置任何额外的参数。直到后面分别启动每个Controller时,才会通过这个工厂生成具体的informer。

进入Start方法:

k8s.io/client-go/informers/factory.go

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

方法本质上调用了informer的Run方法。

k8s.io/client-go/tools/cache/shard_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    ...

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

此方法创建一个FIFO队列,之后根据informer的配置实例化一个client-go包中的controller对象(此controller非controller manager的controller),然后调用controller的Run方法(配置中有一条Process: s.HandleDeltas很关键,后面会用到)。

k8s.io/client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}

可见,controller的Run方法实例化一个Reflector对象,并将自己的reflector字段设置为这个对象,并调用对象的Run方法。Reflector也是k8s中的一个概念,作用在于通过List-Watch机制,与API Server连接,及时获取监听的k8s资源的变化。这一步通过调用reflector的Run方法来实现。Informer正是通过这一机制,在自身被动传达API Server发送的通知的同时,也会主动向API Server获取资源变化。Reflector下一篇继续研究。

另一方面,controller调用processLoop方法,不断地从这个FIFO队列中取出元素,并调用Process方法进行处理。

看一下processLoop方法:

k8s.io/client-go/tools/cache/controller.go

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

可以看到,processLoop方法本质上就是从FIFO队列中pop元素,并调用Process进行处理。而这个Process已经在前述sharedIndexInformer的Run方法中,通过Process: s.HandleDeltas这一行进行了定义。

看一下HandleDeltas方法:

k8s.io/client-go/tools/cache/shared_informers.go

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }

可见,在这个方法中,informer通过indexer,与FIFO队列交互,执行添加、更新或删除等操作,并调用distribute方法,向listener传递信息(listener下一篇会提到。)

关于Informer,下一篇文章会继续分析。

下一篇文章请见https://www.cnblogs.com/00986014w/p/10273738.html

五、总结

总而言之,Controller Manager的大致逻辑就是,通过cobra创建一个kube-controller-manager命令,并运行它。这个命令的内容是启动Controller Manager。这个Controller Manager管理Kubernetes中所有的controller,在manager启动时,会调用这些controller的启动函数,启动这些controller,并启动controller对应的informer。

原文地址:https://www.cnblogs.com/00986014w/p/10251844.html