kube-proxy

cmd\kube-proxy\proxy.go
func main() {
	// Build a new proxy command to run; comparing with the scheduler walkthrough earlier, k8s starts every component with the same boilerplate
	command := app.NewProxyCommand() --->cmd\kube-proxy\app\server.go
	code := cli.Run(command)
	os.Exit(code)
}
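Side note: scheduler, controller-manager and kube-proxy all share this cobra + cli.Run bootstrap. A minimal sketch of the pattern (the component and function names here are hypothetical, for illustration only):

	package main

	import (
		"os"

		"github.com/spf13/cobra"
		"k8s.io/component-base/cli"
	)

	// runMyComponent stands in for the real option validation + run loop.
	func runMyComponent() error { return nil }

	func newComponentCommand() *cobra.Command {
		return &cobra.Command{
			Use: "my-component",
			Run: func(cmd *cobra.Command, args []string) {
				if err := runMyComponent(); err != nil {
					os.Exit(1)
				}
			},
		}
	}

	func main() {
		code := cli.Run(newComponentCommand()) // flag/log setup + execution, the same shape as kube-proxy's main
		os.Exit(code)
	}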
cmd\kube-proxy\app\server.go
NewProxyCommand
	cmd := &cobra.Command{
		Run: func(cmd *cobra.Command, args []string) {
		...
			if err := opts.Run(); err != nil {
				klog.ErrorS(err, "Error running ProxyServer")
				os.Exit(1)
			}
	}


Run
	...
	proxyServer, err := NewProxyServer(o) --->cmd\kube-proxy\app\server_others.go
	...
	return o.runLoop()


runLoop
	go func() {
		err := o.proxyServer.Run()
		o.errCh <- err
	}()
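The runLoop shape is a standard Go pattern: run the server in a goroutine, push its exit error onto a channel, and let the caller block on that channel. A stripped-down sketch, assuming only that the server has a Run() error method:

	// waitForServer starts the server and blocks until it exits.
	func waitForServer(server interface{ Run() error }) error {
		errCh := make(chan error, 1)
		go func() {
			errCh <- server.Run() // forward the proxy server's exit error
		}()
		return <-errCh // block until the server stops or fails
	}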
cmd\kube-proxy\app\server_others.go
// Since k8s is almost always run on Linux, the proxy mode in practice is either ipvs or iptables
func NewProxyServer(o *Options) (*ProxyServer, error) {
	return newProxyServer(o.config, o.CleanupAndExit, o.master)
}


**newProxyServer**
	...
	// Even when ipvs mode is explicitly requested, it still has to pass validation
	canUseIPVS, err := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface, config.IPVS.Scheduler) --->pkg\proxy\ipvs\proxier.go
	...
	if proxyMode == proxyModeIPTables {
		...
		proxier, err = iptables.NewDualStackProxier() --->pkg\proxy\iptables\proxier.go
	} else {
		proxier, err = ipvs.NewDualStackProxier() --->pkg\proxy\ipvs\proxier.go
	}
pkg\proxy\ipvs\proxier.go
// This function verifies the modules that ipvs mode depends on; if the check fails, kube-proxy degrades to iptables mode
func CanUseIPVSProxier(handle KernelHandler, ipsetver IPSetVersioner, scheduler string) (bool, error)
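So the configured mode is only a preference; the effective mode depends on what the validation finds. A hedged reduction of the selection (a simplification; the real decision in server_others.go checks more conditions):

	// pickProxyMode is an illustrative simplification: ipvs is honored only
	// when CanUseIPVSProxier passed, otherwise we fall back to iptables.
	func pickProxyMode(requested string, canUseIPVS bool) string {
		if requested == "ipvs" && canUseIPVS {
			return "ipvs"
		}
		return "iptables"
	}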
pkg\proxy\iptables\proxier.go
// kube-proxy already supports and enables IPv6, running one proxier per IP family
NewDualStackProxier
	ipv4Proxier, err := NewProxier(ipt[0], sysctl,
		exec, syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname,
		nodeIP[0], recorder, healthzServer, ipFamilyMap[v1.IPv4Protocol])
	ipv6Proxier, err := NewProxier(ipt[1], sysctl,
		exec, syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname,
		nodeIP[1], recorder, healthzServer, ipFamilyMap[v1.IPv6Protocol])
	return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil --->pkg\proxy\metaproxier\meta_proxier.go


var _ proxy.Provider = &Proxier{}


NewProxier
	go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
		proxier.syncProxyRules, syncPeriod, wait.NeverStop)
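Monitor plants a canary chain (kubeProxyCanaryChain) in the mangle, NAT and filter tables and periodically checks that it still exists; if something external (a firewalld restart, an admin running iptables -F) wipes it, the reload function, here syncProxyRules, reinstalls everything. Conceptually it behaves like this sketch (chainExists/createCanary are hypothetical stand-ins, not the real implementation):

	import "time"

	// monitorCanary re-applies all rules whenever the canary chain disappears.
	func monitorCanary(interval time.Duration, reload func()) {
		for {
			time.Sleep(interval)
			if !chainExists("KUBE-PROXY-CANARY") { // the tables were flushed externally
				reload()       // reinstall every kube-proxy rule
				createCanary() // plant a fresh canary for the next check
			}
		}
	}

	func chainExists(chain string) bool { return true } // stub: the real check shells out to iptables
	func createCanary()                 {}              // stub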


// Comparing the interface methods in pkg\proxy\metaproxier\meta_proxier.go shows that the concrete behavior all funnels into syncProxyRules, a single method that runs past 800 lines...
// This is all the code that iptables mode triggers; the only other place that configures iptables rules is iptablesInit()
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
// This method exposes quite a few problems:
// 1. It is long and called frequently, so every sync is expensive
// 2. Any new change forces a full scan of the existing rules, a merge with the new state, and a full rewrite to disk
// 3. iptables is a linear rule list: one logical Service expands into N rules, so as the cluster grows the chains get long and lookups slow down
// 4. The logic itself feels awkward: all rules are written first, and stale ones are only deleted afterwards
**syncProxyRules**
	// Take the lock right at the top
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	...
	// Pull the accumulated service and endpoint changes
	serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges) --->pkg\proxy\service.go
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) --->pkg\proxy\endpoints.go
	...
	// Every ClusterIP/ExternalIP/LoadBalancerIP and NodePort a stale service used is tracked for conntrack cleanup
	for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
		if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
			conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
			for _, extIP := range svcInfo.ExternalIPStrings() {
				conntrackCleanupServiceIPs.Insert(extIP)
			}
			for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
				conntrackCleanupServiceIPs.Insert(lbIP)
			}
			nodePort := svcInfo.NodePort()
			if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
				conntrackCleanupServiceNodePorts.Insert(nodePort)
			}
		}
	}
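These two sets are drained near the end of syncProxyRules: stale UDP flows have to be evicted from the kernel conntrack table, otherwise established conntrack entries keep steering traffic to endpoints that no longer exist. Roughly like this sketch (treat the exact helper signatures as assumptions about pkg/util/conntrack):

	// Clear conntrack state for every stale service IP and node port (UDP only;
	// TCP connections terminate on their own via RST/FIN).
	isIPv6 := proxier.iptables.IsIPv6() // family of this proxier (assumed accessor)
	for _, ip := range conntrackCleanupServiceIPs.UnsortedList() {
		if err := conntrack.ClearEntriesForIP(proxier.exec, ip, v1.ProtocolUDP); err != nil {
			klog.ErrorS(err, "Failed to delete stale conntrack entries", "IP", ip)
		}
	}
	for _, port := range conntrackCleanupServiceNodePorts.UnsortedList() {
		conntrack.ClearEntriesForPort(proxier.exec, port, isIPv6, v1.ProtocolUDP)
	}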
	...
	// If this sync fails, a retry is scheduled after syncPeriod
	success := false
	defer func() {
		if !success {
			klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
			proxier.syncRunner.RetryAfter(proxier.syncPeriod)
		}
	}()
	...
	// Create and link the KUBE-* jump chains. It is not obvious why these fixed arguments get re-checked and re-assembled on every sync; they look like they could be precomputed constants
	for _, jump := range iptablesJumpChains {
		...
		args := append(jump.extraArgs,
			"-m", "comment", "--comment", jump.comment,
			"-j", string(jump.dstChain),
		)
		...
	}
	...
	// Read the existing filter and NAT rules and build them into chain maps
	existingFilterChains := make(map[utiliptables.Chain][]byte)
	proxier.existingFilterChainsData.Reset()
	err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
		existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
	existingNATChains := make(map[utiliptables.Chain][]byte)
	proxier.iptablesData.Reset()
	err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
		existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
	...
	// Write the existing filter and NAT chain lines back into the corresponding buffers
	proxier.filterChains.Write("*filter")
	proxier.natChains.Write("*nat")
	for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
		if chain, ok := existingFilterChains[chainName]; ok {
			proxier.filterChains.WriteBytes(chain)
		} else {
			proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
		}
	}
	for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
		if chain, ok := existingNATChains[chainName]; ok {
			proxier.natChains.WriteBytes(chain)
		} else {
			proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
		}
	}
	...
	// Build the postrouting rules into the buffer
	proxier.natRules.Write(
		"-A", string(kubePostroutingChain),
		"-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
		"-j", "RETURN",
	)
	proxier.natRules.Write(
		"-A", string(kubePostroutingChain),
		"-j", "MARK", "--xor-mark", proxier.masqueradeMark,
	)
	masqRule := []string{
		"-A", string(kubePostroutingChain),
		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
		"-j", "MASQUERADE",
	}
	if proxier.iptables.HasRandomFully() {
		masqRule = append(masqRule, "--random-fully")
	}
	proxier.natRules.Write(masqRule)
	...
	// Build the masquerade-mark rule into the buffer
	proxier.natRules.Write(
		"-A", string(KubeMarkMasqChain),
		"-j", "MARK", "--or-mark", proxier.masqueradeMark,
	)
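Rendered into iptables-save form (with the default masqueradeBit of 14, i.e. mark 0x4000), these buffers come out roughly as:

	-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
	-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
	-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
	-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE

The mark dance ensures MASQUERADE only ever hits packets kube-proxy itself flagged: KUBE-MARK-MASQ sets the bit, KUBE-POSTROUTING returns early for unmarked packets, clears the bit with --xor-mark, and only then SNATs.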
	...
	...
	**// The heart of the method: build the matching rules for every service, roughly 550 lines in total**
	for svcName, svc := range proxier.serviceMap {
		// Gather information about the service and its endpoints
		svcInfo, ok := svc.(*serviceInfo)
		allEndpoints := proxier.endpointsMap[svcName]
		...
		var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
		for _, ep := range allEndpoints {
			if ep.IsReady() {
				hasEndpoints = true
				if ep.GetIsLocal() {
					hasLocalReadyEndpoints = true
				}
			} else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
				// Only checked when the service is configured to allow using not-fully-ready backends
				if ep.IsServing() && ep.IsTerminating() {
					hasEndpoints = true
					if ep.GetIsLocal() {
						hasLocalServingTerminatingEndpoints = true
					}
				}
			}
		}
		useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints
		for _, ep := range allEndpoints {
			...
			// Add to the set of chains that forward to the endpoints
			if svc.NodeLocalExternal() && epInfo.IsLocal {
				if useTerminatingEndpoints {
					if epInfo.Serving && epInfo.Terminating {
						localEndpointChains = append(localEndpointChains, endpointChain)
						endpointInUse = true
					}
				} else if epInfo.Ready {
					localEndpointChains = append(localEndpointChains, endpointChain)
					endpointInUse = true
				}
			}
		}
		...
		// Record the pre-existing service chains
		// Record the pre-existing load-balancer chains
		// Build the clusterIP rules
		args = append(args[:0],
				"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
				"-m", protocol, "-p", protocol,
				"-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
				"--dport", strconv.Itoa(svcInfo.Port()),
			)
		// Build the externalIPs rules
		args = append(args[:0],
					"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
					"-m", protocol, "-p", protocol,
					"-d", utilproxy.ToCIDR(netutils.ParseIPSloppy(externalIP)),
					"--dport", strconv.Itoa(svcInfo.Port()),
				)
		// Build the load-balancer rules
		args = append(args[:0],
						"-A", string(kubeServicesChain),
						"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
						"-m", protocol, "-p", protocol,
						"-d", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)),
						"--dport", strconv.Itoa(svcInfo.Port()),
					)
		// Build the node-port forwarding rules
		args = append(args[:0],
					"-m", "comment", "--comment", svcNameString,
					"-m", protocol, "-p", protocol,
					"--dport", strconv.Itoa(svcInfo.NodePort()),
				)
	}
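To make the loop concrete, a plain ClusterIP service with two endpoints renders to iptables rules along these lines (names, addresses and probabilities illustrative):

	-A KUBE-SERVICES -m comment --comment "default/web cluster IP" -m tcp -p tcp -d 10.96.0.10/32 --dport 80 -j KUBE-SVC-XXXXXXXXXXXXXXXX
	-A KUBE-SVC-XXXXXXXXXXXXXXXX -m statistic --mode random --probability 0.5 -j KUBE-SEP-AAAAAAAAAAAAAAAA
	-A KUBE-SVC-XXXXXXXXXXXXXXXX -j KUBE-SEP-BBBBBBBBBBBBBBBB
	-A KUBE-SEP-AAAAAAAAAAAAAAAA -m tcp -p tcp -j DNAT --to-destination 10.244.1.5:8080
	-A KUBE-SEP-BBBBBBBBBBBBBBBB -m tcp -p tcp -j DNAT --to-destination 10.244.2.7:8080

Every endpoint gets its own KUBE-SEP chain, which is exactly why the rule count scales with services times endpoints (problem 3 in the critique above).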
	...
	...
	// Clean up chains that are no longer used
	for chain := range existingNATChains {
		if !activeNATChains[chain] {
			chainString := string(chain)
			if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
				// Not a chain we own; do not touch it
				continue
			}
			// iptables-restore requires writing the chain declaration one more time before the -X line that deletes it
			proxier.natChains.WriteBytes(existingNATChains[chain])
			proxier.natRules.Write("-X", chainString)
		}
	}
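In iptables-restore input, deleting a chain therefore takes two lines: re-declare the chain, then issue -X against it:

	:KUBE-SVC-XXXXXXXXXXXXXXXX - [0:0]
	-X KUBE-SVC-XXXXXXXXXXXXXXXX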
	...
	// Persist the new rules for every table in one restore call
	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
	// This sync succeeded
	success = true


type iptablesJumpChain struct {
	table     utiliptables.Table
	dstChain  utiliptables.Chain
	srcChain  utiliptables.Chain
	comment   string
	extraArgs []string
}

var iptablesJumpChains = []iptablesJumpChain{
	{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainForward, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeNodePortsChain, utiliptables.ChainInput, "kubernetes health check service ports", nil},
	{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
	{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
	{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},
	{utiliptables.TableNAT, kubePostroutingChain, utiliptables.ChainPostrouting, "kubernetes postrouting rules", nil},
}
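Each entry in this table becomes a single jump rule from a built-in chain into a kube-proxy-owned chain. The three NAT entries, for instance, render as:

	-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
	-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
	-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING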
pkg\proxy\ipvs\proxier.go
**syncProxyRules**
	// The same preamble as the iptables proxier
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	...
	// Pull the accumulated service and endpoint changes
	serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges) --->pkg\proxy\service.go
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) --->pkg\proxy\endpoints.go
	...
	// Every ClusterIP/ExternalIP/LoadBalancerIP and NodePort a stale service used is tracked for conntrack cleanup
	for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
		if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
			conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
			for _, extIP := range svcInfo.ExternalIPStrings() {
				conntrackCleanupServiceIPs.Insert(extIP)
			}
			for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
				conntrackCleanupServiceIPs.Insert(lbIP)
			}
			nodePort := svcInfo.NodePort()
			if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
				conntrackCleanupServiceNodePorts.Insert(nodePort)
			}
		}
	}
	// ipvs mode still uses some iptables rules, because ipvs is designed purely for load balancing and has no filtering hooks; that part therefore stays in iptables
	proxier.filterChains.Write("*filter")
	proxier.natChains.Write("*nat")
	proxier.createAndLinkKubeChain()
	
	**// Add the ipvs configuration for every service, roughly 457 lines of code**
	for svcName, svc := range proxier.serviceMap {
		// Record the existing entries
		for _, e := range proxier.endpointsMap[svcName] {
			...
			proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
		}
		// clusterIP-type entries
		proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
		// SessionAffinity is checked here so that requests from the same client keep hitting the same backend; the default timeout is 10800 seconds
		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
			serv.Flags |= utilipvs.FlagPersistent
			serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
		}
		// externalIPs-type entries
		proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
		// load-balancer-type entries
		proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
		// NodePort-type entries
		nodePortSet.activeEntries.Insert(entry.String())
	}
	
	// Apply the ipset entries (these back the remaining iptables match rules)
	for _, set := range proxier.ipsetList {
		set.syncIPSetEntries()
	}
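The end state on a node is easiest to see with ipvsadm and ipset. Illustratively (addresses hypothetical), a ClusterIP service becomes one ipvs virtual server with a real server per endpoint, while the ipsets feed the handful of remaining iptables match rules:

	# ipvsadm -ln
	TCP  10.96.0.10:80 rr
	  -> 10.244.1.5:8080    Masq  1  0  0
	  -> 10.244.2.7:8080    Masq  1  0  0

	# ipset list KUBE-CLUSTER-IP    (one ip,port entry per service)
	10.96.0.10,tcp:80

Lookups against a hash-based ipset and the ipvs hash table stay near constant time, which is the scalability argument for ipvs mode versus the linear chains of iptables mode.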
	
	// The rest of the method is supplementary iptables work

// Throughout this file every operation runs for IPv4 and then IPv6 in sequence, i.e. kube-proxy is ready to serve IPv6 in production

pkg\proxy\metaproxier\meta_proxier.go
func NewMetaProxier(ipv4Proxier, ipv6Proxier proxy.Provider) proxy.Provider {
	return proxy.Provider(&metaProxier{
		ipv4Proxier: ipv4Proxier,
		ipv6Proxier: ipv6Proxier,
	}) --->pkg\proxy\xxx\proxier.go
}
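metaProxier satisfies proxy.Provider by fanning every event out to both family-specific proxiers (endpoint events are additionally filtered by IP family before dispatch). A hedged sketch of the shape:

	// Sketch: service events go to both proxiers; the real meta_proxier.go
	// also routes endpoint-slice events by the slice's address type.
	func (proxier *metaProxier) OnServiceAdd(service *v1.Service) {
		proxier.ipv4Proxier.OnServiceAdd(service)
		proxier.ipv6Proxier.OnServiceAdd(service)
	}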
Original post: https://www.cnblogs.com/bfmq/p/15593952.html