informer

root@ubuntu:~/go_learn/informer# cat informer.go
package main
import (
 "os"
"fmt"
"github.com/spongeprojects/magicconch"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
    "k8s.io/apimachinery/pkg/fields"
     "k8s.io/client-go/kubernetes"
         "k8s.io/client-go/tools/clientcmd"
)
func mustClientset() kubernetes.Interface {
    kubeconfig := os.Getenv("KUBECONFIG")

    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    magicconch.Must(err)

    clientset, err := kubernetes.NewForConfig(config)
    magicconch.Must(err)

    return clientset
}
// newConfigMapsListerWatcher 用于创建 tmp namespace 下 configmaps 资源的 ListerWatcher 实例
func newConfigMapsListerWatcher() cache.ListerWatcher {
clientset := mustClientset()              // 前面有说明
client := clientset.CoreV1().RESTClient() // 客户端,请求器
resource := "configmaps"                  // GET 请求参数之一
namespace := "tmp"                        // GET 请求参数之一
selector := fields.Everything()           // GET 请求参数之一
lw := cache.NewListWatchFromClient(client, resource, namespace, selector)
return lw
}

func main() {
fmt.Println("----- 6-indexer -----")

lw := newConfigMapsListerWatcher()
indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
// 仅演示用,只关心 indexer,不处理事件,所以传一个空的 HandlerFunc,
// 实际使用中一般不会这样做
indexer, informer := cache.NewIndexerInformer(
lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{}, indexers)

stopCh := make(chan struct{})
defer close(stopCh)

fmt.Println("Start syncing....")

go informer.Run(stopCh)

// 在 informer 首次同步完成后再操作
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
panic("timed out waiting for caches to sync")
}

// 获取 cache.NamespaceIndex 索引下,索引值为 "tmp" 中的所有键
keys, err := indexer.IndexKeys(cache.NamespaceIndex, "tmp")
magicconch.Must(err)
for _, k := range keys {
fmt.Println(k)
}
}

先创建

root@ubuntu:~# kubectl create configmap -n tmp demo
configmap/demo created
root@ubuntu:~# kubectl create configmap -n tmp demo2
configmap/demo2 created
root@ubuntu:~# kubectl create configmap -n tmp demo23
configmap/demo23 created
root@ubuntu:~# 

后运行

root@ubuntu:~/go_learn/informer# ./informer --kubeconfig=$HOME/.kube/config 
----- 6-indexer -----
Start syncing....
tmp/demo
tmp/demo2
tmp/demo23
root@ubuntu:~/go_learn/informer# 

demo2

package main
import (
 "os"
"fmt"
"github.com/spongeprojects/magicconch"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
    "k8s.io/apimachinery/pkg/fields"
     "k8s.io/client-go/kubernetes"
         "k8s.io/client-go/tools/clientcmd"
)
func mustClientset() kubernetes.Interface {
    kubeconfig := os.Getenv("KUBECONFIG")

    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    magicconch.Must(err)

    clientset, err := kubernetes.NewForConfig(config)
    magicconch.Must(err)

    return clientset
}
// newConfigMapsListerWatcher 用于创建 tmp namespace 下 configmaps 资源的 ListerWatcher 实例
func newConfigMapsListerWatcher() cache.ListerWatcher {
clientset := mustClientset()              // 前面有说明
client := clientset.CoreV1().RESTClient() // 客户端,请求器
resource := "configmaps"                  // GET 请求参数之一
namespace := "tmp"                        // GET 请求参数之一
selector := fields.Everything()           // GET 请求参数之一
lw := cache.NewListWatchFromClient(client, resource, namespace, selector)
return lw
}

func main() {
fmt.Println("----- 6-indexer -----")

lw := newConfigMapsListerWatcher()
indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
// 仅演示用,只关心 indexer,不处理事件,所以传一个空的 HandlerFunc,
// 实际使用中一般不会这样做
    indexer, informer := cache.NewIndexerInformer(lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("created: %s
", configMap.Name)
        },
    }, indexers)

stopCh := make(chan struct{})
defer close(stopCh)

fmt.Println("Start syncing....")

go informer.Run(stopCh)

// 在 informer 首次同步完成后再操作
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
panic("timed out waiting for caches to sync")
}

// 获取 cache.NamespaceIndex 索引下,索引值为 "tmp" 中的所有键
keys, err := indexer.IndexKeys(cache.NamespaceIndex, "tmp")
magicconch.Must(err)
for _, k := range keys {
fmt.Println(k)
}
<-stopCh
}
./informer --kubeconfig=$HOME/.kube/config 
----- 6-indexer -----
Start syncing....
tmp/demo
tmp/demo2
tmp/demo23
tmp/demo43

debug kubernets controller 和 CRD 具体组件分析

原文地址:https://www.cnblogs.com/dream397/p/14990013.html