关于Pod Condition的一些思考

使用k8s的扩展调度器机制来实现当某个基础监控的服务Pod不Ready时,这个Pod所在的节点就不允许调度,例如Pod(daemonset形式部署)中的服务会检测节点的CNI网络插件如果没有正常工作,这个Pod的由于探针作用就会变成不Ready的,那么扩展调度器就会避免调度业务Pod到该节点。在实践过程中,发现Pod的Status字段中的Condition Type有Ready和ContainerReady,以下通过源码来简单看一下这两种的状态关系是怎么样的。

一个正常的Pod的Status字段如下:

status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: 2020-08-28T02:58:50Z
    status: "True"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: null
    status: "True"
    type: ContainersReady
  containerStatuses:
  - containerID: docker://e9875eb8bfae241f61a3139b8f70fd5a65f23687cbc3267bf2a364126ac1a20a
    image: docker.io/grafana/grafana:6.4.3
    imageID: docker-pullable://docker.io/grafana/grafana@sha256:bd55ea2bad17f5016431734b42fdfc202ebdc7d08b6c4ad35ebb03d06efdff69
    lastState: {}
    name: grafana
    ready: true
    restartCount: 0
    state:
      running:
        startedAt: 2020-08-28T08:38:00Z
  hostIP: 172.16.0.2
  phase: Running
  podIP: 10.244.0.84
  qosClass: Burstable
  startTime: 2020-08-28T08:37:57Z

Pod status condition中关于两种PodReady和ContainerReady类型的描述如下:

即PodReady表示这个Pod是否可以接收处理通过svc发过来的请求,当值为True时,controller-manager中的svc controller和ep controller就会把这个pod加入到对应的ep列表,节点的kube-proxy(openshift上是sdn pod)就会watch到这个变化,在节点上为svc增加对应的iptables nat转发规则。

ContainerReady表示Pod中的所有容器是否都是Ready状态了(即kc get pod的中n/m,n<=m 字段),是否Ready由用户为这个Pod所配置的Readiness探针的探测结果为准。

// These are valid conditions of pod.
const (
    // PodReady means the pod is able to service requests and should be added to the
    // load balancing pools of all matching services.
    PodReady PodConditionType = "Ready"
    // ContainersReady indicates whether all containers in the pod are ready.
    ContainersReady PodConditionType = "ContainersReady"
)

我们知道ContainerReady属性值由Readiness探针决定,那什么情况下会影响PodReady的值?

源码基于k8s1.11: https://github.com/kubernetes/kubernetes/tree/release-1.11

1、如果节点的状态为NotReady,那么NodeController就会通过调用MarkAllPodsNotReady方法把这个节点上的所有Pod 的PodReady Condition设置为False,如下

  #pkg/controller/nodelifecycle/node_lifecycle_controller.go
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
    nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
    if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
        utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
     }
 }


#pkg/controller/util/node/controller_utils.go,这个方法直接调clientset更新Pod Status
func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error

2、Kubelet中的StatusManager会根据容器的状态更新Etcd中Pod的status,如下,status_manager.go中的Start方法从podStatusChannel中获取status变化信息,并通过SyncPod方法将变化信息merger到Etcd中

  #pkg/kubelet/status/status_manager.go Start()
   
// 从channel中后去Pod Status变化
go wait.Forever(func() { 
    for { 
        select {
            case syncRequest := <-m.podStatusChannel:
                klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                    syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                m.syncPod(syncRequest.podUID, syncRequest.status)
            case <-syncTicker:
                                ......
            }
        }
    }, 0)



// syncPod方法调用mergePodStatus方法更新到etcd中 
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
    ...
    pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
    oldStatus := pod.Status.DeepCopy()
    newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
    ...
}

上面podStatusChannel中的值是由updateStatusInternal()方法生成,而updateStatusInternal中放入channel的值是SetContainerReadiness()方法中构造的status,如下

// updateStatusInternal updates the internal status cache, and queues an update to the api server if
// necessary. Returns whether an update was triggered.
// This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {

    normalizeStatus(pod, &status)

    newStatus := versionedPodStatus{
        status:       status,
        version:      cachedStatus.version + 1,
        podName:      pod.Name,
        podNamespace: pod.Namespace,
    }
    m.podStatuses[pod.UID] = newStatus

    select {
    case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
        glog.V(5).Infof("Status Manager: adding pod: %q, with status: (%q, %v) to podStatusChannel",
            pod.UID, newStatus.version, newStatus.status)
        return true
    default:
                ......
    }
}

func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {

    pod, ok := m.podManager.GetPodByUID(podUID)
    oldStatus, found := m.podStatuses[pod.UID]

    // Find the container to update.
    containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
        // 判断cache中的Pod Status的container ready状态是不是和方法参数ready一致的
    if containerStatus.Ready == ready {
        glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
            format.Pod(pod), containerID.String())
        return
    }

    // Make sure we're not updating the cached version.
        // 不要直接更新缓存中的ContainerStatus,因为后续这个更新不一定会提交到apiserver
    status := *oldStatus.status.DeepCopy()
    containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    containerStatus.Ready = ready

    // updateConditionFunc updates the corresponding type of condition
    updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
                ......         
            status.Conditions[conditionIndex] = condition
            ......
    }
    // 这里通过GeneratePodReadyCondition()方法构造Pod Status,GeneratePodReadyCondition方法判断当status.ContainerStatus都是Ready时,就返回status.Condition的PodReady为True
updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase)) updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)) m.updateStatusInternal(pod, status, false) }

而status_manager结构体中的SetContainerReadiness()方法只在kubelet的prober_manager即探针模块调用了,如下

// prober_manager模块会根据ReadinessProber检测结果调用status_manager模块的SetContainerReadiness更新容器的ready属性值
// pkg/kubelet/prober/prober_manager.go
func (m *manager) updateReadiness() {
    update := <-m.readinessManager.Updates()

    ready := update.Result == results.Success
    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

虽然kc get pod中的Ready子段值为1/1(即ContainersReady=True),但是并不代表这个Pod会接收Service过来的请求。这个情况时比较好重现出来的,先把节点的kubelet服务停了,node在大约40s(controller-manager参数指定)之后会被NodeController标记为NotReady,并Update节点上面的Pod的PodReady Condition为False,EndpointController Watch到Pod status变化之后就会把Pod从svc对应的ep列表中移除。

[root@k8s-master kubelet]# kc get pod -o wide
NAME                         READY   STATUS    RESTARTS   AGE   IP            NODE
grafana-b5c674bc4-8xmzb      1/1     Running   0          4d    10.244.0.84   k8s-master.com
prometheus-9d44889cc-6jm2h   1/1     Running   0          4d    10.244.0.91   k8s-master.com

###上面的Pod是1/1的,但是svc中没有这个endpoint###
[root@k8s
-master kubelet]# kc describe svc grafana Name: grafana Namespace: istio-system Labels: app=grafana release=istio Annotations: kubectl.kubernetes.io/last-applied-configuration: {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"grafana","release":"istio"},"name":"grafana","namespace"... Selector: app=grafana Type: NodePort IP: 10.96.188.25 Port: http 3000/TCP TargetPort: 3000/TCP NodePort: http 31652/TCP Endpoints: Session Affinity: None External Traffic Policy: Cluster Events: <none> [root@k8s-master kubelet]#
原文地址:https://www.cnblogs.com/orchidzjl/p/13588148.html