深入k8s:Informer使用及其源码分析

转载 luozhiyun的博客:https://www.luozhiyun.com

Informer机制#

机制设计#

Informer主要有两个作用:

  1. 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。ListAndWatch通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象;然后,再通过 WATCH API 来“监听”所有这些 API 对象的变化;
  2. 注册相应的事件,之后如果监听到的事件变化就会调用事件对应的EventHandler,实现回调。

Kubernetes中都是各种controller的实现,各种controller都会用到Informer

Informer运行原理如下:

image-20201017000845410

根据流程图来解释一下Informer中几个组件的作用:

  • Reflector:用于监控指定的k8s资源,当资源发生变化时,触发相应的变更事件,如Added事件、Updated事件、Deleted事件,并将器资源对象放到本地DeltaFIFO Queue中;
  • DeltaFIFO:DeltaFIFO是一个先进先出的队列,可以保存资源对象的操作类型;
  • Indexer:用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer;

Reflector 包会和 apiServer 建立长连接,并使用 ListAndWatch 方法获取并监听某一个资源的变化。List 方法将会获取某个资源的所有实例,Watch 方法则监听资源对象的创建、更新以及删除事件,然后将事件放入到DeltaFIFO Queue中;

然后Informer会不断的从 Delta FIFO Queue 中 pop 增量事件,并根据事件的类型来决定新增、更新或者是删除本地缓存;接着Informer 根据事件类型来触发事先注册好的 Event Handler触发回调函数,然后然后将该事件丢到 Work Queue 这个工作队列中。

实例#

go get -u -v kgo get -u -v k8s.io/client-go/..8s.io/client-go/..
git clone https://github.com/huweihuang/client-go.git
cd client-go
#保证本地HOME目录有配置kubernetes集群的配置文件
go run client-go.go

将到了go-client部分的代码,我们可以直接通过实例来进行上手跑动,Informers Example代码示例如下:

package main

import (
    "flag"
    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
    "log"
    "path/filepath"
    "time"
)

func main() {
    var kubeconfig *string
    //如果是windows,那么会读取C:Usersxxx.kubeconfig 下面的配置文件
    //如果是linux,那么会读取~/.kube/config下面的配置文件
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    stopCh := make(chan struct{})
    defer close(stopCh)
    //表示每分钟进行一次resync,resync会周期性地执行List操作
    sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)

    informer := sharedInformers.Core().V1().Pods().Informer()

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            mObj := obj.(v1.Object)
            log.Printf("New Pod Added to Store: %s", mObj.GetName())
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            oObj := oldObj.(v1.Object)
            nObj := newObj.(v1.Object)
            log.Printf("%s Pod Updated to %s", oObj.GetName(),nObj.GetName())
        },
        DeleteFunc: func(obj interface{}) {
            mObj := obj.(v1.Object)
            log.Printf("Pod Deleted from Store: %s", mObj.GetName())
        },
    })

    informer.Run(stopCh)
}

要运行这段代码,需要我们将k8s服务器上的~/.kube代码拷贝到本地,我是win10的机器所以拷贝到C:Usersxxx.kube中。

informers.NewSharedInformerFactory会传入两个参数,第1个参数clientset是用于与k8s apiserver交互的客户端,第2个参数是代表每分钟会执行一次resync,resync会周期性执行List将所有资源存放再Informer Store中,如果该参数是0,则禁用resync功能。

通过informer.AddEventHandler函数可以为pod资源添加资源事件回调方法,支持3种资源事件回调方法:

  • AddFunc
  • UpdateFunc
  • DeleteFunc

通过名称我们就可以知道是新增、更新、删除时会回调这些方法。

在我们初次执行run方法的时候,可以会将监控的k8s上pod存放到本地,并回调AddFunc方法,如下日志:

2020/10/17 15:13:10 New Pod Added to Store: dns-test
2020/10/17 15:13:10 New Pod Added to Store: web-1
2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph
2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2
2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k
2020/10/17 15:13:10 New Pod Added to Store: mongodb
2020/10/17 15:13:10 New Pod Added to Store: web-0
....

这里我用一张图总结一下informer的Run方法流程:

image-20201017233145402

原文地址:https://www.cnblogs.com/dream397/p/14768931.html