kubernetes apiserver 源码阅读笔记(-)

version:1.9.6

导航:

  1.寻找入口

  2.构建命令行

  3.创建服务链 CreateServerChain

       4.启动服务 s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)

  

1.命令行入口

  cmd/kube-apiserver/apiserver.go

  命令行解析框架 &cobra 需要先去了解一下:

  https://o-my-chenjian.com/2017/09/20/Using-Cobra-With-Golang/

  

   入口函数:cmd/kube-apiserver/apiserver.go

   通过cmd的Execute启动服务。

 

func main(){
  ... command := app.NewAPIServerCommand() ... if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "error: %v ", err) os.Exit(1) } }

  

2. 构建命令行

      cmd 结构体

	cmd := &cobra.Command{
		Use: "kube-apiserver",
		Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
		RunE: func(cmd *cobra.Command, args []string) error {
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cmd.Flags())

			// set default options
			completedOptions, err := Complete(s)
			if err != nil {
				return err
			}

			// validate options
			if errs := completedOptions.Validate(); len(errs) != 0 {
				return utilerrors.NewAggregate(errs)
			}

			return Run(completedOptions, stopCh)
		},
	}

  各组件程序都是用 cobra 来管理、解析命令行参数的,main 包下面还有 app 包,app 包才是包含创建 cobra 命令逻辑的地方,所以其实 main 包的逻辑特别简单,主要是调用执行函数就可以了。

  app.NewAPIServerCommand(server.SetupSignalHandler()) 返回*cobra.Command, 执行 command.Execute()最终会调用 Command结构体中定义的Run函数

  上面的代码中RunE是运行并且返回Error的意思
  我们可以看到RunE中返回了Run(completedOptions, stopCh)

3. 创建服务链

 

4. 创建服务

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {

    这里面创建了一个 server ,经过 PrepareRun() 返回 preparedGenericAPIServer 并最终调用其方法 Run()

   

GenericAPIServer 结构体:
    // GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
	.....

	// admissionControl is used to build the RESTStorage that backs an API Group.
	admissionControl admission.Interface

	// "Outputs"
	// Handler holds the handlers being used by this API server
	Handler *APIServerHandler
     。。。。。

	// DiscoveryGroupManager serves /apis
	DiscoveryGroupManager discovery.GroupManager

	// Enable swagger and/or OpenAPI if these configs are non-nil.
	openAPIConfig *openapicommon.Config

	// PostStartHooks are each called after the server has started listening, in a separate go func for each
	// with no guarantee of ordering between them.  The map key is a name used for error reporting.
	// It may kill the process with a panic if it wishes to by returning an error.
	postStartHookLock      sync.Mutex
	postStartHooks         map[string]postStartHookEntry
	postStartHooksCalled   bool
	disabledPostStartHooks sets.String

	preShutdownHookLock    sync.Mutex
	preShutdownHooks       map[string]preShutdownHookEntry
	preShutdownHooksCalled bool

      。。。。。
	// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
	HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}

  


// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	err := s.NonBlockingRun(stopCh)
	if err != nil {
		return err
	}

	<-stopCh

	err = s.RunPreShutdownHooks()
	if err != nil {
		return err
	}

	// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
	s.HandlerChainWaitGroup.Wait()

	return nil
}

  我们看到它又调用了 s.NonBlockingRun(),看方法名就知道是非阻塞运行即里面会创建新的 goroutine 最终运行 http 服务器,提供 http 

       接口给其它 kubernetes 组件调用,也是 kubernetes 集群控制的核心机制。然后到 <-stopCh 这里阻塞,如果这个 channel 被 close,

      这里就会停止阻塞并处理关闭逻辑最后函数执行结束,s.NonBlockingRun()这个函数也传入了 stopCh,同样也是出于类似的考虑,让程序优雅关闭,

  stopCh 最初是 NewAPIServerCommand() 中创建的

stopCh := server.SetupSignalHandler()
很容易看出来这个 channel 跟系统信号量绑定了,即 ctrl + c 或 kill 通知程序关闭的时候会 close 这个 channel ,然后调用 <-stopCh 的地方就会停止阻塞,
做关闭程序需要的一些清理操作实现优雅关闭


var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() <-chan struct{} {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, shutdownSignals...)
	go func() {
		<-c
		close(stop)
		<-c
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}

  我们再来看看 NonBlockingRun() 这个函数的实现

// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
	...

	// Use an internal stop channel to allow cleanup of the listeners on error.
	internalStopCh := make(chan struct{})

	if s.SecureServingInfo != nil && s.Handler != nil {
		if err := s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh); err != nil {
			close(internalStopCh)
			return err
		}
	}

	...

	return nil
}

  可以看到又调用了 s.SecureServingInfo.Serve() 来启动 http 服务器,继续深入进去

// Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
	if s.Listener == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig: &tls.Config{
			NameToCertificate: s.SNICerts,
			// Can't use SSLv3 because of POODLE and BEAST
			// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
			// Can't use TLSv1.1 because of RC4 cipher usage
			MinVersion: tls.VersionTLS12,
			// enable HTTP2 for go's 1.7 HTTP Server
			NextProtos: []string{"h2", "http/1.1"},
		},
	}
        .......
	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
    

  这一步创建了http.Server, 并且调用RunServer

  

// RunServer listens on the given port if listener is not given,
// then spawns a go-routine continuously serving until the stopCh is closed.
// It returns a stoppedCh that is closed when all non-hijacked active requests
// have been processed.
// This function does not block
// TODO: make private when insecure serving is gone from the kube-apiserver
func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, error) {
	if ln == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	// Shutdown server gracefully.
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
		server.Shutdown(ctx)
		cancel()
	}()

	go func() {
		defer utilruntime.HandleCrash()

		var listener net.Listener
		listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return stoppedCh, nil
}

      最终看到在后面那个新的 goroutine 中,调用了server.Serve(listener) 来启动 http 服务器,正常启动的情况下会一直阻塞在这里。  

 至此,我们初步把 kube-apiserver 源码的主线理清楚了,具体还有很多细节我们后面再继续深入。要理清思路我们就需要尽量先屏蔽细节,寻找我们想知道的逻辑路线。

听说学习是从模仿开始的: 感谢源作者 https://cloud.tencent.com/developer/article/1326541




没有什么是写一万遍还不会的,如果有那就再写一万遍。
原文地址:https://www.cnblogs.com/waken-captain/p/10509705.html