Making NGINX dynamic, part 1: nginx-ingress

Let's start from the entry point.

cmd/nginx/main.go wires everything together: it creates the Kubernetes API client, starts metrics collection, sets up the web server (health and metrics endpoints), initializes the NGINX controller, and so on.

func main() {
    ...
    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
    ...
    reg := prometheus.NewRegistry()
    mc := metric.NewDummyCollector()
    ...
    ngx := controller.NewNGINXController(conf, mc)

    mux := http.NewServeMux()
    registerHealthz(nginx.HealthPath, ngx, mux)
    registerMetrics(reg, mux)

    go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
    go ngx.Start()
    ...
}
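Nothing exotic here: health and metrics are plain handlers on an http.ServeMux served from a goroutine on a side port, while the controller runs in another goroutine. A minimal sketch of that pattern (port, paths and handler bodies are illustrative assumptions, not the actual registerHealthz/registerMetrics code):

// Sketch only: the mux/goroutine pattern behind registerHealthz, registerMetrics
// and startHTTPServer. Port, paths and handler bodies are illustrative.
package healthsketch

import (
    "fmt"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func startHealthAndMetrics(reg *prometheus.Registry) {
    mux := http.NewServeMux()

    // health endpoint polled by the kubelet liveness/readiness probes
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
        w.WriteHeader(http.StatusOK)
        fmt.Fprint(w, "ok")
    })

    // metrics endpoint scraped by Prometheus
    mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

    // serve on a side port, separate from the proxied traffic
    go func() {
        _ = http.ListenAndServe(":10254", mux)
    }()
}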

ingress/controller/nginx.go

// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
    // set up event handlers and initialize internal state
    // validation webhook handling
    if n.cfg.ValidationWebhook != "" {
        n.validationWebhookServer = &http.Server{
            Addr:      config.ValidationWebhook,
            Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
            TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
            // disable http/2
            // https://github.com/kubernetes/kubernetes/issues/80313
            // https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
            TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
        }
    }
    // every task pulled off the queue is handled by n.syncIngress
    n.syncQueue = task.NewTaskQueue(n.syncIngress)

    // register a callback that reloads the template when the file on disk changes
    onTemplateChange := func() {
        template, err := ngx_template.NewTemplate(nginx.TemplatePath)
        if err != nil {
            // this error is different from the rest because it must be clear why nginx is not working
            klog.ErrorS(err, "Error loading new template")
            return
        }

        n.t = template
        klog.InfoS("New NGINX configuration template loaded")
        n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
    }

    ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
    if err != nil {
        klog.Fatalf("Invalid NGINX configuration template: %v", err)
    }

    n.t = ngxTpl

    _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
...
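watch.NewFileWatcher ties the template file on disk to onTemplateChange, so editing the template triggers a re-render and a re-sync without restarting the controller. A minimal sketch of such a watcher, using fsnotify as an assumed stand-in for the internal watch package:

// Sketch: invoke a callback whenever the watched file changes.
// fsnotify is an assumed stand-in for what internal/watch.NewFileWatcher does.
package watchsketch

import (
    "log"
    "path/filepath"

    "github.com/fsnotify/fsnotify"
)

// NewFileWatcher calls onChange whenever the file is written or re-created.
func NewFileWatcher(file string, onChange func()) (*fsnotify.Watcher, error) {
    w, err := fsnotify.NewWatcher()
    if err != nil {
        return nil, err
    }
    // watch the directory: ConfigMap mounts and editors replace files atomically
    if err := w.Add(filepath.Dir(file)); err != nil {
        return nil, err
    }
    go func() {
        for {
            select {
            case ev, ok := <-w.Events:
                if !ok {
                    return
                }
                if filepath.Clean(ev.Name) == filepath.Clean(file) &&
                    ev.Op&(fsnotify.Write|fsnotify.Create) != 0 {
                    onChange()
                }
            case err, ok := <-w.Errors:
                if !ok {
                    return
                }
                log.Printf("watch error: %v", err)
            }
        }
    }()
    return w, nil
}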

Start

func (n *NGINXController) Start() {
    n.store.Run(n.stopCh)
    ...
    go n.syncQueue.Run(time.Second, n.stopCh)
    // force initial sync
    n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
    for {
        select {
        case err := <-n.ngxErrCh:
            if n.isShuttingDown {
                return
            }

            // if the nginx master process dies, the workers continue to process requests
            // until the failure of the configured livenessProbe and restart of the pod.
            if process.IsRespawnIfRequired(err) {
                return
            }

        case event := <-n.updateCh.Out():
            if n.isShuttingDown {
                break
            }

            if evt, ok := event.(store.Event); ok {
                klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
                if evt.Type == store.ConfigurationEvent {
                    // TODO: is this necessary? Consider removing this special case
                    n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
                    continue
                }

                n.syncQueue.EnqueueSkippableTask(evt.Obj)
            } else {
                klog.Warningf("Unexpected event type received %T", event)
            }
        case <-n.stopCh:
            return
        }
    }
...
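The loop above only moves events from the store's update channel into n.syncQueue; the queue is what serializes and rate-limits the real work. Conceptually task.Queue behaves like a client-go workqueue whose worker keeps invoking the sync function (n.syncIngress here). A minimal sketch of that pattern with illustrative names (not the actual internal/task package):

// Sketch of the queue pattern, with illustrative names (not internal/task).
package tasksketch

import (
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/workqueue"
)

type Queue struct {
    queue workqueue.RateLimitingInterface
    sync  func(interface{}) error
}

func NewTaskQueue(syncFn func(interface{}) error) *Queue {
    return &Queue{
        queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        sync:  syncFn,
    }
}

// EnqueueTask adds an item; duplicates are coalesced by the workqueue.
func (q *Queue) EnqueueTask(obj interface{}) { q.queue.Add(obj) }

// Run keeps a single worker popping items until stopCh is closed.
func (q *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
    wait.Until(q.worker, period, stopCh)
}

func (q *Queue) worker() {
    for {
        item, quit := q.queue.Get()
        if quit {
            return
        }
        if err := q.sync(item); err != nil {
            q.queue.AddRateLimited(item) // retry with backoff on failure
        } else {
            q.queue.Forget(item)
        }
        q.queue.Done(item)
    }
}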

syncIngress

func (n *NGINXController) syncIngress(interface{}) error {
    n.syncRateLimiter.Accept()

    if n.syncQueue.IsShuttingDown() {
        return nil
    }

    ings := n.store.ListIngresses()
    hosts, servers, pcfg := n.getConfiguration(ings)

    n.metricCollector.SetSSLExpireTime(servers)

    if n.runningConfig.Equal(pcfg) {
        klog.V(3).Infof("No configuration change detected, skipping backend reload")
        return nil
    }

    n.metricCollector.SetHosts(hosts)

    if !n.IsDynamicConfigurationEnough(pcfg) {
        klog.InfoS("Configuration changes detected, backend reload required")

        hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
            TagName: "json",
        })

        pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

        err := n.OnUpdate(*pcfg)
        if err != nil {
            n.metricCollector.IncReloadErrorCount()
            n.metricCollector.ConfigSuccess(hash, false)
            klog.Errorf("Unexpected failure reloading the backend:
%v", err)
            n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
            return err
        }

        klog.InfoS("Backend successfully reloaded")
        n.metricCollector.ConfigSuccess(hash, true)
        n.metricCollector.IncReloadCount()

        n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
    }

    isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
    if isFirstSync {
        // For the initial sync it always takes some time for NGINX to start listening
        // For large configurations it might take a while so we loop and back off
        klog.InfoS("Initial sync, sleeping for 1 second")
        time.Sleep(1 * time.Second)
    }

    retry := wait.Backoff{
        Steps:    15,
        Duration: 1 * time.Second,
        Factor:   0.8,
        Jitter:   0.1,
    }

    err := wait.ExponentialBackoff(retry, func() (bool, error) {
        err := n.configureDynamically(pcfg)
        if err == nil {
            klog.V(2).Infof("Dynamic reconfiguration succeeded.")
            return true, nil
        }

        klog.Warningf("Dynamic reconfiguration failed: %v", err)
        return false, err
    })
    if err != nil {
        klog.Errorf("Unexpected failure reconfiguring NGINX:
%v", err)
        return err
    }

    ri := getRemovedIngresses(n.runningConfig, pcfg)
    re := getRemovedHosts(n.runningConfig, pcfg)
    n.metricCollector.RemoveMetrics(ri, re)

    n.runningConfig = pcfg

    return nil
}
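So there are two update paths: a full reload via OnUpdate when IsDynamicConfigurationEnough says the change cannot be applied in place, and configureDynamically, which pushes the new state into the running NGINX through a Lua-handled HTTP endpoint so the workers pick it up without a reload. A minimal sketch of that second idea, covering only the backends piece; the URL, port and expected status codes are assumptions, not a copy of the upstream function:

// Sketch of the dynamic path: POST the desired backends as JSON to an endpoint
// served by Lua inside NGINX. URL, port and status handling are assumptions.
package dynsketch

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func postBackends(statusPort int, backends interface{}) error {
    buf, err := json.Marshal(backends)
    if err != nil {
        return err
    }
    url := fmt.Sprintf("http://127.0.0.1:%d/configuration/backends", statusPort)
    resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
        return fmt.Errorf("unexpected status %d updating backends", resp.StatusCode)
    }
    return nil
}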

OnUpdate: the actual update

func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
    cfg := n.store.GetBackendConfiguration()
    cfg.Resolver = n.resolver

    content, err := n.generateTemplate(cfg, ingressCfg)
    if err != nil {
        return err
    }

    err = createOpentracingCfg(cfg)
    if err != nil {
        return err
    }

    err = n.testTemplate(content)
    if err != nil {
        return err
    }

    if klog.V(2).Enabled() {
        src, _ := os.ReadFile(cfgPath)
        if !bytes.Equal(src, content) {
            tmpfile, err := os.CreateTemp("", "new-nginx-cfg")
            if err != nil {
                return err
            }
            defer tmpfile.Close()
            err = os.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
            if err != nil {
                return err
            }

            diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
            if err != nil {
                if exitError, ok := err.(*exec.ExitError); ok {
                    ws := exitError.Sys().(syscall.WaitStatus)
                    if ws.ExitStatus() == 2 {
                        klog.Warningf("Failed to executing diff command: %v", err)
                    }
                }
            }

            klog.InfoS("NGINX configuration change", "diff", string(diffOutput))

            // we do not defer the deletion of temp files in order
            // to keep them around for inspection in case of error
            os.Remove(tmpfile.Name())
        }
    }

    err = os.WriteFile(cfgPath, content, file.ReadWriteByUser)
    if err != nil {
        return err
    }

    o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
    if err != nil {
        return fmt.Errorf("%v\n%v", err, string(o))
    }

    return nil
}
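Note the safety net before the reload: testTemplate has the nginx binary validate the candidate configuration before it overwrites the live nginx.conf. A minimal sketch of such a check, with illustrative binary path and flags:

// Sketch of a testTemplate-style check: write the candidate config to a temp
// file and let the nginx binary validate it. Binary path and flags are illustrative.
package reloadsketch

import (
    "fmt"
    "os"
    "os/exec"
)

func testTemplate(content []byte) error {
    tmp, err := os.CreateTemp("", "nginx-cfg")
    if err != nil {
        return err
    }
    defer os.Remove(tmp.Name())

    if _, err := tmp.Write(content); err != nil {
        return err
    }
    if err := tmp.Close(); err != nil {
        return err
    }

    // "nginx -c <file> -t" parses the configuration without applying it
    out, err := exec.Command("nginx", "-c", tmp.Name(), "-t").CombinedOutput()
    if err != nil {
        return fmt.Errorf("invalid nginx configuration: %v\n%s", err, out)
    }
    return nil
}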
// generateTemplate returns the nginx configuration file content
func (n NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressCfg ingress.Configuration) ([]byte, error) {

    if n.cfg.EnableSSLPassthrough {
        servers := []*TCPServer{}
        for _, pb := range ingressCfg.PassthroughBackends {
            svc := pb.Service
            if svc == nil {
                klog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
                continue
            }
            port, err := strconv.Atoi(pb.Port.String()) // #nosec
            if err != nil {
                for _, sp := range svc.Spec.Ports {
                    if sp.Name == pb.Port.String() {
                        port = int(sp.Port)
                        break
                    }
                }
            } else {
                for _, sp := range svc.Spec.Ports {
                    if sp.Port == int32(port) {
                        port = int(sp.Port)
                        break
                    }
                }
            }

            // TODO: Allow PassthroughBackends to specify they support proxy-protocol
            servers = append(servers, &TCPServer{
                Hostname:      pb.Hostname,
                IP:            svc.Spec.ClusterIP,
                Port:          port,
                ProxyProtocol: false,
            })
        }

        n.Proxy.ServerList = servers
    }

    // NGINX cannot resize the hash tables used to store server names. For
    // this reason we check if the current size is correct for the host
    // names defined in the Ingress rules and adjust the value if
    // necessary.
    // https://trac.nginx.org/nginx/ticket/352
    // https://trac.nginx.org/nginx/ticket/631
    var longestName int
    var serverNameBytes int

    for _, srv := range ingressCfg.Servers {
        hostnameLength := len(srv.Hostname)
        if srv.RedirectFromToWWW {
            hostnameLength += 4
        }
        if longestName < hostnameLength {
            longestName = hostnameLength
        }

        for _, alias := range srv.Aliases {
            if longestName < len(alias) {
                longestName = len(alias)
            }
        }

        serverNameBytes += hostnameLength
    }

    nameHashBucketSize := nginxHashBucketSize(longestName)
    if cfg.ServerNameHashBucketSize < nameHashBucketSize {
        klog.V(3).InfoS("Adjusting ServerNameHashBucketSize variable", "value", nameHashBucketSize)
        cfg.ServerNameHashBucketSize = nameHashBucketSize
    }

    serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
    if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
        klog.V(3).InfoS("Adjusting ServerNameHashMaxSize variable", "value", serverNameHashMaxSize)
        cfg.ServerNameHashMaxSize = serverNameHashMaxSize
    }

    if cfg.MaxWorkerOpenFiles == 0 {
        // the limit of open files is per worker process
        // and we leave some room to avoid consuming all the FDs available
        maxOpenFiles := rlimitMaxNumFiles() - 1024
        klog.V(3).InfoS("Maximum number of open file descriptors", "value", maxOpenFiles)
        if maxOpenFiles < 1024 {
            // this means the value of RLIMIT_NOFILE is too low.
            maxOpenFiles = 1024
        }
        klog.V(3).InfoS("Adjusting MaxWorkerOpenFiles variable", "value", maxOpenFiles)
        cfg.MaxWorkerOpenFiles = maxOpenFiles
    }

    if cfg.MaxWorkerConnections == 0 {
        maxWorkerConnections := int(float64(cfg.MaxWorkerOpenFiles * 3.0 / 4))
        klog.V(3).InfoS("Adjusting MaxWorkerConnections variable", "value", maxWorkerConnections)
        cfg.MaxWorkerConnections = maxWorkerConnections
    }

    setHeaders := map[string]string{}
    if cfg.ProxySetHeaders != "" {
        cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
        if err != nil {
            klog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.ProxySetHeaders, err)
        } else {
            setHeaders = cmap.Data
        }
    }

    addHeaders := map[string]string{}
    if cfg.AddHeaders != "" {
        cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
        if err != nil {
            klog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.AddHeaders, err)
        } else {
            addHeaders = cmap.Data
        }
    }

    sslDHParam := ""
    if cfg.SSLDHParam != "" {
        secretName := cfg.SSLDHParam

        secret, err := n.store.GetSecret(secretName)
        if err != nil {
            klog.Warningf("Error reading Secret %q from local store: %v", secretName, err)
        } else {
            nsSecName := strings.Replace(secretName, "/", "-", -1)
            dh, ok := secret.Data["dhparam.pem"]
            if ok {
                pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh)
                if err != nil {
                    klog.Warningf("Error adding or updating dhparam file %v: %v", nsSecName, err)
                } else {
                    sslDHParam = pemFileName
                }
            }
        }
    }

    cfg.SSLDHParam = sslDHParam

    cfg.DefaultSSLCertificate = n.getDefaultSSLCertificate()

    tc := ngx_config.TemplateConfig{
        ProxySetHeaders:          setHeaders,
        AddHeaders:               addHeaders,
        BacklogSize:              sysctlSomaxconn(),
        Backends:                 ingressCfg.Backends,
        PassthroughBackends:      ingressCfg.PassthroughBackends,
        Servers:                  ingressCfg.Servers,
        TCPBackends:              ingressCfg.TCPEndpoints,
        UDPBackends:              ingressCfg.UDPEndpoints,
        Cfg:                      cfg,
        IsIPV6Enabled:            n.isIPV6Enabled && !cfg.DisableIpv6,
        NginxStatusIpv4Whitelist: cfg.NginxStatusIpv4Whitelist,
        NginxStatusIpv6Whitelist: cfg.NginxStatusIpv6Whitelist,
        RedirectServers:          buildRedirects(ingressCfg.Servers),
        IsSSLPassthroughEnabled:  n.cfg.EnableSSLPassthrough,
        ListenPorts:              n.cfg.ListenPorts,
        PublishService:           n.GetPublishService(),
        EnableMetrics:            n.cfg.EnableMetrics,
        MaxmindEditionFiles:      n.cfg.MaxmindEditionFiles,
        HealthzURI:               nginx.HealthPath,
        MonitorMaxBatchSize:      n.cfg.MonitorMaxBatchSize,
        PID:                      nginx.PID,
        StatusPath:               nginx.StatusPath,
        StatusPort:               nginx.StatusPort,
        StreamPort:               nginx.StreamPort,
    }

    tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum

    return n.t.Write(tc)
}
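The hash-table sizing above leans on two helpers, nginxHashBucketSize and nextPowerOf2. The latter is the classic round-up-to-a-power-of-two bit trick; a sketch consistent with its usage above:

// Round v up to the next power of two (classic bit-twiddling hack); what
// nextPowerOf2 above plausibly does for 32-bit sized inputs.
func nextPowerOf2(v int) int {
    v--
    v |= v >> 1
    v |= v >> 2
    v |= v >> 4
    v |= v >> 8
    v |= v >> 16
    v++
    return v
}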
Original article: https://www.cnblogs.com/it-worker365/p/15417082.html