MetaNamespaceKeyFunc

cat<<EOF >informer.go
package main

import (
 "fmt"

 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/meta"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/tools/cache"
)

const (
 NamespaceIndexName = "namespace"
 NodeNameIndexName  = "nodeName"
)

func NamespaceIndexFunc(obj interface{}) ([]string, error) {
 m, err := meta.Accessor(obj)
 if err != nil {
  return []string{""}, fmt.Errorf("object has no meta: %v", err)
 }
 return []string{m.GetNamespace()}, nil
}

func NodeNameIndexFunc(obj interface{}) ([]string, error) {
 pod, ok := obj.(*v1.Pod)
 if !ok {
  return []string{}, nil
 }
 return []string{pod.Spec.NodeName}, nil
}

func main() {
 index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
  NamespaceIndexName: NamespaceIndexFunc,
  NodeNameIndexName:  NodeNameIndexFunc,
 })

 pod1 := &v1.Pod{
  ObjectMeta: metav1.ObjectMeta{
   Name:      "index-pod-1",
   Namespace: "default",
  },
  Spec: v1.PodSpec{NodeName: "node1"},
 }
 pod2 := &v1.Pod{
  ObjectMeta: metav1.ObjectMeta{
   Name:      "index-pod-2",
   Namespace: "default",
  },
  Spec: v1.PodSpec{NodeName: "node2"},
 }
 pod3 := &v1.Pod{
  ObjectMeta: metav1.ObjectMeta{
   Name:      "index-pod-3",
   Namespace: "kube-system",
  },
  Spec: v1.PodSpec{NodeName: "node2"},
 }

 _ = index.Add(pod1)
 _ = index.Add(pod2)
 _ = index.Add(pod3)

 // ByIndex 两个参数:IndexName(索引器名称)和 indexKey(需要检索的key)
 pods, err := index.ByIndex(NamespaceIndexName, "default")
 if err != nil {
  panic(err)
 }
 for _, pod := range pods {
  fmt.Println(pod.(*v1.Pod).Name)
 }

 fmt.Println("==========================")

 pods, err = index.ByIndex(NodeNameIndexName, "node2")
 if err != nil {
  panic(err)
 }
 for _, pod := range pods {
  fmt.Println(pod.(*v1.Pod).Name)
 }

}
EOF
root@ubuntu:~/go_learn/informer# go mod  vendor
root@ubuntu:~/go_learn/informer# go build -o informer .
root@ubuntu:~/go_learn/informer# ./informer 
index-pod-1
index-pod-2
==========================
index-pod-2
index-pod-3

在上面的示例中首先通过 NewIndexer 函数实例化 Indexer 对象,第一个参数就是用于计算资源对象键的函数,这里我们使用的是 MetaNamespaceKeyFunc 这个默认的对象键函数;第二个参数是 Indexers,也就是存储索引器,上面我们知道 Indexers 的定义为 map[string]IndexFunc,为什么要定义成一个 map 呢?我们可以类比数据库中,我们要查询某项数据,索引的方式是不是多种多样啊?为了扩展,Kubernetes 中就使用一个 map 来存储各种各样的存储索引器,至于存储索引器如何生成,就使用一个 IndexFunc 暴露出去,给使用者自己实现即可。

这里我们定义的了两个索引键生成函数:NamespaceIndexFunc 与 NodeNameIndexFunc,一个根据资源对象的命名空间来进行索引,一个根据资源对象所在的节点进行索引。然后定义了3个 Pod,前两个在 default 命名空间下面,另外一个在 kube-system 命名空间下面,然后通过 index.Add 函数添加这3个 Pod 资源对象。然后通过 index.ByIndex 函数查询在名为 namespace 的索引器下面匹配索引键为 default 的 Pod 列表。也就是查询 default 这个命名空间下面的所有 Pod,这里就是前两个定义的 Pod。

对上面的示例如果我们理解了,那么就很容易理解上面定义的4个数据结构了:

  • IndexFunc:索引器函数,用于计算一个资源对象的索引值列表,上面示例是指定命名空间为索引值结果,当然我们也可以根据需求定义其他的,比如根据 Label 标签、Annotation 等属性来生成索引值列表。
  • Index:存储数据,对于上面的示例,我们要查找某个命名空间下面的 Pod,那就要让 Pod 按照其命名空间进行索引,对应的 Index 类型就是 map[namespace]sets.pod
  • Indexers:存储索引器,key 为索引器名称,value 为索引器的实现函数,上面的示例就是 map["namespace"]MetaNamespaceIndexFunc
  • Indices:存储缓存器,key 为索引器名称,value 为缓存的数据,对于上面的示例就是 map["namespace"]map[namespace]sets.pod

可能最容易混淆的是 Indexers 和 Indices 这两个概念,因为平时很多时候我们没有怎么区分二者的关系,这里我们可以这样理解:Indexers 是存储索引(生成索引键)的,Indices 里面是存储的真正的数据(对象键),这样可能更好理解。

 

按照上面的理解我们可以得到上面示例的索引数据如下所示:

// Indexers 就是包含的所有索引器(分类)以及对应实现
Indexers: {  
  "namespace": NamespaceIndexFunc,
  "nodeName": NodeNameIndexFunc,
}
// Indices 就是包含的所有索引分类中所有的索引数据
Indices: {
 "namespace": {  //namespace 这个索引分类下的所有索引数据
  "default": ["pod-1", "pod-2"],  // Index 就是一个索引键下所有的对象键列表
  "kube-system": ["pod-3"]   // Index
 },
 "nodeName": {  //nodeName 这个索引分类下的所有索引数据(对象键列表)
  "node1": ["pod-1"],  // Index
  "node2": ["pod-2", "pod-3"]  // Index
 }
}
root@ubuntu:~/go_learn/informer# go build -o informer .
root@ubuntu:~/go_learn/informer# ./informer 
key is  default/index-pod-1
key2 is  default/index-pod-1
root@ubuntu:~/go_learn/informer# cat informer.go
package main

import (
 "fmt"

 v1 "k8s.io/api/core/v1"
 //"k8s.io/apimachinery/pkg/api/meta"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/tools/cache"
 "k8s.io/client-go/util/workqueue"
)

func main() {
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    pod1 := &v1.Pod{
     ObjectMeta: metav1.ObjectMeta{
     Name:      "index-pod-1",
     Namespace: "default",
    },
    Spec: v1.PodSpec{NodeName: "node1"},
    }
     key, err := cache.MetaNamespaceKeyFunc(pod1)
   if err == nil {
    fmt.Println("key is ", key)
    queue.Add(key)
   }

   key2, err2  := queue.Get()
  if err2  {
    fmt.Println("quit")
   } else {
    fmt.Println("key2 is ", key2)
  }
}
root@ubuntu:~/go_learn/informer# ./informer  -kubeconfig=$HOME/.kube/config
key is  default/index-pod-1
key2 is  default/index-pod-1
root@ubuntu:~/go_learn/informer# cat informer.go
package main

import (
 "fmt"
 "flag"
 "time"
 "k8s.io/client-go/kubernetes"
 "k8s.io/client-go/tools/clientcmd"
 klog "k8s.io/klog/v2"
 "k8s.io/client-go/informers"
 v1 "k8s.io/api/core/v1"
 //"k8s.io/apimachinery/pkg/api/meta"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/tools/cache"
 "k8s.io/client-go/util/workqueue"
 "k8s.io/kubectl/pkg/util/logs"
)
var kubeconfig string

func init() {
    flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}


func main() {
    flag.Parse()
    logs.InitLogs()
    defer logs.FlushLogs()

    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        panic(err.Error())
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatal(err)
    }

    factory := informers.NewSharedInformerFactory(clientset, time.Hour*24)
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    pod1 := &v1.Pod{
     ObjectMeta: metav1.ObjectMeta{
     Name:      "index-pod-1",
     Namespace: "default",
    },
    Spec: v1.PodSpec{NodeName: "node1"},
    }
     key, err := cache.MetaNamespaceKeyFunc(pod1)
   if err == nil {
    fmt.Println("key is ", key)
    queue.Add(key)
   }

   key2, err2  := queue.Get()
  if err2  {
    fmt.Println("quit")
   } else {
    fmt.Println("key2 is ", key2)
  }
  informer :=  factory.Core().V1().Pods().Informer()
  stop := make(chan struct{})
  go informer.Run(stop)
  pod,_,_:= informer.GetStore().GetByKey(key2.(string))
  fmt.Println(pod.(*v1.Pod).Name)
}
root@ubuntu:~/go_learn/informer# ./informer  -kubeconfig=$HOME/.kube/config
key is  default/index-pod-1
key2 is  default/index-pod-1
panic: interface conversion: interface {} is nil, not *v1.Pod

goroutine 1 [running]:
main.main()
        /root/go_learn/informer/informer.go:65 +0x7e8
root@ubuntu:~/go_learn/informer# go build -o informer . 
root@ubuntu:~/go_learn/informer# ./informer 
key is  default/index-pod-1
key2 is  default/index-pod-1
index-pod-1
root@ubuntu:~/go_learn/informer# cat informer.go
package main

import (
 "fmt"
 v1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/tools/cache"
 "k8s.io/client-go/util/workqueue"
)


func main() {
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    pod1 := &v1.Pod{
     ObjectMeta: metav1.ObjectMeta{
     Name:      "index-pod-1",
     Namespace: "default",
    },
    Spec: v1.PodSpec{NodeName: "node1"},
    }
     key, err := cache.MetaNamespaceKeyFunc(pod1)
   if err == nil {
    fmt.Println("key is ", key)
    queue.Add(key)
    _ = indexer.Add(pod1)
   }

   key2, err2  := queue.Get()
  if err2  {
    fmt.Println("quit")
   } else {
    fmt.Println("key2 is ", key2)
  }

  pod,_,_ := indexer.GetByKey(key2.(string))
  fmt.Println(pod.(*v1.Pod).Name)
}

https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go

What I learnt about Kubernetes Controllers

client-go 之 Indexer 的理解

原文地址:https://www.cnblogs.com/dream397/p/14986806.html