k8s CustomResource

REST API

 kubectl api-resources 

kind: CustomResourceDefinition

[root@bogon deploy]# grep 'kind: CustomResourceDefinition' -rn *
cluster.karmada.io_clusters.yaml:4:kind: CustomResourceDefinition
multicluster.x-k8s.io_serviceexports.yaml:15:kind: CustomResourceDefinition
multicluster.x-k8s.io_serviceimports.yaml:15:kind: CustomResourceDefinition
policy.karmada.io_clusteroverridepolicies.yaml:4:kind: CustomResourceDefinition
policy.karmada.io_clusterpropagationpolicies.yaml:4:kind: CustomResourceDefinition
policy.karmada.io_overridepolicies.yaml:4:kind: CustomResourceDefinition
policy.karmada.io_propagationpolicies.yaml:4:kind: CustomResourceDefinition
policy.karmada.io_replicaschedulingpolicies.yaml:4:kind: CustomResourceDefinition
work.karmada.io_clusterresourcebindings.yaml:4:kind: CustomResourceDefinition
work.karmada.io_resourcebindings.yaml:4:kind: CustomResourceDefinition
work.karmada.io_works.yaml:4:kind: CustomResourceDefinition
[root@bogon deploy]# 

scheme.AddKnownTypes

// addKnownTypes registers the Cluster and ClusterList types with scheme
// under SchemeGroupVersion. It always returns nil.
func addKnownTypes(scheme *runtime.Scheme) error {
        known := []runtime.Object{
                &Cluster{},
                &ClusterList{},
        }
        scheme.AddKnownTypes(SchemeGroupVersion, known...)

        // AddToGroupVersion allows client types such as ListOptions to be
        // serialized for this group version.
        v1.AddToGroupVersion(scheme, SchemeGroupVersion)
        return nil
}

demo1

https://github.com/ysku/my-k8s-custom-controller/blob/master/pkg/controller/pod.go

package controller

import (
    log "github.com/sirupsen/logrus"
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// NewPodLoggingController builds the LoggingController named "pod".
//
// It takes the Pod informer from factory, registers the package-level
// podAdd/podUpdate/podDelete callbacks on it, and hands the informer plus a
// rate-limited work queue to NewLoggingController.
func NewPodLoggingController(factory informers.SharedInformerFactory) *LoggingController {
    podInformer := factory.Core().V1().Pods().Informer()

    handlers := cache.ResourceEventHandlerFuncs{
        AddFunc:    podAdd,
        UpdateFunc: podUpdate,
        DeleteFunc: podDelete,
    }
    podInformer.AddEventHandler(handlers)

    limiter := workqueue.DefaultControllerRateLimiter()
    return NewLoggingController("pod", workqueue.NewRateLimitingQueue(limiter), podInformer)
}

// podAdd is the informer "add" callback: it logs the namespace, name and
// labels of the new Pod. obj must be a *v1.Pod.
func podAdd(obj interface{}) {
    added := obj.(*v1.Pod)
    log.Printf(
        "[podAdd] namespace:%s, name:%s, labels:%v",
        added.Namespace, added.Name, added.GetLabels(),
    )
}

// podUpdate is the informer "update" callback: it logs the namespace, name
// and labels of the Pod before and after the change. Both arguments must be
// *v1.Pod values.
func podUpdate(old, new interface{}) {
    before, after := old.(*v1.Pod), new.(*v1.Pod)

    log.Printf(
        "[podUpdate] old, namespace:%s, name:%s, labels:%v",
        before.Namespace, before.Name, before.GetLabels(),
    )
    log.Printf(
        "[podUpdate] new, namespace:%s, name:%s, labels:%v",
        after.Namespace, after.Name, after.GetLabels(),
    )
}

// podDelete is the informer "delete" callback: it logs the namespace, name
// and labels of the removed Pod.
//
// A delete notification may carry a cache.DeletedFinalStateUnknown tombstone
// instead of the Pod itself (when the watch missed the actual delete event),
// so the tombstone is unwrapped first. Anything that is neither a Pod nor a
// tombstone holding a Pod is logged and ignored rather than causing a panic.
func podDelete(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
        if !isTombstone {
            log.Printf("[podDelete] unexpected object type %T", obj)
            return
        }
        if pod, ok = tombstone.Obj.(*v1.Pod); !ok {
            log.Printf("[podDelete] tombstone contained unexpected object type %T", tombstone.Obj)
            return
        }
    }
    log.Printf("[podDelete] namespace:%s, name:%s, labels:%v", pod.Namespace, pod.Name, pod.GetLabels())
}

demo2

https://github.com/aws/aws-app-mesh-controller-for-k8s/blob/1bcc239c0586c99c18636c1143a8066f94f4136a/pkg/k8s/pod_wrapper.go

package k8s

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    apimeta "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// PodsRepository represents an interface with all the common operations on pod objects
type PodsRepository interface {
    // GetPod looks up a single pod by namespace and name.
    GetPod(namespace string, name string) (*v1.Pod, error)
    // ListPodsWithMatchingLabels lists pods, optionally restricted by the
    // namespace and label selector carried in opts.
    ListPodsWithMatchingLabels(opts client.ListOptions) (*v1.PodList, error)
}

// podsRepository is the PodsRepository implementation returned by
// NewPodsRepository; it wraps a CustomController.
type podsRepository struct {
    // customController provides the data store (via GetDataStore) that all
    // lookups in this type read from.
    customController *CustomController
}

// NewPodsRepository wraps customController in a PodsRepository whose lookups
// are served from that controller's data store.
func NewPodsRepository(customController *CustomController) PodsRepository {
    repo := new(podsRepository)
    repo.customController = customController
    return repo
}

// GetPod returns the pod stored under the "namespace/name" key in the custom
// controller's data store.
//
// It returns an error when the store lookup fails, when no entry exists for
// the key, or when the stored entry is not a *v1.Pod. The last case is
// reported as an error (matching ListPodsWithMatchingLabels) instead of
// panicking on an unchecked type assertion.
func (k *podsRepository) GetPod(namespace string, name string) (*v1.Pod, error) {
    nsName := types.NamespacedName{
        Namespace: namespace,
        Name:      name,
    }.String()
    obj, exists, err := k.customController.GetDataStore().GetByKey(nsName)
    if err != nil {
        return nil, err
    }
    if !exists {
        return nil, fmt.Errorf("failed to find pod %s", nsName)
    }
    pod, ok := obj.(*v1.Pod)
    if !ok {
        return nil, fmt.Errorf("cache contained %T, which is not a Pod", obj)
    }
    return pod, nil
}

// ListPodsWithMatchingLabels returns the pods held in the custom controller's
// data store, filtered by opts:
//
//   - opts.Namespace, when non-empty, restricts the result to that namespace
//     (looked up through the NamespaceIndexKey index); otherwise every stored
//     pod is considered.
//   - opts.LabelSelector, when non-nil, keeps only pods whose labels match.
//
// An error is returned if the index lookup fails or if the store contains an
// entry that is not a *v1.Pod.
func (k *podsRepository) ListPodsWithMatchingLabels(opts client.ListOptions) (*v1.PodList, error) {
    store := k.customController.GetDataStore()

    var cached []interface{}
    if opts.Namespace == "" {
        cached = store.List()
    } else {
        var err error
        cached, err = store.ByIndex(NamespaceIndexKey, opts.Namespace)
        if err != nil {
            return nil, err
        }
    }

    // A nil selector means no label filtering at all.
    var selector labels.Selector = opts.LabelSelector

    result := &v1.PodList{}
    for _, entry := range cached {
        pod, isPod := entry.(*v1.Pod)
        if !isPod {
            return nil, fmt.Errorf("cache contained %T, which is not a Pod", entry)
        }

        accessor, err := apimeta.Accessor(pod)
        if err != nil {
            return nil, err
        }

        if selector == nil || selector.Matches(labels.Set(accessor.GetLabels())) {
            result.Items = append(result.Items, *pod)
        }
    }
    return result, nil
}

 demo3

https://github.com/ikruglov/kube-custom-monitor/blob/e128afacf04e5bb32ab29f4a6ef350511edcd9a4/slo/slo.go

// Run wires up and starts the event and pod informers, waits for their caches
// to sync, and then launches a background cleanup loop.
//
// It returns an error if the caches do not sync (WaitForCacheSync returns
// false) and nil otherwise. The informers and the cleanup loop keep running
// in their own goroutines until stop is closed.
func (pm *podStartupLatencyDataMonitor) Run(stop <-chan struct{}) error {
    // Events: both additions and updates are routed to handleEvent (updates
    // pass the new object). No delete handler is registered.
    eventInformer := pm.eventInformer.Informer()
    eventInformer.AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { pm.handleEvent(obj.(*api_v1.Event)) },
            UpdateFunc: func(old, new interface{}) { pm.handleEvent(new.(*api_v1.Event)) },
        },
        0*time.Second, // disable resync
    )

    // Pods: additions and updates go to handlePodUpdate, deletions to
    // handlePodDelete.
    podInformer := pm.podInformer.Informer()
    podInformer.AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                pm.handlePodUpdate(obj.(*api_v1.Pod))
            },
            UpdateFunc: func(old, new interface{}) {
                pm.handlePodUpdate(new.(*api_v1.Pod))
            },
            DeleteFunc: func(obj interface{}) {
                // The deleted object is either the Pod itself or a
                // DeletedFinalStateUnknown tombstone wrapping it; anything
                // else is logged and dropped.
                pod, ok := obj.(*api_v1.Pod)
                if !ok {
                    if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                        if pod, ok = d.Obj.(*api_v1.Pod); !ok {
                            glog.Errorf("failed to cast embedded object from tombstone to *v1.Pod: %v", d.Obj)
                            return
                        }
                    } else {
                        glog.Errorf("failed to cast observed object to *v1.Pod: %v", obj)
                        return
                    }
                }

                pm.handlePodDelete(pod)
            },
        },
        0*time.Second, // disable resync
    )

    // Start both informers and block until their caches are populated.
    go eventInformer.Run(stop)
    go podInformer.Run(stop)
    if !cache.WaitForCacheSync(stop, podInformer.HasSynced, eventInformer.HasSynced) {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    // NOTE(review): initDone is written here without taking pm's lock, while
    // the cleanup loop below does lock pm — confirm how initDone is read.
    pm.initDone = true

    // Every 15 seconds, under pm's lock, drop every pod whose entry in
    // pm.toDelete holds a time earlier than now, removing it from both
    // pm.toDelete and pm.pods.
    go func() {
        wait.Until(func() {
            pm.Lock()
            defer pm.Unlock()

            now := time.Now()
            for podKey, when := range pm.toDelete {
                if when.Before(now) {
                    glog.V(1).Infof("cleanup pod %s", podKey)
                    delete(pm.toDelete, podKey)
                    delete(pm.pods, podKey)
                }
            }
        }, 15*time.Second, stop)
    }()

    return nil
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package watchers

import (
    "reflect"
    "time"

    api "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/kubernetes"
    cache "k8s.io/client-go/tools/cache"
)

// ServiceUpdate is the notification payload sent through the ServiceWatcher's
// broadcaster for every service event.
type ServiceUpdate struct {
    // Service is the service the event refers to.
    Service *api.Service
    // Op is the kind of event; the handlers in this file use ADD, REMOVE and UPDATE.
    Op      Operation
}

// ServiceWatcher watches services and fans the resulting events out to
// registered handlers. Instances are created by StartServiceWatcher.
type ServiceWatcher struct {
    // serviceController is the informer controller; it is run by
    // StartServiceWatcher and backs HasSynced.
    serviceController cache.Controller
    // serviceLister is the indexed local cache queried by ListBySelector.
    serviceLister     cache.Indexer
    // broadcaster delivers ServiceUpdate notifications to registered listeners.
    broadcaster       *Broadcaster
}

// ServiceUpdatesHandler is implemented by consumers that want to receive
// service notifications; see ServiceWatcher.RegisterHandler.
type ServiceUpdatesHandler interface {
    // OnServiceUpdate is invoked with each ServiceUpdate that is broadcast.
    OnServiceUpdate(serviceUpdate *ServiceUpdate)
}

// addEventHandler is the informer "add" callback. It broadcasts an ADD
// ServiceUpdate for obj; objects that are not *api.Service are ignored.
func (svcw *ServiceWatcher) addEventHandler(obj interface{}) {
    if svc, isService := obj.(*api.Service); isService {
        svcw.broadcaster.Notify(&ServiceUpdate{Op: ADD, Service: svc})
    }
}

// deleteEventHandler is the informer "delete" callback. It broadcasts a
// REMOVE ServiceUpdate for the deleted service.
//
// A delete notification may carry a cache.DeletedFinalStateUnknown tombstone
// instead of the service itself (when the watch missed the delete event). The
// tombstone is unwrapped so that listeners still receive the REMOVE; without
// this such deletions would be dropped silently. Objects that are neither a
// service nor a tombstone holding one are ignored.
func (svcw *ServiceWatcher) deleteEventHandler(obj interface{}) {
    service, ok := obj.(*api.Service)
    if !ok {
        tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
        if !isTombstone {
            return
        }
        if service, ok = tombstone.Obj.(*api.Service); !ok {
            return
        }
    }
    svcw.broadcaster.Notify(&ServiceUpdate{Op: REMOVE, Service: service})
}

// updateEventHandler is the informer "update" callback. It broadcasts an
// UPDATE ServiceUpdate carrying the new service, but only when the new object
// is a *api.Service and is not deeply equal to the old one.
func (svcw *ServiceWatcher) updateEventHandler(oldObj, newObj interface{}) {
    updated, isService := newObj.(*api.Service)
    if !isService || reflect.DeepEqual(newObj, oldObj) {
        return
    }
    svcw.broadcaster.Notify(&ServiceUpdate{Op: UPDATE, Service: updated})
}

// RegisterHandler subscribes handler to the watcher's broadcaster so that
// handler.OnServiceUpdate is called for every broadcast ServiceUpdate.
func (svcw *ServiceWatcher) RegisterHandler(handler ServiceUpdatesHandler) {
    listener := ListenerFunc(func(instance interface{}) {
        update := instance.(*ServiceUpdate)
        handler.OnServiceUpdate(update)
    })
    svcw.broadcaster.Add(listener)
}

// ListBySelector returns the cached services whose labels match every
// key/value pair in set, together with any error reported by cache.ListAll.
func (svcw *ServiceWatcher) ListBySelector(set map[string]string) (ret []*api.Service, err error) {
    collect := func(m interface{}) {
        ret = append(ret, m.(*api.Service))
    }
    err = cache.ListAll(svcw.serviceLister, labels.SelectorFromSet(set), collect)
    return ret, err
}

// HasSynced reports whether the underlying service controller has synced.
func (svcw *ServiceWatcher) HasSynced() bool {
    synced := svcw.serviceController.HasSynced()
    return synced
}

// StartServiceWatcher starts watching service updates from the Kubernetes API
// server across all namespaces.
//
// It builds an indexed informer (namespace index) for services using
// clientset, resyncing every resyncPeriod, and runs it in a goroutine until
// stopCh is closed. The returned error is always nil.
func StartServiceWatcher(clientset kubernetes.Interface, resyncPeriod time.Duration, stopCh <-chan struct{}) (*ServiceWatcher, error) {
    watcher := &ServiceWatcher{broadcaster: NewBroadcaster()}

    listWatch := cache.NewListWatchFromClient(
        clientset.CoreV1().RESTClient(),
        "services",
        metav1.NamespaceAll,
        fields.Everything(),
    )
    handlers := cache.ResourceEventHandlerFuncs{
        AddFunc:    watcher.addEventHandler,
        DeleteFunc: watcher.deleteEventHandler,
        UpdateFunc: watcher.updateEventHandler,
    }
    indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}

    watcher.serviceLister, watcher.serviceController = cache.NewIndexerInformer(
        listWatch, &api.Service{}, resyncPeriod, handlers, indexers,
    )

    go watcher.serviceController.Run(stopCh)
    return watcher, nil
}

configmaps

// addConfigMapHandler creates the ConfigMap informer for namespace, wires
// handlers into it, stores the resulting store and controller on lbc, and
// appends the controller's HasSynced function to lbc.cacheSyncs.
func (lbc *LoadBalancerController) addConfigMapHandler(handlers cache.ResourceEventHandlerFuncs, namespace string) {
        listWatch := cache.NewListWatchFromClient(
                lbc.client.CoreV1().RESTClient(),
                "configmaps",
                namespace,
                fields.Everything(),
        )

        store, controller := cache.NewInformer(listWatch, &api_v1.ConfigMap{}, lbc.resync, handlers)
        lbc.configMapLister.Store = store
        lbc.configMapController = controller

        lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.configMapController.HasSynced)
}

NewCustomController

  
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
//     http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package k8s

import (
    "context"
    "time"

    "github.com/go-logr/logr"
    apimeta "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

// Converter describes how a CustomController lists, watches and transforms one
// kind of k8s resource: which resource to query, what object type to expect,
// and how to convert single objects and list pages before they are stored.
type Converter interface {
    // ConvertObject takes an object and returns the modified object which will be
    // stored in the data store
    ConvertObject(originalObj interface{}) (convertedObj interface{}, err error)
    // ConvertList takes an object and returns the modified list of objects which
    // will be returned to the Simple Pager function to aggregate the list pagination
    // response
    ConvertList(originalList interface{}) (convertedList interface{}, err error)
    // Resource returns the K8s resource name to list/watch
    Resource() string
    // ResourceType returns the k8s object to list/watch
    ResourceType() runtime.Object
}

// Controller Interface implemented by PodController
//
// NOTE(review): CustomController.StartController in this file takes only a
// receive-only stop channel, so its signature differs from the one declared
// here — confirm which type is meant to satisfy this interface.
type Controller interface {
    // StartController starts the controller. Will block the calling routine
    StartController(dataStore cache.Indexer, stopChanel chan struct{})
    // GetDataStore returns the data store once it has synced with the API Server
    GetDataStore() cache.Indexer
}

// CustomController is an Informer which converts Pod Objects and notifies corresponding event handlers via Channels
type CustomController struct {
    // clientSet is the kubernetes client set
    clientSet *kubernetes.Clientset
    // pageLimit is the number of objects returned per page on a list operation
    pageLimit int64
    // namespace to list/watch for
    namespace string
    // converter is the converter implementation that converts the k8s
    // object before storing in the data store
    converter Converter
    // resyncPeriod how often to sync using list with the API Server
    resyncPeriod time.Duration
    // retryOnError whether item should be retried on error. Should remain false in usual use case
    retryOnError bool
    // queue is the Delta FIFO queue
    queue *cache.DeltaFIFO
    // eventNotificationChan is the send-only channel on which a GenericEvent
    // is published for every create, update and delete that is processed
    eventNotificationChan chan<- GenericEvent

    // log is the logger used by the custom controller
    log logr.Logger
    // controller is the K8s Controller; it is nil until StartController runs
    controller cache.Controller
    // dataStore with the converted k8s object. It should not be directly accessed and used with
    // the exposed APIs
    dataStore cache.Indexer
}

// NewCustomController returns a CustomController configured with the given
// client, page limit, namespace, converter, resync period, retry policy,
// notification channel and logger.
//
// The controller's data store is a fresh indexer keyed by namespace/name with
// a NamespaceIndexKey index, and its queue is a DeltaFIFO backed by that
// store. The controller does nothing until StartController is called.
func NewCustomController(clientSet *kubernetes.Clientset, pageLimit int64, namesspace string, converter Converter, resyncPeriod time.Duration,
    retryOnError bool, eventNotificationChan chan<- GenericEvent, log logr.Logger) *CustomController {
    store := cache.NewIndexer(
        cache.MetaNamespaceKeyFunc,
        cache.Indexers{NamespaceIndexKey: NamespaceKeyIndexFunc()},
    )
    fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, store)

    return &CustomController{
        clientSet:             clientSet,
        pageLimit:             pageLimit,
        namespace:             namesspace,
        converter:             converter,
        resyncPeriod:          resyncPeriod,
        retryOnError:          retryOnError,
        eventNotificationChan: eventNotificationChan,
        log:                   log,
        dataStore:             store,
        queue:                 fifo,
    }
}

// StartController builds the underlying cache controller and runs it, listing
// and watching the resource named by the converter. The call blocks in
// controller.Run; stopChanel is handed to Run as its stop channel.
//
// Every batch of deltas popped from the queue is handled oldest-first: each
// object is converted, then stored in (or removed from) the data store, and a
// create/update/delete event is sent on the notification channel. The first
// error aborts processing of the batch and is returned to the controller.
func (c *CustomController) StartController(stopChanel <-chan struct{}) {
    listWatcher := newListWatcher(c.clientSet.CoreV1().RESTClient(),
        c.converter.Resource(), c.namespace, c.pageLimit, c.converter)

    processDeltas := func(obj interface{}) error {
        // from oldest to newest
        for _, delta := range obj.(cache.Deltas) {
            // Strip down the object and keep only the required details
            converted, err := c.converter.ConvertObject(delta.Object)
            if err != nil {
                return err
            }

            switch delta.Type {
            case cache.Sync, cache.Added, cache.Updated:
                previous, found, err := c.dataStore.Get(converted)
                if err != nil {
                    return err
                }
                if found {
                    // Known object: replace it and report an update.
                    if err := c.dataStore.Update(converted); err != nil {
                        return err
                    }
                    if err := c.notifyChannelOnUpdate(previous, converted); err != nil {
                        return err
                    }
                } else {
                    // New object: insert it and report a create.
                    if err := c.dataStore.Add(converted); err != nil {
                        return err
                    }
                    if err := c.notifyChannelOnCreate(converted); err != nil {
                        return err
                    }
                }
            case cache.Deleted:
                if err := c.dataStore.Delete(converted); err != nil {
                    return err
                }
                if err := c.notifyChannelOnDelete(converted); err != nil {
                    return err
                }
            }
        }
        return nil
    }

    c.controller = cache.New(&cache.Config{
        Queue:            c.queue,
        ListerWatcher:    listWatcher,
        ObjectType:       c.converter.ResourceType(),
        FullResyncPeriod: c.resyncPeriod,
        RetryOnError:     c.retryOnError,
        Process:          processDeltas,
    })

    // Run the controller
    c.controller.Run(stopChanel)
}

// GetDataStore returns the controller's data store, blocking until the
// controller has been created and either reports HasSynced or has a
// non-empty last-synced resource version. While waiting it logs a message and
// sleeps five seconds between checks.
//
// TODO: we should refactor this in the future, as this approach will make
// controllers run without having pods synced (and so block whenever pod
// information is accessed).
func (c *CustomController) GetDataStore() cache.Indexer {
    for {
        // The store must not be handed out before the initial sync, because
        // the cache could still be incomplete on startup.
        if c.controller != nil &&
            (c.controller.HasSynced() || c.controller.LastSyncResourceVersion() != "") {
            return c.dataStore
        }
        c.log.Info("waiting for controller to sync")
        time.Sleep(5 * time.Second)
    }
}

// newListWatcher returns a cache.ListWatch for resource in namespace.
//
// The list function requests one page of at most limit objects (continuing
// from options.Continue) and passes the response through
// converter.ConvertList before returning it to the pager. The watch function
// is a plain watch using the caller's options; watched objects are converted
// later, when they are processed.
func newListWatcher(restClient cache.Getter, resource string, namespace string, limit int64,
    converter Converter) *cache.ListWatch {

    listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
        ctx := context.Background()

        list, err := restClient.Get().
            Namespace(namespace).
            Resource(resource).
            // This needs to be done because just setting the limit using option's
            // Limit is being overridden and the response is returned without pagination.
            VersionedParams(&metav1.ListOptions{
                Limit:    limit,
                Continue: options.Continue,
            }, metav1.ParameterCodec).
            Do(ctx).
            Get()

        if err != nil {
            return list, err
        }
        // Strip down the list before passing the paginated response back to
        // the pager function
        convertedList, err := converter.ConvertList(list)
        if err != nil {
            // Check the error before the type assertion: asserting on a nil
            // convertedList would panic and hide the conversion error.
            return nil, err
        }
        return convertedList.(runtime.Object), nil
    }

    // We don't need to modify the watcher, we will strip down the k8s object in the ProcessFunc
    // before storing the object in the data store.
    watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
        ctx := context.Background()
        options.Watch = true

        return restClient.Get().
            Namespace(namespace).
            Resource(resource).
            VersionedParams(&options, metav1.ParameterCodec).
            Watch(ctx)
    }
    return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

// notifyChannelOnCreate sends a CREATE GenericEvent for obj on the
// notification channel. It returns an error if obj's metadata cannot be
// accessed; the send itself blocks until the event is received.
func (c *CustomController) notifyChannelOnCreate(obj interface{}) error {
    objMeta, err := apimeta.Accessor(obj)
    if err != nil {
        return err
    }

    event := GenericEvent{
        EventType: CREATE,
        Meta:      objMeta,
        Object:    obj.(runtime.Object),
    }
    c.eventNotificationChan <- event
    return nil
}

// notifyChannelOnUpdate sends an UPDATE GenericEvent on the notification
// channel, carrying both the previous object (OldMeta/OldObject) and the new
// one (Meta/Object). It returns an error if the metadata of either object
// cannot be accessed.
func (c *CustomController) notifyChannelOnUpdate(oldObj, newObj interface{}) error {
    previousMeta, err := apimeta.Accessor(oldObj)
    if err != nil {
        return err
    }
    currentMeta, err := apimeta.Accessor(newObj)
    if err != nil {
        return err
    }

    event := GenericEvent{
        EventType: UPDATE,
        OldMeta:   previousMeta,
        OldObject: oldObj.(runtime.Object),
        Meta:      currentMeta,
        Object:    newObj.(runtime.Object),
    }
    c.eventNotificationChan <- event
    return nil
}

// notifyChannelOnDelete sends a DELETE GenericEvent on the notification
// channel; the removed object is carried in the event's OldMeta/OldObject
// fields. It returns an error if obj's metadata cannot be accessed.
func (c *CustomController) notifyChannelOnDelete(obj interface{}) error {
    removedMeta, err := apimeta.Accessor(obj)
    if err != nil {
        return err
    }

    event := GenericEvent{
        EventType: DELETE,
        OldMeta:   removedMeta,
        OldObject: obj.(runtime.Object),
    }
    c.eventNotificationChan <- event
    return nil
}
package pkg

import (
    "context"
    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    corev1lister "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "time"
    "k8s.io/klog"
)

// PodReconciler is an interface to annotate pods with the current timestamp.
type PodReconciler interface {
    // Run starts the reconciler with the given number of workers. The
    // implementation in this file blocks until ctx is done.
    Run(ctx context.Context, workers int)
}

// podReconciler is the PodReconciler implementation.
//
// It reacts to Pod add and update events and stamps each Pod with a
// "timestamp" annotation when one is not already present. When
// waitForAnnotation is set, only Pods that carry the "add-timestamp"
// annotation are stamped.
type podReconciler struct {
    // k8sclient is used to write the annotated Pod back to the API server.
    k8sclient kubernetes.Interface
    // podQueue holds the keys of Pods waiting to be reconciled.
    podQueue workqueue.RateLimitingInterface
    // podLister reads Pods from the informer cache.
    podLister corev1lister.PodLister
    // podSynced reports whether the Pod informer cache has synced.
    podSynced cache.InformerSynced
    // waitForAnnotation restricts stamping to Pods annotated with "add-timestamp".
    waitForAnnotation bool
}

// cacheSyncError is the error returned by NewPodReconciler when the Pod
// informer cache does not finish syncing.
type cacheSyncError string

// Error implements the error interface.
func (e cacheSyncError) Error() string { return string(e) }

// NewPodReconciler initializes and returns an implementation of the
// PodReconciler interface.
//
// If namespaceToWatch is non-empty the reconciler only listens to Pods in
// that namespace; otherwise it listens on all namespaces. When
// waitForAnnotation is true only Pods annotated with "add-timestamp" are
// stamped.
//
// The function starts the informers and blocks until the Pod cache has
// synced. If the cache fails to sync it returns a nil reconciler together
// with a non-nil error, so callers are never handed a nil reconciler without
// an error.
func NewPodReconciler(client kubernetes.Interface, namespaceToWatch string, waitForAnnotation bool) (PodReconciler, error) {
    klog.Infof("Initializing Pod annotation controller. Listening to Pods on namespace %s", namespaceToWatch)
    // Create new informer factory. If no namespace is specified, the informer is set up to listen on all namespaces.
    informerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(namespaceToWatch))
    // Set up workqueue for Pod add and update events
    queue := workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute))
    podInformer := informerFactory.Core().V1().Pods()
    pr := &podReconciler{
        k8sclient:         client,
        podQueue:          queue,
        podLister:         podInformer.Lister(),
        podSynced:         podInformer.Informer().HasSynced,
        waitForAnnotation: waitForAnnotation,
    }

    // Set up event handlers for Pod Add and Update events; deletions are not handled.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    pr.podAdd,
        UpdateFunc: pr.podUpdate,
        DeleteFunc: nil,
    })

    // Channel that can be used to destroy the informer.
    // NOTE(review): stopCh is local and never closed here, so the informers
    // keep running after this function returns and cannot be stopped by the
    // caller — confirm this is intended.
    stopCh := make(chan struct{})
    informerFactory.Start(stopCh)
    if !cache.WaitForCacheSync(stopCh, pr.podSynced) {
        return nil, cacheSyncError("failed to wait for Pod informer cache to sync")
    }
    klog.Infof("Initialized Pod annotation controller.")
    return pr, nil
}
// podAdd is the informer "add" callback: it derives the work-queue key for
// obj and enqueues it. If no key can be derived the error is logged and the
// object is skipped.
func (pr *podReconciler) podAdd(obj interface{}) {
    if key, err := getPodKey(obj); err == nil {
        klog.V(4).Infof("Adding Pod %s to workqueue", key)
        pr.podQueue.Add(key)
    } else {
        klog.Errorf("failed to get pod key with error %v", err)
    }
}


// podUpdate is the informer "update" callback. When both the old and the new
// object are non-nil *v1.Pod values, the new object is enqueued exactly like
// an addition; otherwise the event is ignored.
func (pr *podReconciler) podUpdate(oldObj, newObj interface{}) {
    isPod := func(candidate interface{}) bool {
        pod, ok := candidate.(*v1.Pod)
        return ok && pod != nil
    }

    if isPod(oldObj) && isPod(newObj) {
        pr.podAdd(newObj)
    }
}


// Run starts `workers` goroutines that repeatedly execute podReconcileWorker
// and then blocks until ctx is done. The work queue is shut down when Run
// returns.
func (pr *podReconciler) Run(ctx context.Context, workers int) {
    defer pr.podQueue.ShutDown()

    // workers is an int, so it must be formatted with %d; %s would print
    // "%!s(int=N)" instead of the number.
    klog.Infof("Setting up Pod Reconciler to run with %d threads", workers)
    stopCh := ctx.Done()

    for i := 0; i < workers; i++ {
        go wait.Until(pr.podReconcileWorker, 0, stopCh)
    }

    <-stopCh
}

// podReconcileWorker takes one key from the work queue and reconciles it.
// A failed key is re-queued with rate limiting so it is retried later; a
// successful key has its retry history forgotten. The key is always marked
// done when the function returns. Nothing happens if the queue reports that
// it is shutting down.
func (pr *podReconciler) podReconcileWorker() {
    item, shuttingDown := pr.podQueue.Get()
    if shuttingDown {
        return
    }
    defer pr.podQueue.Done(item)

    err := pr.reconcile(item.(string))
    if err == nil {
        pr.podQueue.Forget(item)
        return
    }
    // Put the Pod key back on the queue so that it is retried later.
    pr.podQueue.AddRateLimited(item)
}

// reconcile adds the current timestamp as the "timestamp" annotation to the
// Pod identified by key ("namespace/name") and logs the operation.
//
// Nothing is written when the Pod already has a "timestamp" annotation, or
// when waitForAnnotation is set and the Pod lacks the "add-timestamp"
// annotation. It returns an error if the key is malformed, the Pod cannot be
// read from the lister, or the update call fails.
func (pr *podReconciler) reconcile(key string) error {
    klog.Infof("Reconciling Pod %s", key)
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.Errorf("failed to split key with error %v", err)
        return err
    }
    cached, err := pr.podLister.Pods(namespace).Get(name)
    if err != nil {
        klog.Errorf("failed to get Pod %s/%s with error %v", namespace, name, err)
        return err
    }
    // Objects returned by a lister are shared with the informer cache and
    // must not be modified. Work on a deep copy so that neither the Pod nor
    // its annotation map is mutated in the cache.
    pod := cached.DeepCopy()
    podAnnotations := pod.GetAnnotations()
    if podAnnotations == nil {
        podAnnotations = make(map[string]string)
    }
    klog.V(4).Infof("Existing annotations on Pod %s/%s: %v", namespace, name, podAnnotations)
    if pr.waitForAnnotation {
        if _, ok := podAnnotations["add-timestamp"]; !ok {
            klog.V(4).Infof("Annotation add-timestamp doesnt exist, igonoring ...")
            return nil
        }
    }
    if _, ok := podAnnotations["timestamp"]; !ok {
        // Add timestamp to Pod
        podAnnotations["timestamp"] = time.Now().String()
        pod.SetAnnotations(podAnnotations)
        _, err = pr.k8sclient.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
        if err != nil {
            klog.Errorf("failed to update Pod %s/%s with timestamp due to error %v", namespace, name, err)
            return err
        }
        klog.Infof("Added timestamp %v to Pod %s/%s", podAnnotations["timestamp"], namespace, name)
    }
    return nil
}

// getPodKey returns the work-queue key for obj. If obj is a
// DeletedFinalStateUnknown tombstone with a non-nil payload, the key is
// computed from the wrapped object instead.
func getPodKey(obj interface{}) (string, error) {
    target := obj
    if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone && tombstone.Obj != nil {
        target = tombstone.Obj
    }
    return cache.DeletionHandlingMetaNamespaceKeyFunc(target)
}

GetPod

package k8s

import (
    "context"
    "fmt"
    "os"

    "github.com/ericchiang/k8s"
    corev1 "github.com/ericchiang/k8s/apis/core/v1"
)

// Names of the environment variables that defaultClient.GetPod reads to
// identify the pod to fetch.
const (
    // podNamespaceEnvVar names the variable holding the pod's namespace.
    podNamespaceEnvVar = "KUBERNETES_POD_NAMESPACE"
    // podNameEnvVar names the variable holding the pod's name.
    podNameEnvVar      = "KUBERNETES_POD_NAME"
)

// Client contains methods to access k8s API
type Client interface {
    // GetPod returns current pod data. The implementation in this file
    // identifies the pod through the KUBERNETES_POD_NAMESPACE and
    // KUBERNETES_POD_NAME environment variables.
    GetPod(ctx context.Context) (*corev1.Pod, error)
}

// clientProvider builds a Client backed by an in-cluster k8s client. It is a
// variable holding a function value, so it can be reassigned.
//
// If the in-cluster client cannot be created, it returns a nil Client and the
// error, rather than a non-nil Client wrapping a nil k8s client.
var clientProvider = func() (Client, error) {
    k8sClient, err := k8s.NewInClusterClient()
    if err != nil {
        return nil, err
    }

    return &defaultClient{k8sClient: k8sClient}, nil
}

// defaultClient is the Client implementation created by clientProvider.
type defaultClient struct {
    // k8sClient is the underlying API client used by GetPod.
    k8sClient *k8s.Client
}

// GetPod fetches the pod whose namespace and name are given by the
// KUBERNETES_POD_NAMESPACE and KUBERNETES_POD_NAME environment variables.
// It returns an error if the API request fails.
func (c *defaultClient) GetPod(ctx context.Context) (*corev1.Pod, error) {
    var (
        namespace = os.Getenv(podNamespaceEnvVar)
        name      = os.Getenv(podNameEnvVar)
        result    corev1.Pod
    )

    err := c.k8sClient.Get(ctx, namespace, name, &result)
    if err != nil {
        return nil, fmt.Errorf("unable to get pod data from API: %s", err)
    }
    return &result, nil
}

GetPod

// GetService fetches the service named serviceName in namespace.
// Any error from the API call is returned unchanged.
func (in *K8SClient) GetService(namespace, serviceName string) (*core_v1.Service, error) {
    services := in.k8s.CoreV1().Services(namespace)
    return services.Get(serviceName, emptyGetOptions)
}

// GetEndpoints fetches the Endpoints object of the service named serviceName
// in namespace. Any error from the API call is returned unchanged.
func (in *K8SClient) GetEndpoints(namespace, serviceName string) (*core_v1.Endpoints, error) {
    endpoints := in.k8s.CoreV1().Endpoints(namespace)
    return endpoints.Get(serviceName, emptyGetOptions)
}

// GetPods lists the pods in namespace that match labelSelector.
// An empty labelSelector fetches every pod in the namespace.
// On failure it returns an empty, non-nil slice together with the error.
func (in *K8SClient) GetPods(namespace, labelSelector string) ([]core_v1.Pod, error) {
    // An empty selector is ambiguous in the go client, could mean either "select all" or "select none"
    // Here we assume empty == select all
    // (see also https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors)
    listOptions := meta_v1.ListOptions{LabelSelector: labelSelector}

    pods, err := in.k8s.CoreV1().Pods(namespace).List(listOptions)
    if err != nil {
        return []core_v1.Pod{}, err
    }
    return pods.Items, nil
}

// GetPod fetches the pod called name in namespace.
// On failure it returns a nil pod together with the error.
func (in *K8SClient) GetPod(namespace, name string) (*core_v1.Pod, error) {
    pod, err := in.k8s.CoreV1().Pods(namespace).Get(name, emptyGetOptions)
    if err != nil {
        return nil, err
    }
    return pod, nil
}

// GetPodLogs fetches the logs of the pod called name in namespace, using opts
// as the request parameters, and returns them as a single string wrapped in
// PodLogs. The whole log stream is read into memory before returning.
// It returns an error if the stream cannot be opened or read.
func (in *K8SClient) GetPodLogs(namespace, name string, opts *core_v1.PodLogOptions) (*PodLogs, error) {
    request := in.k8s.CoreV1().RESTClient().Get().
        Namespace(namespace).
        Name(name).
        Resource("pods").
        SubResource("log").
        VersionedParams(opts, scheme.ParameterCodec)

    stream, err := request.Stream()
    if err != nil {
        return nil, err
    }
    defer stream.Close()

    collected := new(bytes.Buffer)
    if _, err = collected.ReadFrom(stream); err != nil {
        return nil, err
    }

    return &PodLogs{Logs: collected.String()}, nil
}

// GetCronJobs lists the batch/v1beta1 CronJobs of the given namespace.
// On failure an empty, non-nil slice is returned together with the error.
func (in *K8SClient) GetCronJobs(namespace string) ([]batch_v1beta1.CronJob, error) {
    cronJobs, err := in.k8s.BatchV1beta1().CronJobs(namespace).List(emptyListOptions)
    if err != nil {
        return []batch_v1beta1.CronJob{}, err
    }
    return cronJobs.Items, nil
}

// GetJobs lists the batch/v1 Jobs of the given namespace.
// On failure an empty, non-nil slice is returned together with the error.
func (in *K8SClient) GetJobs(namespace string) ([]batch_v1.Job, error) {
    jobs, err := in.k8s.BatchV1().Jobs(namespace).List(emptyListOptions)
    if err != nil {
        return []batch_v1.Job{}, err
    }
    return jobs.Items, nil
}
/ GetConfigMap fetches and returns the specified ConfigMap definition
// from the cluster
func (in *K8SClient) GetConfigMap(namespace, configName string) (*core_v1.ConfigMap, error) {
    configMap, err := in.k8s.CoreV1().ConfigMaps(namespace).Get(configName, emptyGetOptions)
    if err != nil {
        return &core_v1.ConfigMap{}, err
    }

    return configMap, nil
}

// GetNamespace looks up the Namespace object with the given name.
// On failure the error is returned together with an empty, non-nil Namespace.
func (in *K8SClient) GetNamespace(namespace string) (*core_v1.Namespace, error) {
    namespaces := in.k8s.CoreV1().Namespaces()
    if found, err := namespaces.Get(namespace, emptyGetOptions); err == nil {
        return found, nil
    } else {
        return &core_v1.Namespace{}, err
    }
}

// GetServerVersion asks the discovery client for the version information
// reported by the API server.
func (in *K8SClient) GetServerVersion() (*version.Info, error) {
    dc := in.k8s.Discovery()
    return dc.ServerVersion()
}

// GetNamespaces lists the namespaces of the cluster.
// A non-empty labelSelector restricts the result to matching namespaces;
// an empty one lists with emptyListOptions.
// It returns a nil slice and the error when the list call fails.
func (in *K8SClient) GetNamespaces(labelSelector string) ([]core_v1.Namespace, error) {
    // Start from the unfiltered options and narrow them only on request.
    listOptions := emptyListOptions
    if labelSelector != "" {
        listOptions = meta_v1.ListOptions{LabelSelector: labelSelector}
    }

    nsList, err := in.k8s.CoreV1().Namespaces().List(listOptions)
    if err != nil {
        return nil, err
    }
    return nsList.Items, nil
}

// GetProject retrieves the project called name by requesting
// apis/project.openshift.io/v1/projects/<name> from the cluster API and
// decoding the response into a Project. The request targets an OpenShift API
// group, so it is expected to fail on clusters that do not serve it.
func (in *K8SClient) GetProject(name string) (*osproject_v1.Project, error) {
    req := in.k8s.RESTClient().Get().Prefix("apis", "project.openshift.io", "v1", "projects", name)

    project := &osproject_v1.Project{}
    if err := req.Do().Into(project); err != nil {
        return nil, err
    }
    return project, nil
}

// GetProjects lists projects by requesting apis/project.openshift.io/v1/projects
// from the cluster API. A non-empty labelSelector is sent as the
// "labelSelector" query parameter. It returns nil and the error on failure.
func (in *K8SClient) GetProjects(labelSelector string) ([]osproject_v1.Project, error) {
    request := in.k8s.RESTClient().Get().Prefix("apis", "project.openshift.io", "v1", "projects")
    if labelSelector != "" {
        // Only filter when the caller asked for it.
        request.Param("labelSelector", labelSelector)
    }

    projects := &osproject_v1.ProjectList{}
    if err := request.Do().Into(projects); err != nil {
        return nil, err
    }
    return projects.Items, nil
}

// IsOpenShift reports whether a GET on /apis/project.openshift.io succeeds.
// The answer is computed on the first call and stored in in.isOpenShift;
// later calls return the stored value without contacting the API again.
func (in *K8SClient) IsOpenShift() bool {
    if in.isOpenShift != nil {
        return *in.isOpenShift
    }

    // Any error from the probe request is read as "not OpenShift".
    _, err := in.k8s.RESTClient().Get().AbsPath("/apis/project.openshift.io").Do().Raw()
    detected := err == nil
    in.isOpenShift = &detected
    return *in.isOpenShift
}

// GetServices lists the services of a namespace.
// With a nil selectorLabels every service is returned. Otherwise only the
// services whose own Spec.Selector is non-empty and matches selectorLabels
// are kept. On a list failure an empty, non-nil slice is returned together
// with the error.
func (in *K8SClient) GetServices(namespace string, selectorLabels map[string]string) ([]core_v1.Service, error) {
    svcList, err := in.k8s.CoreV1().Services(namespace).List(emptyListOptions)
    if err != nil {
        return []core_v1.Service{}, err
    }
    if selectorLabels == nil {
        return svcList.Items, nil
    }

    wanted := labels.Set(selectorLabels)
    var matched []core_v1.Service
    for i := range svcList.Items {
        selector := labels.Set(svcList.Items[i].Spec.Selector).AsSelector()
        // Services without a selector never match; the rest must select the
        // requested label set.
        if selector.Empty() || !selector.Matches(wanted) {
            continue
        }
        matched = append(matched, svcList.Items[i])
    }
    return matched, nil
}

// GetDeployment fetches the apps/v1 Deployment called deploymentName from the
// given namespace. Whatever error the API client reports is handed back
// unchanged.
func (in *K8SClient) GetDeployment(namespace, deploymentName string) (*apps_v1.Deployment, error) {
    deployments := in.k8s.AppsV1().Deployments(namespace)
    return deployments.Get(deploymentName, emptyGetOptions)
}

// GetRoute retrieves the Route called name from the given namespace through
// the apis/route.openshift.io/v1 REST prefix and decodes it into a Route.
// The route name is added to the request through SubResource.
// It returns nil and the error when the request or decoding fails.
func (in *K8SClient) GetRoute(namespace, name string) (*osroutes_v1.Route, error) {
    req := in.k8s.RESTClient().Get().
        Prefix("apis", "route.openshift.io", "v1").
        Namespace(namespace).
        Resource("routes").
        SubResource(name)

    route := &osroutes_v1.Route{}
    if err := req.Do().Into(route); err != nil {
        return nil, err
    }
    return route, nil
}

// GetDeployments lists the apps/v1 Deployments of the given namespace.
// On failure an empty, non-nil slice is returned together with the error.
func (in *K8SClient) GetDeployments(namespace string) ([]apps_v1.Deployment, error) {
    deployments, err := in.k8s.AppsV1().Deployments(namespace).List(emptyListOptions)
    if err != nil {
        return []apps_v1.Deployment{}, err
    }
    return deployments.Items, nil
}

// GetDeploymentsByLabel lists the apps/v1 Deployments of the given namespace
// that match labelSelector. The selector is forwarded unchanged in the list
// options; an empty labelSelector is meant to fetch all Deployments of the
// namespace.
// On failure an empty, non-nil slice is returned together with the error.
func (in *K8SClient) GetDeploymentsByLabel(namespace string, labelSelector string) ([]apps_v1.Deployment, error) {
    listOptions := meta_v1.ListOptions{LabelSelector: labelSelector}
    depList, err := in.k8s.AppsV1().Deployments(namespace).List(listOptions)
    if err != nil {
        return []apps_v1.Deployment{}, err
    }
    return depList.Items, nil
}

// GetDeploymentConfig retrieves the DeploymentConfig called
// deploymentconfigName from the given namespace through the
// apis/apps.openshift.io/v1 REST prefix and decodes it into a
// DeploymentConfig. The object name is added to the request through
// SubResource.
// It returns nil and the error when the request or decoding fails.
func (in *K8SClient) GetDeploymentConfig(namespace, deploymentconfigName string) (*osapps_v1.DeploymentConfig, error) {
    result := &osapps_v1.DeploymentConfig{}
    err := in.k8s.RESTClient().Get().
        Prefix("apis", "apps.openshift.io", "v1").
        Namespace(namespace).
        Resource("deploymentconfigs").
        SubResource(deploymentconfigName).
        Do().
        Into(result)
    if err != nil {
        return nil, err
    }
    return result, nil
}

// GetDeploymentConfigs lists every DeploymentConfig of the given namespace
// through the apis/apps.openshift.io/v1 REST prefix. No label filtering is
// applied.
// It returns nil and the error when the request or decoding fails.
func (in *K8SClient) GetDeploymentConfigs(namespace string) ([]osapps_v1.DeploymentConfig, error) {
    result := &osapps_v1.DeploymentConfigList{}
    err := in.k8s.RESTClient().Get().
        Prefix("apis", "apps.openshift.io", "v1").
        Namespace(namespace).
        Resource("deploymentconfigs").
        Do().
        Into(result)
    if err != nil {
        return nil, err
    }
    return result.Items, nil
}

// GetReplicaSets lists the apps/v1 ReplicaSets of the given namespace.
// On failure an empty, non-nil slice is returned together with the error.
func (in *K8SClient) GetReplicaSets(namespace string) ([]apps_v1.ReplicaSet, error) {
    replicaSets, err := in.k8s.AppsV1().ReplicaSets(namespace).List(emptyListOptions)
    if err != nil {
        return []apps_v1.ReplicaSet{}, err
    }
    return replicaSets.Items, nil
}

// GetStatefulSet fetches the apps/v1 StatefulSet called statefulsetName from
// the given namespace. Whatever error the API client reports is handed back
// unchanged.
func (in *K8SClient) GetStatefulSet(namespace string, statefulsetName string) (*apps_v1.StatefulSet, error) {
    statefulSets := in.k8s.AppsV1().StatefulSets(namespace)
    return statefulSets.Get(statefulsetName, emptyGetOptions)
}

// GetStatefulSets lists the apps/v1 StatefulSets of the given namespace.
// On failure an empty, non-nil slice is returned together with the error.
func (in *K8SClient) GetStatefulSets(namespace string) ([]apps_v1.StatefulSet, error) {
    statefulSets, err := in.k8s.AppsV1().StatefulSets(namespace).List(emptyListOptions)
    if err != nil {
        return []apps_v1.StatefulSet{}, err
    }
    return statefulSets.Items, nil
}

// GetReplicationControllers lists the core/v1 ReplicationControllers of the
// given namespace.
// On failure an empty, non-nil slice is returned together with the error.
func (in *K8SClient) GetReplicationControllers(namespace string) ([]core_v1.ReplicationController, error) {
    controllers, err := in.k8s.CoreV1().ReplicationControllers(namespace).List(emptyListOptions)
    if err != nil {
        return []core_v1.ReplicationController{}, err
    }
    return controllers.Items, nil
}

// GetService looks up a single Service by name within a namespace.
// The client's result and error are returned to the caller as they are.
func (in *K8SClient) GetService(namespace, serviceName string) (*core_v1.Service, error) {
    svcClient := in.k8s.CoreV1().Services(namespace)
    svc, err := svcClient.Get(serviceName, emptyGetOptions)
    return svc, err
}

// GetEndpoints looks up the Endpoints object named after serviceName within
// a namespace.
// The client's result and error are returned to the caller as they are.
func (in *K8SClient) GetEndpoints(namespace, serviceName string) (*core_v1.Endpoints, error) {
    epClient := in.k8s.CoreV1().Endpoints(namespace)
    ep, err := epClient.Get(serviceName, emptyGetOptions)
    return ep, err
}

// GetPods returns the pods of a namespace selected by labelSelector.
// The selector is forwarded unchanged; an empty one is intended to return
// every pod of the namespace. If listing fails, the error is returned along
// with an empty, non-nil slice.
func (in *K8SClient) GetPods(namespace, labelSelector string) ([]core_v1.Pod, error) {
    // In the Go client an empty selector could mean "select all" or
    // "select none"; the assumption made here is empty == select all.
    // (see also https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors)
    podClient := in.k8s.CoreV1().Pods(namespace)
    result, err := podClient.List(meta_v1.ListOptions{LabelSelector: labelSelector})
    if err != nil {
        return []core_v1.Pod{}, err
    }
    return result.Items, nil
}

// GetPod looks up a single pod by name within a namespace.
// A failed lookup yields a nil pod and the client's error.
func (in *K8SClient) GetPod(namespace, name string) (*core_v1.Pod, error) {
    podClient := in.k8s.CoreV1().Pods(namespace)
    found, err := podClient.Get(name, emptyGetOptions)
    if err != nil {
        return nil, err
    }
    return found, nil
}

// GetPodLogs returns the logs of the pod called name in the given namespace.
// It requests the pod's "log" subresource, with opts encoded as request
// parameters through scheme.ParameterCodec, and buffers the whole stream in
// memory before wrapping it in a PodLogs value.
// It returns an error if the stream cannot be opened or read.
func (in *K8SClient) GetPodLogs(namespace, name string, opts *core_v1.PodLogOptions) (*PodLogs, error) {
    req := in.k8s.CoreV1().RESTClient().Get().
        Namespace(namespace).
        Name(name).
        Resource("pods").
        SubResource("log").
        VersionedParams(opts, scheme.ParameterCodec)

    readCloser, err := req.Stream()
    if err != nil {
        return nil, err
    }
    // Make sure the stream is closed whether or not reading succeeds.
    defer readCloser.Close()

    buf := new(bytes.Buffer)
    if _, err = buf.ReadFrom(readCloser); err != nil {
        return nil, err
    }

    return &PodLogs{Logs: buf.String()}, nil
}

// GetCronJobs returns the batch/v1beta1 CronJobs found in a namespace.
// If listing fails, the error is returned along with an empty, non-nil slice.
func (in *K8SClient) GetCronJobs(namespace string) ([]batch_v1beta1.CronJob, error) {
    cronJobClient := in.k8s.BatchV1beta1().CronJobs(namespace)
    result, err := cronJobClient.List(emptyListOptions)
    if err != nil {
        return []batch_v1beta1.CronJob{}, err
    }
    return result.Items, nil
}

// GetJobs returns the batch/v1 Jobs found in a namespace.
// If listing fails, the error is returned along with an empty, non-nil slice.
func (in *K8SClient) GetJobs(namespace string) ([]batch_v1.Job, error) {
    jobClient := in.k8s.BatchV1().Jobs(namespace)
    result, err := jobClient.List(emptyListOptions)
    if err != nil {
        return []batch_v1.Job{}, err
    }
    return result.Items, nil
}
E0906 10:44:14.807664   13796 reflector.go:127] pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:156: Failed to watch *v1alpha1.Waitdeployment: the server could not find the requested resource (get waitdeployments.qbox.io)
E0906 10:44:15.959997   13796 reflector.go:127] pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:156: Failed to watch *v1alpha1.Waitdeployment: failed to list *v1alpha1.Waitdeployment: the server could not find the requested resource (get waitdeployments.qbox.io)
E0906 10:44:18.241173   13796 reflector.go:127] pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:156: Failed to watch *v1alpha1.Waitdeployment: failed to list *v1alpha1.Waitdeployment: the server could not find the requested resource (get waitdeployments.qbox.io)
E0906 10:44:23.641205   13796 reflector.go:127] pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:156: Failed to watch *v1alpha1.Waitdeployment: failed to list *v1alpha1.Waitdeployment: the server could not find the requested resource (get waitdeployments.qbox.io)
E0906 10:44:30.463055   13796 reflector.go:127] pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:156: Failed to watch *v1alpha1.Waitdeployment: failed to list *v1alpha1.Waitdeployment: the server could not find the requested resource (get waitdeployments.qbox.io)
E0906 10:44:42.829257   13796 controller.go:281] Waitdeployment 'default/wdtest' in work queue no longer exists

processNextWorkItem

E0906 10:44:42.829257 13796 controller.go:281] Waitdeployment 'default/wdtest' in work queue no longer exists
I0906 10:44:42.829321 13796 controller.go:254] Successfully synced 'default/wdtest'

/ processNextWorkItem 从 workqueue 中获取一个任务并最终调用 syncHandler 执行她
func (c *Client) processNextWorkItem() bool {
        obj, shutdown := c.workqueue.Get()
        if shutdown {
                return false
        }

        // 这里写成函数形式是为了方便里面能直接调用 defer
        err := func(obj interface{}) error {
                // 通过调用 Done 方法可以通知 workqueue 完成了这个任务
                defer c.workqueue.Done(obj)
                var key string
                var ok bool
                // We expect strings to come off the workqueue. These are of the
                // form namespace/name. We do this as the delayed nature of the
                // workqueue means the items in the informer cache may actually be
                // more up to date that when the item was initially put onto the
                // workqueue.
                if key, ok = obj.(string); !ok {
                        // 通过调用 Forget 方法可以避免任务被再次入队,比如调用一个任务出错后,为了避免
                        // 它再次放入队列底部并在 back-off 后再次尝试,可以调用这个方法
                        c.workqueue.Forget(obj)
                        utilRuntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
                        return nil
                }
                // Run the syncHandler, passing it the namespace/name string of the
                // Foo resource to be synced.
                if err := c.syncHandler(key); err != nil {
                        // Put the item back on the workqueue to handle any transient errors.
                        c.workqueue.AddRateLimited(key)
                        return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
                }
                // Finally, if no error occurs we Forget this item so it does not
                // get queued again until another change happens.
                c.workqueue.Forget(obj)
                klog.Infof("Successfully synced '%s'", key)
                return nil
        }(obj)

        if err != nil {
                utilRuntime.HandleError(err)
                return true
        }

        return true
}
// syncHandler reconciles the Waitdeployment identified by key, a
// namespace/name string.
//
// It looks the resource up through the lister, calls checkAndStartDeployment
// for it, verifies that the resulting Deployment is controlled by the
// Waitdeployment, updates the Deployment from newDeployment(waitDeployment),
// and finally writes a copy of the Waitdeployment back through the custom
// clientset.
//
// An invalid key and a Waitdeployment that no longer exists are reported via
// utilRuntime.HandleError and return nil. Every other failure is returned as
// an error.
func (c *Client) syncHandler(key string) error {
        // Convert the namespace/name string into a distinct namespace and name.
        ns, n, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
                utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
                return nil
        }

        waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(ns).Get(n)
        if err != nil {
                // The Waitdeployment resource may no longer exist, in which case we stop
                // processing.
                if errors.IsNotFound(err) {
                        utilRuntime.HandleError(fmt.Errorf("Waitdeployment '%s' in work queue no longer exists", key))
                        return nil
                }
                return err
        }

        deployment, err := c.checkAndStartDeployment(waitDeployment)
        if err != nil {
                return err
        }

        // If the Deployment is not controlled by this waitDeployment resource,
        // we should log a warning to the event recorder and return error msg.
        if !metav1.IsControlledBy(deployment, waitDeployment) {
                msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
                c.recorder.Event(waitDeployment, coreV1.EventTypeWarning, ErrResourceExists, msg)
                // msg is already formatted; pass it as an argument so that any '%'
                // it contains is not interpreted as a formatting verb.
                return fmt.Errorf("%s", msg)
        }

        // Update the Deployment. Only the error is needed; the returned object
        // is not used afterwards.
        if _, err := c.kubeClient.AppsV1().Deployments(waitDeployment.Namespace).Update(c.ctx, newDeployment(waitDeployment), metav1.UpdateOptions{}); err != nil {
                return err
        }

        // Update the Waitdeployment. A deep copy is sent so the object obtained
        // from the lister is not handed to the client.
        if _, err := c.customClient.QboxV1alpha1().Waitdeployments(waitDeployment.Namespace).Update(c.ctx, waitDeployment.DeepCopy(), metav1.UpdateOptions{}); err != nil {
                return err
        }

        c.recorder.Event(waitDeployment, coreV1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
        return nil
}

kubectl delete -f artifacts/waitdeployment.yaml

customresourcedefinition.apiextensions.k8s.io "waitdeployments.qbox.io" deleted

(dlv) b QboxV1alpha1
Breakpoint 2 (enabled) set at 0xcdc070 for github.com/xwen-winnie/crd_demo/kube/client/clientset/versioned.(*Clientset).QboxV1alpha1() ./kube/client/clientset/versioned/clientset.go:44
(dlv) c
> github.com/xwen-winnie/crd_demo/kube/client/clientset/versioned.(*Clientset).QboxV1alpha1() ./kube/client/clientset/versioned/clientset.go:44 (hits goroutine(94):1 total:1) (PC: 0xcdc070)
Warning: debugging optimized function
    39:         qboxV1alpha1 *qboxv1alpha1.QboxV1alpha1Client
    40: }
    41:
    42: // QboxV1alpha1 retrieves the QboxV1alpha1Client
    43: func (c *Clientset) QboxV1alpha1() qboxv1alpha1.QboxV1alpha1Interface {
=>  44:         return c.qboxV1alpha1
    45: }
    46:
    47: // Discovery retrieves the DiscoveryClient
    48: func (c *Clientset) Discovery() discovery.DiscoveryInterface {
    49:         if c == nil {
(dlv) bt
 0  0x0000000000cdc070 in github.com/xwen-winnie/crd_demo/kube/client/clientset/versioned.(*Clientset).QboxV1alpha1
    at ./kube/client/clientset/versioned/clientset.go:44
 1  0x0000000000cdd8b4 in github.com/xwen-winnie/crd_demo/kube/client/informers/externalversions/qbox/v1alpha1.NewFilteredWaitdeploymentInformer.func2
    at ./kube/client/informers/externalversions/qbox/v1alpha1/waitdeployment.go:71
 2  0x0000000000c114c4 in k8s.io/client-go/tools/cache.(*ListWatch).Watch
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/listwatch.go:111
 3  0x0000000000c134a4 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:402
 4  0x0000000000c1afd4 in k8s.io/client-go/tools/cache.(*Reflector).Run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:209
 5  0x000000000068f544 in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:155
 6  0x000000000068e6e4 in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:156
 7  0x0000000000c12e8c in k8s.io/client-go/tools/cache.(*Reflector).Run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:208
 8  0x0000000000c1ddd0 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.19.0/tools/cache/reflector.go:206
 9  0x000000000068f3e0 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:56
10  0x000000000068f4a4 in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:73
11  0x000000000006fb84 in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) 

checkAndStartDeployment

kubectl create -f  artifacts/waitdeployment_test.yaml
waitdeployment.qbox.io/wdtest created
(dlv) c
> github.com/xwen-winnie/crd_demo/kube.doTCPProbe() ./kube/controller.go:333 (hits goroutine(146):1 total:2) (PC: 0xce8c40)
Warning: debugging optimized function
   328:         }
   329:         return deployment, nil
   330: }
   331:
   332: // tcp 连接检测代码
=> 333: func doTCPProbe(addr string, timeout time.Duration) error {
   334:         conn, err := net.DialTimeout("tcp", addr, timeout)
   335:         if err != nil {
   336:                 return err
   337:         }
   338:         err = conn.Close()
(dlv) bt
 0  0x0000000000ce8c40 in github.com/xwen-winnie/crd_demo/kube.doTCPProbe
    at ./kube/controller.go:333
 1  0x0000000000ce899c in github.com/xwen-winnie/crd_demo/kube.(*Client).checkAndStartDeployment
    at ./kube/controller.go:317
 2  0x0000000000ce8520 in github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler
    at ./kube/controller.go:287
 3  0x0000000000ce9a88 in github.com/xwen-winnie/crd_demo/kube.(*Client).processNextWorkItem.func1
    at ./kube/controller.go:246
 4  0x0000000000ce8318 in github.com/xwen-winnie/crd_demo/kube.(*Client).processNextWorkItem
    at ./kube/controller.go:256
 5  0x0000000000ce82a8 in github.com/xwen-winnie/crd_demo/kube.(*Client).runWorker
    at ./kube/controller.go:215
 6  0x0000000000ce9e28 in github.com/xwen-winnie/crd_demo/kube.(*Client).runWorker-fm
    at ./kube/controller.go:214
 7  0x000000000068f544 in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:155
 8  0x000000000068e6e4 in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:156
 9  0x000000000068e658 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:133
10  0x000000000068e5b8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.19.3/pkg/util/wait/wait.go:90
11  0x000000000006fb84 in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) p addr
"192.168.1.5:30080"
(dlv) 
func (c *Client) syncHandler(key string) error {
        // Convert the namespace/name string into a distinct namespace and name
        ns, n, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
                utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
                return nil
        }
        waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(ns).Get(n)
        if err != nil {
                // The Waitdeployment resource may no longer exist, in which case we stop
                // processing.
                if errors.IsNotFound(err) {
                        utilRuntime.HandleError(fmt.Errorf("Waitdeployment '%s' in work queue no longer exists", key))
                        return nil
                }
                return err
        }

        deployment, err := c.checkAndStartDeployment(waitDeployment)
        if err != nil {
                return err
        }
(dlv) p waitDeployment
*github.com/xwen-winnie/crd_demo/kube/apis/qbox/v1alpha1.Waitdeployment {
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "wdtest",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/qbox.io/v1alpha1/namespaces/default/waitdeployments/wdtest",
                UID: "1b1ea378-360f-405e-b374-a42ba7267a75",
                ResourceVersion: "16289989",
                Generation: 1,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x40001e7988),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string [...],
                Annotations: map[string]string nil,
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil,
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40003a85a0),
                ],},
        Spec: k8s.io/api/apps/v1.DeploymentSpec {
                Replicas: *1,
                Selector: *(*"k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector")(0x40001aad80),
                Template: (*"k8s.io/api/core/v1.PodTemplateSpec")(0x40001e7a28),
                Strategy: (*"k8s.io/api/apps/v1.DeploymentStrategy")(0x40001e7d00),
                MinReadySeconds: 0,
                RevisionHistoryLimit: *int32 nil,
                Paused: false,
                ProgressDeadlineSeconds: *int32 nil,},
        Status: k8s.io/api/apps/v1.DeploymentStatus {
                ObservedGeneration: 0,
                Replicas: 0,
                UpdatedReplicas: 0,
                ReadyReplicas: 0,
                AvailableReplicas: 0,
                UnavailableReplicas: 0,
                Conditions: []k8s.io/api/apps/v1.DeploymentCondition len: 0, cap: 0, nil,
                CollisionCount: *int32 nil,},
        WaitProbe: github.com/xwen-winnie/crd_demo/kube/apis/qbox/v1alpha1.WaitProbe {
                Address: "192.168.1.5:30080",
                Timeout: k8s.io/klog/v2.flushInterval (5000000000),},}
(dlv) 

type Waitdeployment struct

root@ubuntu:~/crd_demo# cat  kube/apis/qbox/v1alpha1/types.go 
package v1alpha1

import (
        "time"

        appv1 "k8s.io/api/apps/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:openapi-gen=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Waitdeployment is a custom resource that carries a Deployment spec and
// status together with a WaitProbe.
// NOTE(review): the controller appears to probe WaitProbe.Address over TCP
// before starting the Deployment — confirm against checkAndStartDeployment.
type Waitdeployment struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`
        // Spec reuses the apps/v1 DeploymentSpec type.
        Spec              appv1.DeploymentSpec   `json:"spec,omitempty"`
        // Status reuses the apps/v1 DeploymentStatus type.
        Status            appv1.DeploymentStatus `json:"status,omitempty"`
        // WaitProbe is serialized as the top-level "waitProbe" key, next to
        // "spec" and "status" rather than inside them.
        WaitProbe         WaitProbe              `json:"waitProbe,omitempty"`
}

// WaitProbe holds the probe settings attached to a Waitdeployment.
type WaitProbe struct {
        // Address is the probe target, e.g. "192.168.1.5:30080".
        // NOTE(review): presumably a host:port dialed over TCP — confirm
        // against the controller.
        Address string        `json:"address,omitempty"`
        // Timeout is a time.Duration, so a plain JSON/YAML number is read as
        // nanoseconds (5000000000 is 5 seconds).
        Timeout time.Duration `json:"timeout,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// WaitdeploymentList is a list of Waitdeployment objects.
type WaitdeploymentList struct {
        metav1.TypeMeta `json:",inline"`
        metav1.ListMeta `json:"metadata,omitempty"`

        // Items holds the Waitdeployment entries. The tag has no omitempty, so
        // the "items" key is always present in the encoded output.
        Items []Waitdeployment `json:"items"`
}
root@ubuntu:~/crd_demo# cat   artifacts/waitdeployment_test.yaml
apiVersion: qbox.io/v1alpha1
kind: Waitdeployment
waitProbe:
  address: 192.168.1.5:30080
  timeout: 5000000000
metadata:
  name: wdtest
  labels:
    app: wdtest
spec:
  replicas: 1
  selector:
    matchLabels:
      app: wdtest
  template:
    metadata:
      labels:
        app: wdtest
    spec:
      containers:
        - name: nginx
          image: nginx:latest
          ports:
            - containerPort: 80
root@ubuntu:~/crd_demo# kubectl create -f  artifacts/waitdeployment_test.yaml
waitdeployment.qbox.io/wdtest created
root@ubuntu:~/crd_demo# kubectl get pods
NAME                            READY   STATUS    RESTARTS   AGE
apache-app-84f76964b5-fgsc7     1/1     Running   3          27d
apache-app-84f76964b5-kt5cx     1/1     Running   1          31d
coffee-5f56ff9788-plfcq         1/1     Running   1          10d
coffee-5f56ff9788-zs2f7         1/1     Running   0          10d
example-foo-54dc4db9fc-fmsqn    1/1     Running   3          27d
igh-agent-67d94498c6-dwtsg      1/1     Running   0          2d19h
nginx-app-56b5bb67cc-mkfct      1/1     Running   3          27d
nginx-app-56b5bb67cc-s9jtk      1/1     Running   1          31d
nginx-karmada-f89759699-qcztn   1/1     Running   0          12d
nginx-karmada-f89759699-vn47h   1/1     Running   0          12d
tea-69c99ff568-hdcbl            1/1     Running   0          10d
tea-69c99ff568-p59d6            1/1     Running   0          10d
tea-69c99ff568-tm9q6            1/1     Running   0          10d
wdtest-b9488c899-bw4sm          1/1     Running   0          51s
web2-7cdf5dffb-26xrn            1/1     Running   3          32d
web3-c9654466d-xwb5j            1/1     Running   3          32d
root@ubuntu:~/crd_demo# kubectl get pods | grep wdtest
wdtest-b9488c899-bw4sm          1/1     Running   0          69s
root@ubuntu:~/crd_demo# kubectl delete  -f  artifacts/waitdeployment_test.yaml
waitdeployment.qbox.io "wdtest" deleted
root@ubuntu:~/crd_demo# cat   artifacts/waitdeployment_test.yaml
apiVersion: qbox.io/v1alpha1
kind: Waitdeployment
waitProbe:
  address: 10.111.63.105:80
  timeout: 5000000000
metadata:
  name: wdtest
  labels:
    app: wdtest
spec:
  replicas: 1
  selector:
    matchLabels:
      app: wdtest
  template:
    metadata:
      labels:
        app: wdtest
    spec:
      containers:
        - name: nginx
          image: nginx:latest
          ports:
            - containerPort: 80
root@ubuntu:~/crd_demo# kubectl get svc
NAME         TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
apache-svc   ClusterIP   10.111.63.105    <none>        80/TCP     31d
coffee-svc   ClusterIP   10.101.87.73     <none>        80/TCP     10d
kubernetes   ClusterIP   10.96.0.1        <none>        443/TCP    66d
nginx-svc    ClusterIP   10.103.182.145   <none>        80/TCP     31d
tea-svc      ClusterIP   10.103.138.254   <none>        80/TCP     10d
web2         ClusterIP   10.99.87.66      <none>        8097/TCP   31d
web3         ClusterIP   10.107.70.171    <none>        8097/TCP   31d
root@ubuntu:~/crd_demo# 
root@ubuntu:~/crd_demo# dlv attach 13796
Type 'help' for list of commands.
(dlv) b syncHandler
Breakpoint 1 (enabled) set at 0xce8390 for github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:269
(dlv) c
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:269 (hits goroutine(146):1 total:1) (PC: 0xce8390)
Warning: debugging optimized function
   264: }
   265:
   266: // syncHandler compares the actual state with the desired, and attempts to
   267: // converge the two. It then updates the Status block of the Waitdeployment
   268: // resource with the current status of the resource.
=> 269: func (c *Client) syncHandler(key string) error {
   270:         // Convert the namespace/name string into a distinct namespace and name
   271:         ns, n, err := cache.SplitMetaNamespaceKey(key)
   272:         if err != nil {
   273:                 utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
   274:                 return nil
(dlv) s
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:271 (PC: 0xce83a4)
Warning: debugging optimized function
   266: // syncHandler compares the actual state with the desired, and attempts to
   267: // converge the two. It then updates the Status block of the Waitdeployment
   268: // resource with the current status of the resource.
   269: func (c *Client) syncHandler(key string) error {
   270:         // Convert the namespace/name string into a distinct namespace and name
=> 271:         ns, n, err := cache.SplitMetaNamespaceKey(key)
   272:         if err != nil {
   273:                 utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
   274:                 return nil
   275:         }
   276:         waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(ns).Get(n)
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:272 (PC: 0xce83cc)
Warning: debugging optimized function
   267: // converge the two. It then updates the Status block of the Waitdeployment
   268: // resource with the current status of the resource.
   269: func (c *Client) syncHandler(key string) error {
   270:         // Convert the namespace/name string into a distinct namespace and name
   271:         ns, n, err := cache.SplitMetaNamespaceKey(key)
=> 272:         if err != nil {
   273:                 utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
   274:                 return nil
   275:         }
   276:         waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(ns).Get(n)
   277:         if err != nil {
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:276 (PC: 0xce83d8)
Warning: debugging optimized function
   271:         ns, n, err := cache.SplitMetaNamespaceKey(key)
   272:         if err != nil {
   273:                 utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
   274:                 return nil
   275:         }
=> 276:         waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(ns).Get(n)
   277:         if err != nil {
   278:                 // The Waitdeployment resource may no longer exist, in which case we stop
   279:                 // processing.
   280:                 if errors.IsNotFound(err) {
   281:                         utilRuntime.HandleError(fmt.Errorf("Waitdeployment '%s' in work queue no longer exists", key))
(dlv) p ns
"default"
(dlv) p n
"wdtest"
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:277 (PC: 0xce8428)
Warning: debugging optimized function
   272:         if err != nil {
   273:                 utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
   274:                 return nil
   275:         }
   276:         waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(ns).Get(n)
=> 277:         if err != nil {
   278:                 // The Waitdeployment resource may no longer exist, in which case we stop
   279:                 // processing.
   280:                 if errors.IsNotFound(err) {
   281:                         utilRuntime.HandleError(fmt.Errorf("Waitdeployment '%s' in work queue no longer exists", key))
   282:                         return nil
(dlv) p waitDeployment
*github.com/xwen-winnie/crd_demo/kube/apis/qbox/v1alpha1.Waitdeployment {
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "wdtest",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/qbox.io/v1alpha1/namespaces/default/waitdeployments/wdtest",
                UID: "c9a7382f-e51e-4184-afcd-981510ee3238",
                ResourceVersion: "16294085",
                Generation: 1,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x400021af88),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string [...],
                Annotations: map[string]string nil,
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil,
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40001901e0),
                ],},
        Spec: k8s.io/api/apps/v1.DeploymentSpec {
                Replicas: *1,
                Selector: *(*"k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector")(0x40007c2160),
                Template: (*"k8s.io/api/core/v1.PodTemplateSpec")(0x400021b028),
                Strategy: (*"k8s.io/api/apps/v1.DeploymentStrategy")(0x400021b300),
                MinReadySeconds: 0,
                RevisionHistoryLimit: *int32 nil,
                Paused: false,
                ProgressDeadlineSeconds: *int32 nil,},
        Status: k8s.io/api/apps/v1.DeploymentStatus {
                ObservedGeneration: 0,
                Replicas: 0,
                UpdatedReplicas: 0,
                ReadyReplicas: 0,
                AvailableReplicas: 0,
                UnavailableReplicas: 0,
                Conditions: []k8s.io/api/apps/v1.DeploymentCondition len: 0, cap: 0, nil,
                CollisionCount: *int32 nil,},
        WaitProbe: github.com/xwen-winnie/crd_demo/kube/apis/qbox/v1alpha1.WaitProbe {
                Address: "10.111.63.105:80",
                Timeout: k8s.io/klog/v2.flushInterval (5000000000),},}
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:287 (PC: 0xce8510)
Warning: debugging optimized function
   282:                         return nil
   283:                 }
   284:                 return err
   285:         }
   286:
=> 287:         deployment, err := c.checkAndStartDeployment(waitDeployment)
   288:         if err != nil {
   289:                 return err
   290:         }
   291:
   292:         // If the Deployment is not controlled by this waitDeployment resource,
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:288 (PC: 0xce852c)
Warning: debugging optimized function
   283:                 }
   284:                 return err
   285:         }
   286:
   287:         deployment, err := c.checkAndStartDeployment(waitDeployment)
=> 288:         if err != nil {
   289:                 return err
   290:         }
   291:
   292:         // If the Deployment is not controlled by this waitDeployment resource,
   293:         // we should log a warning to the event recorder and return error msg.
(dlv) p  deployment
*k8s.io/api/apps/v1.Deployment {
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "wdtest",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/apps/v1/namespaces/default/deployments/wdtest",
                UID: "afffa50a-79c6-4d54-b8b1-f55a4a947777",
                ResourceVersion: "16294277",
                Generation: 1,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x4000646508),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string [...],
                Annotations: map[string]string nil,
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference")(0x4000250050),
                ],
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40002500a0),
                ],},
        Spec: k8s.io/api/apps/v1.DeploymentSpec {
                Replicas: *1,
                Selector: *(*"k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector")(0x40003a4400),
                Template: (*"k8s.io/api/core/v1.PodTemplateSpec")(0x40006465a8),
                Strategy: (*"k8s.io/api/apps/v1.DeploymentStrategy")(0x4000646880),
                MinReadySeconds: 0,
                RevisionHistoryLimit: *10,
                Paused: false,
                ProgressDeadlineSeconds: *600,},
        Status: k8s.io/api/apps/v1.DeploymentStatus {
                ObservedGeneration: 0,
                Replicas: 0,
                UpdatedReplicas: 0,
                ReadyReplicas: 0,
                AvailableReplicas: 0,
                UnavailableReplicas: 0,
                Conditions: []k8s.io/api/apps/v1.DeploymentCondition len: 0, cap: 0, nil,
                CollisionCount: *int32 nil,},}
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:294 (PC: 0xce8534)
Warning: debugging optimized function
   289:                 return err
   290:         }
   291:
   292:         // If the Deployment is not controlled by this waitDeployment resource,
   293:         // we should log a warning to the event recorder and return error msg.
=> 294:         if !metav1.IsControlledBy(deployment, waitDeployment) {
   295:                 msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
   296:                 c.recorder.Event(waitDeployment, coreV1.EventTypeWarning, ErrResourceExists, msg)
   297:                 return fmt.Errorf(msg)
   298:         }
   299:
(dlv) nn
Command failed: command not available
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:301 (PC: 0xce8564)
Warning: debugging optimized function
   296:                 c.recorder.Event(waitDeployment, coreV1.EventTypeWarning, ErrResourceExists, msg)
   297:                 return fmt.Errorf(msg)
   298:         }
   299:
   300:         // update deployment
=> 301:         deployment, err = c.kubeClient.AppsV1().Deployments(waitDeployment.Namespace).Update(c.ctx, newDeployment(waitDeployment), metav1.UpdateOptions{})
   302:         if err != nil {
   303:                 return err
   304:         }
   305:
   306:         // update waitDeployment
(dlv) p waitDeployment
*github.com/xwen-winnie/crd_demo/kube/apis/qbox/v1alpha1.Waitdeployment {
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "wdtest",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/qbox.io/v1alpha1/namespaces/default/waitdeployments/wdtest",
                UID: "c9a7382f-e51e-4184-afcd-981510ee3238",
                ResourceVersion: "16294085",
                Generation: 1,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x400021af88),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string [...],
                Annotations: map[string]string nil,
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil,
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40001901e0),
                ],},
        Spec: k8s.io/api/apps/v1.DeploymentSpec {
                Replicas: *1,
                Selector: *(*"k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector")(0x40007c2160),
                Template: (*"k8s.io/api/core/v1.PodTemplateSpec")(0x400021b028),
                Strategy: (*"k8s.io/api/apps/v1.DeploymentStrategy")(0x400021b300),
                MinReadySeconds: 0,
                RevisionHistoryLimit: *int32 nil,
                Paused: false,
                ProgressDeadlineSeconds: *int32 nil,},
        Status: k8s.io/api/apps/v1.DeploymentStatus {
                ObservedGeneration: 0,
                Replicas: 0,
                UpdatedReplicas: 0,
                ReadyReplicas: 0,
                AvailableReplicas: 0,
                UnavailableReplicas: 0,
                Conditions: []k8s.io/api/apps/v1.DeploymentCondition len: 0, cap: 0, nil,
                CollisionCount: *int32 nil,},
        WaitProbe: github.com/xwen-winnie/crd_demo/kube/apis/qbox/v1alpha1.WaitProbe {
                Address: "10.111.63.105:80",
                Timeout: k8s.io/klog/v2.flushInterval (5000000000),},}
(dlv) p  c.kubeClient
k8s.io/client-go/kubernetes.Interface(*k8s.io/client-go/kubernetes.Clientset) *{
        DiscoveryClient: *k8s.io/client-go/discovery.DiscoveryClient {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,
                LegacyPrefix: "/api",},
        admissionregistrationV1: *k8s.io/client-go/kubernetes/typed/admissionregistration/v1.AdmissionregistrationV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        admissionregistrationV1beta1: *k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1.AdmissionregistrationV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        appsV1: *k8s.io/client-go/kubernetes/typed/apps/v1.AppsV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        appsV1beta1: *k8s.io/client-go/kubernetes/typed/apps/v1beta1.AppsV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        appsV1beta2: *k8s.io/client-go/kubernetes/typed/apps/v1beta2.AppsV1beta2Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        authenticationV1: *k8s.io/client-go/kubernetes/typed/authentication/v1.AuthenticationV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        authenticationV1beta1: *k8s.io/client-go/kubernetes/typed/authentication/v1beta1.AuthenticationV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        authorizationV1: *k8s.io/client-go/kubernetes/typed/authorization/v1.AuthorizationV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        authorizationV1beta1: *k8s.io/client-go/kubernetes/typed/authorization/v1beta1.AuthorizationV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        autoscalingV1: *k8s.io/client-go/kubernetes/typed/autoscaling/v1.AutoscalingV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        autoscalingV2beta1: *k8s.io/client-go/kubernetes/typed/autoscaling/v2beta1.AutoscalingV2beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        autoscalingV2beta2: *k8s.io/client-go/kubernetes/typed/autoscaling/v2beta2.AutoscalingV2beta2Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        batchV1: *k8s.io/client-go/kubernetes/typed/batch/v1.BatchV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        batchV1beta1: *k8s.io/client-go/kubernetes/typed/batch/v1beta1.BatchV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        batchV2alpha1: *k8s.io/client-go/kubernetes/typed/batch/v2alpha1.BatchV2alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        certificatesV1: *k8s.io/client-go/kubernetes/typed/certificates/v1.CertificatesV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        certificatesV1beta1: *k8s.io/client-go/kubernetes/typed/certificates/v1beta1.CertificatesV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        coordinationV1beta1: *k8s.io/client-go/kubernetes/typed/coordination/v1beta1.CoordinationV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        coordinationV1: *k8s.io/client-go/kubernetes/typed/coordination/v1.CoordinationV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        coreV1: *k8s.io/client-go/kubernetes/typed/core/v1.CoreV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        discoveryV1alpha1: *k8s.io/client-go/kubernetes/typed/discovery/v1alpha1.DiscoveryV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        discoveryV1beta1: *k8s.io/client-go/kubernetes/typed/discovery/v1beta1.DiscoveryV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        eventsV1: *k8s.io/client-go/kubernetes/typed/events/v1.EventsV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        eventsV1beta1: *k8s.io/client-go/kubernetes/typed/events/v1beta1.EventsV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        extensionsV1beta1: *k8s.io/client-go/kubernetes/typed/extensions/v1beta1.ExtensionsV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        flowcontrolV1alpha1: *k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1.FlowcontrolV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        networkingV1: *k8s.io/client-go/kubernetes/typed/networking/v1.NetworkingV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        networkingV1beta1: *k8s.io/client-go/kubernetes/typed/networking/v1beta1.NetworkingV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        nodeV1alpha1: *k8s.io/client-go/kubernetes/typed/node/v1alpha1.NodeV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        nodeV1beta1: *k8s.io/client-go/kubernetes/typed/node/v1beta1.NodeV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        policyV1beta1: *k8s.io/client-go/kubernetes/typed/policy/v1beta1.PolicyV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        rbacV1: *k8s.io/client-go/kubernetes/typed/rbac/v1.RbacV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        rbacV1beta1: *k8s.io/client-go/kubernetes/typed/rbac/v1beta1.RbacV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        rbacV1alpha1: *k8s.io/client-go/kubernetes/typed/rbac/v1alpha1.RbacV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        schedulingV1alpha1: *k8s.io/client-go/kubernetes/typed/scheduling/v1alpha1.SchedulingV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        schedulingV1beta1: *k8s.io/client-go/kubernetes/typed/scheduling/v1beta1.SchedulingV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        schedulingV1: *k8s.io/client-go/kubernetes/typed/scheduling/v1.SchedulingV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        settingsV1alpha1: *k8s.io/client-go/kubernetes/typed/settings/v1alpha1.SettingsV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        storageV1beta1: *k8s.io/client-go/kubernetes/typed/storage/v1beta1.StorageV1beta1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        storageV1: *k8s.io/client-go/kubernetes/typed/storage/v1.StorageV1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},
        storageV1alpha1: *k8s.io/client-go/kubernetes/typed/storage/v1alpha1.StorageV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},}
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:302 (PC: 0xce8628)
Warning: debugging optimized function
   297:                 return fmt.Errorf(msg)
   298:         }
   299:
   300:         // update deployment
   301:         deployment, err = c.kubeClient.AppsV1().Deployments(waitDeployment.Namespace).Update(c.ctx, newDeployment(waitDeployment), metav1.UpdateOptions{})
=> 302:         if err != nil {
   303:                 return err
   304:         }
   305:
   306:         // update waitDeployment
   307:         _, err = c.customClient.QboxV1alpha1().Waitdeployments(waitDeployment.Namespace).Update(c.ctx, waitDeployment.DeepCopy(), metav1.UpdateOptions{})
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:307 (PC: 0xce862c)
Warning: debugging optimized function
   302:         if err != nil {
   303:                 return err
   304:         }
   305:
   306:         // update waitDeployment
=> 307:         _, err = c.customClient.QboxV1alpha1().Waitdeployments(waitDeployment.Namespace).Update(c.ctx, waitDeployment.DeepCopy(), metav1.UpdateOptions{})
   308:         if err != nil {
   309:                 return err
   310:         }
   311:
   312:         c.recorder.Event(waitDeployment, coreV1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
(dlv) p c.customClient
github.com/xwen-winnie/crd_demo/kube/client/clientset/versioned.Interface(*github.com/xwen-winnie/crd_demo/kube/client/clientset/versioned.Clientset) *{
        DiscoveryClient: *k8s.io/client-go/discovery.DiscoveryClient {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,
                LegacyPrefix: "/api",},
        qboxV1alpha1: *github.com/xwen-winnie/crd_demo/kube/client/clientset/versioned/typed/qbox/v1alpha1.QboxV1alpha1Client {
                restClient: k8s.io/client-go/rest.Interface(*k8s.io/client-go/rest.RESTClient) ...,},}
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:308 (PC: 0xce8708)
Warning: debugging optimized function
   303:                 return err
   304:         }
   305:
   306:         // update waitDeployment
   307:         _, err = c.customClient.QboxV1alpha1().Waitdeployments(waitDeployment.Namespace).Update(c.ctx, waitDeployment.DeepCopy(), metav1.UpdateOptions{})
=> 308:         if err != nil {
   309:                 return err
   310:         }
   311:
   312:         c.recorder.Event(waitDeployment, coreV1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
   313:         return nil
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:312 (PC: 0xce8724)
Warning: debugging optimized function
   307:         _, err = c.customClient.QboxV1alpha1().Waitdeployments(waitDeployment.Namespace).Update(c.ctx, waitDeployment.DeepCopy(), metav1.UpdateOptions{})
   308:         if err != nil {
   309:                 return err
   310:         }
   311:
=> 312:         c.recorder.Event(waitDeployment, coreV1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
   313:         return nil
   314: }
   315:
   316: func (c *Client) checkAndStartDeployment(waitDeployment *v1alpha1.Waitdeployment) (*appsV1.Deployment, error) {
   317:         err := doTCPProbe(waitDeployment.WaitProbe.Address, waitDeployment.WaitProbe.Timeout)
(dlv) p SuccessSynced

handleObject

root@ubuntu:~/crd_demo# dlv attach 13796
Type 'help' for list of commands.
(dlv) b handleObject
Breakpoint 1 (enabled) set at 0xce9270 for github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:372
(dlv) c
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:372 (hits goroutine(45):1 total:1) (PC: 0xce9270)
Warning: debugging optimized function
   367: // have an appropriate OwnerReference, it will simply be skipped.
   368: //
   369: // 将任何实现 metav1.Object 的资源并尝试找到“拥有”它的 Waitdeployment 资源。
   370: // 它通过查看对象 metadata.ownerReferences 字段以获取适当的 OwnerReference 来完成此操作。
   371: // 然后将要处理的 Waitdeployment 资源加入队列。 如果对象没有合适的 OwnerReference,它将被简单地跳过。
=> 372: func (c *Client) handleObject(obj interface{}) {
   373:         object, ok := obj.(metav1.Object)
   374:         if !ok {
   375:                 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   376:                 if !ok {
   377:                         utilRuntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
(dlv) s
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:373 (PC: 0xce9284)
Warning: debugging optimized function
   368: //
   369: // 将任何实现 metav1.Object 的资源并尝试找到“拥有”它的 Waitdeployment 资源。
   370: // 它通过查看对象 metadata.ownerReferences 字段以获取适当的 OwnerReference 来完成此操作。
   371: // 然后将要处理的 Waitdeployment 资源加入队列。 如果对象没有合适的 OwnerReference,它将被简单地跳过。
   372: func (c *Client) handleObject(obj interface{}) {
=> 373:         object, ok := obj.(metav1.Object)
   374:         if !ok {
   375:                 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   376:                 if !ok {
   377:                         utilRuntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
   378:                         return
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:374 (PC: 0xce92b0)
Warning: debugging optimized function
   369: // 将任何实现 metav1.Object 的资源并尝试找到“拥有”它的 Waitdeployment 资源。
   370: // 它通过查看对象 metadata.ownerReferences 字段以获取适当的 OwnerReference 来完成此操作。
   371: // 然后将要处理的 Waitdeployment 资源加入队列。 如果对象没有合适的 OwnerReference,它将被简单地跳过。
   372: func (c *Client) handleObject(obj interface{}) {
   373:         object, ok := obj.(metav1.Object)
=> 374:         if !ok {
   375:                 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   376:                 if !ok {
   377:                         utilRuntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
   378:                         return
   379:                 }
(dlv) p obj
interface {}(*k8s.io/api/apps/v1.Deployment) *{
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "wdtest",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/apps/v1/namespaces/default/deployments/wdtest",
                UID: "d5b338a4-a4a1-4cba-8984-45bc40973d86",
                ResourceVersion: "16296642",
                Generation: 1,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x4000b0e988),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string [...],
                Annotations: map[string]string nil,
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference")(0x4000dbe690),
                ],
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x4000dbe6e0),
                ],},
        Spec: k8s.io/api/apps/v1.DeploymentSpec {
                Replicas: *1,
                Selector: *(*"k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector")(0x40008c0500),
                Template: (*"k8s.io/api/core/v1.PodTemplateSpec")(0x4000b0ea28),
                Strategy: (*"k8s.io/api/apps/v1.DeploymentStrategy")(0x4000b0ed00),
                MinReadySeconds: 0,
                RevisionHistoryLimit: *10,
                Paused: false,
                ProgressDeadlineSeconds: *600,},
        Status: k8s.io/api/apps/v1.DeploymentStatus {
                ObservedGeneration: 0,
                Replicas: 0,
                UpdatedReplicas: 0,
                ReadyReplicas: 0,
                AvailableReplicas: 0,
                UnavailableReplicas: 0,
                Conditions: []k8s.io/api/apps/v1.DeploymentCondition len: 0, cap: 0, nil,
                CollisionCount: *int32 nil,},}
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:388 (PC: 0xce92b4)
Warning: debugging optimized function
   383:                         return
   384:                 }
   385:                 klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
   386:         }
   387:
=> 388:         klog.V(4).Infof("Processing object: %s", object.GetName())
   389:         if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
   390:                 // If this object is not owned by a Foo, we should not do anything more with it.
   391:                 if ownerRef.Kind != WaitdeploymentKind {
   392:                         return
   393:                 }
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:389 (PC: 0xce936c)
Warning: debugging optimized function
   384:                 }
   385:                 klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
   386:         }
   387:
   388:         klog.V(4).Infof("Processing object: %s", object.GetName())
=> 389:         if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
   390:                 // If this object is not owned by a Foo, we should not do anything more with it.
   391:                 if ownerRef.Kind != WaitdeploymentKind {
   392:                         return
   393:                 }
   394:                 waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(object.GetNamespace()).Get(ownerRef.Name)
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:391 (PC: 0xce9370)
Warning: debugging optimized function
   386:         }
   387:
   388:         klog.V(4).Infof("Processing object: %s", object.GetName())
   389:         if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
   390:                 // If this object is not owned by a Foo, we should not do anything more with it.
=> 391:                 if ownerRef.Kind != WaitdeploymentKind {
   392:                         return
   393:                 }
   394:                 waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(object.GetNamespace()).Get(ownerRef.Name)
   395:                 if err != nil {
   396:                         klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
(dlv) p  ownerRef
*k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference {
        APIVersion: "qbox.io/v1alpha1",
        Kind: "Waitdeployment",
        Name: "wdtest",
        UID: "a09005c9-b27c-48f1-a1a0-0358ee14c659",
        Controller: *true,
        BlockOwnerDeletion: *true,}
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:394 (PC: 0xce93d4)
Warning: debugging optimized function
   389:         if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
   390:                 // If this object is not owned by a Foo, we should not do anything more with it.
   391:                 if ownerRef.Kind != WaitdeploymentKind {
   392:                         return
   393:                 }
=> 394:                 waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(object.GetNamespace()).Get(ownerRef.Name)
   395:                 if err != nil {
   396:                         klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
   397:                         return
   398:                 }
   399:                 c.enqueueWaitdeployment(waitDeployment)
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:395 (PC: 0xce9430)
Warning: debugging optimized function
   390:                 // If this object is not owned by a Foo, we should not do anything more with it.
   391:                 if ownerRef.Kind != WaitdeploymentKind {
   392:                         return
   393:                 }
   394:                 waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(object.GetNamespace()).Get(ownerRef.Name)
=> 395:                 if err != nil {
   396:                         klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
   397:                         return
   398:                 }
   399:                 c.enqueueWaitdeployment(waitDeployment)
   400:                 return
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:399 (PC: 0xce9510)
Warning: debugging optimized function
   394:                 waitDeployment, err := c.waitDeploymentsLister.Waitdeployments(object.GetNamespace()).Get(ownerRef.Name)
   395:                 if err != nil {
   396:                         klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
   397:                         return
   398:                 }
=> 399:                 c.enqueueWaitdeployment(waitDeployment)
   400:                 return
   401:         }
   402: }
   403:
   404: // enqueueWaitdeployment takes a enqueueWaitdeployment resource and converts
(dlv) b syncHandler

handleObject syncHandler

(dlv) c
> github.com/xwen-winnie/crd_demo/kube.(*Client).handleObject() ./kube/controller.go:372 (hits goroutine(45):5 total:5) (PC: 0xce9270)
Warning: debugging optimized function
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:269 (hits goroutine(146):1 total:1) (PC: 0xce8390)
Warning: debugging optimized function
   264: }
   265:
   266: // syncHandler compares the actual state with the desired, and attempts to
   267: // converge the two. It then updates the Status block of the Waitdeployment
   268: // resource with the current status of the resource.
=> 269: func (c *Client) syncHandler(key string) error {
   270:         // Convert the namespace/name string into a distinct namespace and name
   271:         ns, n, err := cache.SplitMetaNamespaceKey(key)
   272:         if err != nil {
   273:                 utilRuntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
   274:                 return nil
(dlv) 

 Create  Deployment

// checkAndStartDeployment calls doTCPProbe with the Waitdeployment's WaitProbe
// address and timeout and, once that call succeeds, returns the Deployment that
// has the same namespace and name as the Waitdeployment. When the lister reports
// NotFound, the Deployment is built with newDeployment and created through the
// kube client.
//
// A probe error, a lister error other than NotFound, or a Create error is
// returned unchanged together with a nil Deployment.
func (c *Client) checkAndStartDeployment(waitDeployment *v1alpha1.Waitdeployment) (*appsV1.Deployment, error) {
	// Nothing is looked up or created until the probe target answers.
	if probeErr := doTCPProbe(waitDeployment.WaitProbe.Address, waitDeployment.WaitProbe.Timeout); probeErr != nil {
		return nil, probeErr
	}

	existing, lookupErr := c.deploymentsLister.Deployments(waitDeployment.Namespace).Get(waitDeployment.Name)
	switch {
	case lookupErr == nil:
		// Already present in the lister: hand it back as-is.
		return existing, nil
	case !errors.IsNotFound(lookupErr):
		// Any failure other than "not found" is reported to the caller.
		return nil, lookupErr
	}

	klog.Infof("Waitdeployment not exist, create a new deployment %s in namespace %s", waitDeployment.Name, waitDeployment.Namespace)
	created, createErr := c.kubeClient.AppsV1().Deployments(waitDeployment.Namespace).Create(c.ctx, newDeployment(waitDeployment), metav1.CreateOptions{})
	if createErr != nil {
		return nil, createErr
	}
	return created, nil
}
(dlv) n
> github.com/xwen-winnie/crd_demo/kube.(*Client).syncHandler() ./kube/controller.go:288 (PC: 0xce852c)
Warning: debugging optimized function
   283:                 }
   284:                 return err
   285:         }
   286:
   287:         deployment, err := c.checkAndStartDeployment(waitDeployment)
=> 288:         if err != nil {
   289:                 return err
   290:         }
   291:
   292:         // If the Deployment is not controlled by this waitDeployment resource,
   293:         // we should log a warning to the event recorder and return error msg.
(dlv) p deployment
*k8s.io/api/apps/v1.Deployment {
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "wdtest",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/apps/v1/namespaces/default/deployments/wdtest",
                UID: "621ef39c-3863-4f39-9e6e-2a3370d096e8",
                ResourceVersion: "16739532",
                Generation: 1,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x400071a088),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string [...],
                Annotations: map[string]string nil,
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference")(0x40009f4000),
                ],
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40009f4050),
                ],},
        Spec: k8s.io/api/apps/v1.DeploymentSpec {
                Replicas: *1,
                Selector: *(*"k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector")(0x400058e3a0),
                Template: (*"k8s.io/api/core/v1.PodTemplateSpec")(0x400071a128),
                Strategy: (*"k8s.io/api/apps/v1.DeploymentStrategy")(0x400071a400),
                MinReadySeconds: 0,
                RevisionHistoryLimit: *10,
                Paused: false,
                ProgressDeadlineSeconds: *600,},
        Status: k8s.io/api/apps/v1.DeploymentStatus {
                ObservedGeneration: 0,
                Replicas: 0,
                UpdatedReplicas: 0,
                ReadyReplicas: 0,
                AvailableReplicas: 0,
                UnavailableReplicas: 0,
                Conditions: []k8s.io/api/apps/v1.DeploymentCondition len: 0, cap: 0, nil,
                CollisionCount: *int32 nil,},}
(dlv) 

获取 k8s node 节点信息

//获取NODE
fmt.Println("####### 获取node ######")
nodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
    panic(err)
}
for _,nds := range nodes.Items {
    fmt.Printf("NodeName: %s
", nds.Name)
}

//获取 指定NODE 的详细信息
fmt.Println("
 ####### node详细信息 ######")
nodeName := "k8s-master2"
nodeRel, err := clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
    panic(err)
}
fmt.Printf("Name: %s 
", nodeRel.Name)
fmt.Printf("CreateTime: %s 
", nodeRel.CreationTimestamp)
fmt.Printf("NowTime: %s 
", nodeRel.Status.Conditions[0].LastHeartbeatTime)
fmt.Printf("kernelVersion: %s 
", nodeRel.Status.NodeInfo.KernelVersion)
fmt.Printf("SystemOs: %s 
", nodeRel.Status.NodeInfo.OSImage)
fmt.Printf("Cpu: %s 
", nodeRel.Status.Capacity.Cpu())
fmt.Printf("docker: %s 
", nodeRel.Status.NodeInfo.ContainerRuntimeVersion)
// fmt.Printf("Status: %s 
", nodeRel.Status.Conditions[len(nodes.Items[0].Status.Conditions)-1].Type)
fmt.Printf("Status: %s 
", nodeRel.Status.Conditions[len(nodeRel.Status.Conditions)-1].Type)
fmt.Printf("Mem: %s 
", nodeRel.Status.Allocatable.Memory().String())
 

Creating your own admission controller

原文地址:https://www.cnblogs.com/dream397/p/15177061.html