client-go workqueue

k8s.io/api/core/v1/

grep Pod -rn * | grep type | grep struct

root@ubuntu:/opt/gopath# ls ./src/k8s.io/kubernetes/staging/src/k8s.io/api/core/v1/
annotation_key_constants.go  lifecycle.go        taint_test.go                   well_known_labels.go
BUILD                        objectreference.go  toleration.go                   well_known_taints.go
doc.go                       register.go         toleration_test.go              zz_generated.deepcopy.go
generated.pb.go              resource.go         types.go
generated.proto              taint.go            types_swagger_doc_generated.go
root@ubuntu:/opt/gopath# 

 

root@ubuntu:/opt/gopath/src/k8s.io/kubernetes/staging/src/k8s.io/api/core/v1# ls types.go 
types.go
root@ubuntu:/opt/gopath/src/k8s.io/kubernetes/staging/src/k8s.io/api/core/v1# ls types.go 
types.go
        "k8s.io/apimachinery/pkg/api/resource"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/apimachinery/pkg/util/intstr"

apimachinery/pkg/apis/meta/v1/types.go

apimachinery/pkg/apis/meta/v1/

// Object lets you work with object metadata from any of the versioned or
// internal API objects. Attempting to set or retrieve a field on an object that does
// not support that field (Name, UID, Namespace on lists) will be a no-op and return
// a default value.
//
// Every metadata field is exposed as a Get/Set method pair.
type Object interface {
    // Namespace, name, generate-name and UID.
    GetNamespace() string
    SetNamespace(namespace string)
    GetName() string
    SetName(name string)
    GetGenerateName() string
    SetGenerateName(name string)
    GetUID() types.UID
    SetUID(uid types.UID)
    // Resource version, generation and self link.
    GetResourceVersion() string
    SetResourceVersion(version string)
    GetGeneration() int64
    SetGeneration(generation int64)
    GetSelfLink() string
    SetSelfLink(selfLink string)
    // Creation and deletion bookkeeping. The deletion timestamp and the
    // deletion grace period are pointer-typed.
    GetCreationTimestamp() Time
    SetCreationTimestamp(timestamp Time)
    GetDeletionTimestamp() *Time
    SetDeletionTimestamp(timestamp *Time)
    GetDeletionGracePeriodSeconds() *int64
    SetDeletionGracePeriodSeconds(*int64)
    // Labels and annotations, both string-to-string maps.
    GetLabels() map[string]string
    SetLabels(labels map[string]string)
    GetAnnotations() map[string]string
    SetAnnotations(annotations map[string]string)
    // Finalizers, owner references, cluster name and managed fields.
    GetFinalizers() []string
    SetFinalizers(finalizers []string)
    GetOwnerReferences() []OwnerReference
    SetOwnerReferences([]OwnerReference)
    GetClusterName() string
    SetClusterName(clusterName string)
    GetManagedFields() []ManagedFieldsEntry
    SetManagedFields(managedFields []ManagedFieldsEntry)
}

// ListMetaAccessor retrieves the list interface from an object
type ListMetaAccessor interface {
    // GetListMeta returns the object's list metadata as a ListInterface.
    GetListMeta() ListInterface
}

// Common lets you work with core metadata from any of the versioned or
// internal API objects. Attempting to set or retrieve a field on an object that does
// not support that field will be a no-op and return a default value.
//
// These four methods are also declared by both Object and ListInterface.
// TODO: move this, and TypeMeta and ListMeta, to a different package
type Common interface {
    GetResourceVersion() string
    SetResourceVersion(version string)
    GetSelfLink() string
    SetSelfLink(selfLink string)
}

// ListInterface lets you work with list metadata from any of the versioned or
// internal API objects. Attempting to set or retrieve a field on an object that does
// not support that field will be a no-op and return a default value.
// TODO: move this, and TypeMeta and ListMeta, to a different package
type ListInterface interface {
    // Resource version and self link.
    GetResourceVersion() string
    SetResourceVersion(version string)
    GetSelfLink() string
    SetSelfLink(selfLink string)
    // Continue value and remaining item count; the count is pointer-typed.
    GetContinue() string
    SetContinue(c string)
    GetRemainingItemCount() *int64
    SetRemainingItemCount(c *int64)
}

// Type exposes the type and APIVersion of versioned or internal API objects.
// TODO: move this, and TypeMeta and ListMeta, to a different package
type Type interface {
    // API version, as a plain string.
    GetAPIVersion() string
    SetAPIVersion(version string)
    // Kind, as a plain string.
    GetKind() string
    SetKind(kind string)
}

// Compile-time assertion that *ListMeta satisfies ListInterface.
var _ ListInterface = (*ListMeta)(nil)

// GetResourceVersion returns the ResourceVersion field.
func (meta *ListMeta) GetResourceVersion() string {
    return meta.ResourceVersion
}

// SetResourceVersion overwrites the ResourceVersion field.
func (meta *ListMeta) SetResourceVersion(version string) {
    meta.ResourceVersion = version
}

// GetSelfLink returns the SelfLink field.
func (meta *ListMeta) GetSelfLink() string {
    return meta.SelfLink
}

// SetSelfLink overwrites the SelfLink field.
func (meta *ListMeta) SetSelfLink(selfLink string) {
    meta.SelfLink = selfLink
}

// GetContinue returns the Continue field.
func (meta *ListMeta) GetContinue() string {
    return meta.Continue
}

// SetContinue overwrites the Continue field.
func (meta *ListMeta) SetContinue(c string) {
    meta.Continue = c
}

// GetRemainingItemCount returns the RemainingItemCount pointer as stored.
func (meta *ListMeta) GetRemainingItemCount() *int64 {
    return meta.RemainingItemCount
}

// SetRemainingItemCount stores the given pointer in RemainingItemCount.
func (meta *ListMeta) SetRemainingItemCount(c *int64) {
    meta.RemainingItemCount = c
}

// GetObjectKind returns the TypeMeta itself as a schema.ObjectKind.
func (obj *TypeMeta) GetObjectKind() schema.ObjectKind {
    return obj
}

// SetGroupVersionKind satisfies the ObjectKind interface for all objects that embed TypeMeta.
// It stores the API version and kind derived from gvk.
func (obj *TypeMeta) SetGroupVersionKind(gvk schema.GroupVersionKind) {
    apiVersion, kind := gvk.ToAPIVersionAndKind()
    obj.APIVersion = apiVersion
    obj.Kind = kind
}

// GroupVersionKind satisfies the ObjectKind interface for all objects that embed TypeMeta.
// It rebuilds the GroupVersionKind from the stored APIVersion and Kind.
func (obj *TypeMeta) GroupVersionKind() schema.GroupVersionKind {
    apiVersion, kind := obj.APIVersion, obj.Kind
    return schema.FromAPIVersionAndKind(apiVersion, kind)
}

// GetListMeta returns the ListMeta itself as a ListInterface.
func (obj *ListMeta) GetListMeta() ListInterface {
    return obj
}

// GetObjectMeta returns the ObjectMeta itself as an Object.
func (obj *ObjectMeta) GetObjectMeta() Object {
    return obj
}

// The methods below implement the Object interface on *ObjectMeta, allowing
// fast, direct access to metadata fields for any API object with an
// ObjectMeta typed field. Each getter returns the matching struct field as
// stored and each setter overwrites it; nothing is copied or validated, so
// maps, slices and pointers are shared between the caller and the ObjectMeta.
func (meta *ObjectMeta) GetNamespace() string                { return meta.Namespace }
func (meta *ObjectMeta) SetNamespace(namespace string)       { meta.Namespace = namespace }
func (meta *ObjectMeta) GetName() string                     { return meta.Name }
func (meta *ObjectMeta) SetName(name string)                 { meta.Name = name }
func (meta *ObjectMeta) GetGenerateName() string             { return meta.GenerateName }
func (meta *ObjectMeta) SetGenerateName(generateName string) { meta.GenerateName = generateName }
func (meta *ObjectMeta) GetUID() types.UID                   { return meta.UID }
func (meta *ObjectMeta) SetUID(uid types.UID)                { meta.UID = uid }
func (meta *ObjectMeta) GetResourceVersion() string          { return meta.ResourceVersion }
func (meta *ObjectMeta) SetResourceVersion(version string)   { meta.ResourceVersion = version }
func (meta *ObjectMeta) GetGeneration() int64                { return meta.Generation }
func (meta *ObjectMeta) SetGeneration(generation int64)      { meta.Generation = generation }
func (meta *ObjectMeta) GetSelfLink() string                 { return meta.SelfLink }
func (meta *ObjectMeta) SetSelfLink(selfLink string)         { meta.SelfLink = selfLink }
func (meta *ObjectMeta) GetCreationTimestamp() Time          { return meta.CreationTimestamp }
func (meta *ObjectMeta) SetCreationTimestamp(creationTimestamp Time) {
    meta.CreationTimestamp = creationTimestamp
}

// Deletion timestamp and grace period are pointer-typed fields; the pointer
// itself is returned and stored.
func (meta *ObjectMeta) GetDeletionTimestamp() *Time { return meta.DeletionTimestamp }
func (meta *ObjectMeta) SetDeletionTimestamp(deletionTimestamp *Time) {
    meta.DeletionTimestamp = deletionTimestamp
}
func (meta *ObjectMeta) GetDeletionGracePeriodSeconds() *int64 {
    return meta.DeletionGracePeriodSeconds
}
func (meta *ObjectMeta) SetDeletionGracePeriodSeconds(deletionGracePeriodSeconds *int64) {
    meta.DeletionGracePeriodSeconds = deletionGracePeriodSeconds
}

// Labels, annotations, finalizers and owner references are returned and
// stored by reference (no defensive copy).
func (meta *ObjectMeta) GetLabels() map[string]string                 { return meta.Labels }
func (meta *ObjectMeta) SetLabels(labels map[string]string)           { meta.Labels = labels }
func (meta *ObjectMeta) GetAnnotations() map[string]string            { return meta.Annotations }
func (meta *ObjectMeta) SetAnnotations(annotations map[string]string) { meta.Annotations = annotations }
func (meta *ObjectMeta) GetFinalizers() []string                      { return meta.Finalizers }
func (meta *ObjectMeta) SetFinalizers(finalizers []string)            { meta.Finalizers = finalizers }
func (meta *ObjectMeta) GetOwnerReferences() []OwnerReference         { return meta.OwnerReferences }
func (meta *ObjectMeta) SetOwnerReferences(references []OwnerReference) {
    meta.OwnerReferences = references
}

// Cluster name and managed-fields entries.
func (meta *ObjectMeta) GetClusterName() string                 { return meta.ClusterName }
func (meta *ObjectMeta) SetClusterName(clusterName string)      { meta.ClusterName = clusterName }
func (meta *ObjectMeta) GetManagedFields() []ManagedFieldsEntry { return meta.ManagedFields }
func (meta *ObjectMeta) SetManagedFields(managedFields []ManagedFieldsEntry) {
    meta.ManagedFields = managedFields
}

NewListWatchFromClient

// NewSourceAPI creates config source that watches for changes to the services and endpoints.
//
// It builds one ListWatch for "services" and one for "endpoints", both across
// api.NamespaceAll with the fields.Everything() selector, and hands each to its
// source constructor together with period and the matching update channel.
func NewSourceAPI(c *client.Client, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
    servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
    endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())

    // period and the channels are passed through unchanged to the per-resource sources.
    newServicesSourceApiFromLW(servicesLW, period, servicesChan)
    newEndpointsSourceApiFromLW(endpointsLW, period, endpointsChan)
}

v1.NamespaceDefault, fields.Everything

 // create the pod watcher
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

root@ubuntu:~/client-go/examples/workqueue# ./wq  -kubeconfig=$HOME/.kube/config
I0918 16:02:02.522945   27907 main.go:124] Starting Pod controller
Sync/Add/Update for Pod apache-app-84f76964b5-kt5cx
Sync/Add/Update for Pod nginx-app-56b5bb67cc-mkfct
Sync/Add/Update for Pod nginx-app-56b5bb67cc-s9jtk
Sync/Add/Update for Pod tea-69c99ff568-hdcbl
Sync/Add/Update for Pod nginx-karmada-f89759699-qcztn
Sync/Add/Update for Pod nginx-karmada-f89759699-vn47h
Sync/Add/Update for Pod tea-69c99ff568-p59d6
Sync/Add/Update for Pod tea-69c99ff568-tm9q6
Sync/Add/Update for Pod coffee-5f56ff9788-zs2f7
Sync/Add/Update for Pod example-foo-54dc4db9fc-fmsqn
Sync/Add/Update for Pod web2-7cdf5dffb-26xrn
Sync/Add/Update for Pod igh-agent-67d94498c6-dwtsg
Sync/Add/Update for Pod apache-app-84f76964b5-fgsc7
Sync/Add/Update for Pod coffee-5f56ff9788-plfcq
Sync/Add/Update for Pod web3-c9654466d-xwb5j
Sync/Add/Update for Pod nginx-config-7775cff659-8pf2v
Pod default/mypod does not exist anymore

k8s.io/apimachinery/pkg/fields 

/ logNodeEvents logs kubelet events from the given node. This includes kubelet
// restart and node unhealthy events. Note that listing events like this will mess
// with latency metrics, beware of calling it during a test.
func getNodeEvents(c clientset.Interface, nodeName string) []v1.Event {
    selector := fields.Set{
        "involvedObject.kind":      "Node",
        "involvedObject.name":      nodeName,
        "involvedObject.namespace": metav1.NamespaceAll,
        "source":                   "kubelet",
    }.AsSelector().String()
    options := metav1.ListOptions{FieldSelector: selector}
    events, err := c.CoreV1().Events(metav1.NamespaceSystem).List(context.TODO(), options)
    if err != nil {
        Logf("Unexpected error retrieving node events %v", err)
        return []v1.Event{}
    }
    return events.Items
}
// GrabFromKubelet returns metrics from kubelet.
//
// It lists nodes whose metadata.name equals nodeName, requires exactly one
// match, and then grabs metrics using the kubelet endpoint port reported in
// that node's status.
func (g *Grabber) GrabFromKubelet(nodeName string) (KubeletMetrics, error) {
    byName := fields.Set{"metadata.name": nodeName}.AsSelector().String()
    listOpts := metav1.ListOptions{FieldSelector: byName}

    nodes, err := g.client.CoreV1().Nodes().List(context.TODO(), listOpts)
    if err != nil {
        return KubeletMetrics{}, err
    }

    // Anything other than exactly one matching node is treated as an error.
    if matched := len(nodes.Items); matched != 1 {
        return KubeletMetrics{}, fmt.Errorf("Error listing nodes with name %v, got %v", nodeName, nodes.Items)
    }

    node := nodes.Items[0]
    port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
    return g.grabFromKubeletInternal(nodeName, port)
}
    if kubeDeps.KubeClient != nil {
        fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
        nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
        r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
        go r.Run(wait.NeverStop)
    }

AsSelector

root@ubuntu:~/client-go/examples/workqueue# kubectl get pods 
NAME                            READY   STATUS    RESTARTS   AGE
apache-app-84f76964b5-fgsc7     1/1     Running   3          39d
apache-app-84f76964b5-kt5cx     1/1     Running   1          43d
coffee-5f56ff9788-plfcq         1/1     Running   1          23d
coffee-5f56ff9788-zs2f7         1/1     Running   0          23d

fieldSelector := fields.Set{meta_v1.ObjectNameField: string("coffee-5f56ff9788-zs2f7")}.AsSelector()
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "default", fieldSelector)

root@ubuntu:~/client-go/examples/workqueue# ./wq  -kubeconfig=$HOME/.kube/config
I0918 18:01:12.998024   58790 main.go:124] Starting Pod controller
Sync/Add/Update for Pod coffee-5f56ff9788-zs2f7
Pod default/mypod does not exist anymore
^C

cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "karmada-system", fields.Everything())

 

root@ubuntu:~/client-go/examples/workqueue# ./wq  -kubeconfig=$HOME/.kube/config
I0918 17:24:19.264121   12049 main.go:124] Starting Pod controller
Sync/Add/Update for Pod karmada-agent-96fc58c4-xhb7c
Pod default/mypod does not exist anymore
^C
root@ubuntu:~/client-go/examples/workqueue# kubectl get pods -n karmada-system
NAME                           READY   STATUS    RESTARTS   AGE
karmada-agent-96fc58c4-xhb7c   1/1     Running   0          28d
root@ubuntu:~/client-go/examples/workqueue#

obj.(*v1.Pod)  Spec.NodeName  Annotations

func (c *Controller) syncToStdout(key string) error {
        obj, exists, err := c.indexer.GetByKey(key)
        if err != nil {
                klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
                return err
        }

        if !exists {
                // Below we will warm up our cache with a Pod, so that we will see a delete for one pod
                fmt.Printf("Pod %s does not exist anymore
", key)
        } else {
                // Note that you also have to check the uid if you have a local controlled resource, which
                // is dependent on the actual instance, to detect that a Pod was recreated with the same name
                fmt.Printf("Sync/Add/Update for Pod %s and in node %s 
", obj.(*v1.Pod).GetName(), obj.(*v1.Pod).Spec.NodeName)
        }
        return nil
}
root@ubuntu:~/client-go/examples/workqueue# go build -o wq .
root@ubuntu:~/client-go/examples/workqueue# ./wq  -kubeconfig=$HOME/.kube/config
I0918 18:55:20.821066   61107 main.go:124] Starting Pod controller
Sync/Add/Update for Pod coffee-5f56ff9788-zs2f7 and in node cloud 
Pod default/mypod does not exist anymore
原文地址:https://www.cnblogs.com/dream397/p/15309090.html