Golang之micro框架部分浅析

在实习中使用 micro 框架,但是挺多不懂的,看了部分源码搞懂了一些,还是有一些比较复杂没搞懂。

第一部分:初始化 service 并修改端口

main.go

// waitgroup builds a server.HandlerWrapper that tracks every in-flight
// handler call on wg: the counter is incremented before the wrapped handler
// runs and decremented when it returns.
func waitgroup(wg *util.WaitGroupWrapper) server.HandlerWrapper {
	wrap := func(next server.HandlerFunc) server.HandlerFunc {
		tracked := func(ctx context.Context, req server.Request, rsp interface{}) error {
			wg.Add(1)
			// Done is deferred so the counter is released however next returns.
			defer wg.Done()
			return next(ctx, req, rsp)
		}
		return tracked
	}
	return wrap
}

// main wires up the micro service (excerpt; elided parts are marked "...").
func main() {
    // ...
    
    var wg util.WaitGroupWrapper
    var service micro.Service
    
    // Build the service; NewService itself is walked through further below.
    service = micro.NewService(
        // Each call below returns a closure of type Option; NewService takes
        // a variadic list of Option values.
    	micro.Name("xxx"),
    	micro.WrapHandler(waitgroup(&wg)),
    	micro.RegisterTTL(30*time.Second),
    	micro.RegisterInterval(10*time.Second),
    )
    
    // Initialise the service first.
    service.Init()
    // `go run *.go` is started with --server_address=":8880"; read the
    // resulting address back out here so it can be given to the web service
    // later on.
    addr := service.Server().Options().Address
    // Reset the RPC server's address to server.DefaultAddress (":0" according
    // to the author's note).
    service.Server().Init(server.Address(server.DefaultAddress))
    
    // ...
}

go-micro.go

// Service is the interface returned by NewService.
type Service interface {
	// Init applies the given options to the service.
	Init(...Option)
	// Options returns the service's current Options.
	Options() Options
	// Client returns the client.Client held by the service.
	Client() client.Client
	// Server returns the server.Server held by the service.
	Server() server.Server
	// Run runs the service and reports any error.
	Run() error
	// String returns a string form of the service.
	String() string
}

// Option is a function that mutates an Options value in place; it is the
// unit of configuration passed to NewService and Init.
type Option func(*Options)

// NewService creates and returns a new Service based on the packages within.
// It forwards opts unchanged to the unexported newService constructor.
func NewService(opts ...Option) Service {
	return newService(opts...)
}

这里定义了一个 function type ,一开始对这个不太熟悉,看了 这篇文章 之后懂了点。定义这个 Option 之后,初始化操作就是将 Option 当做参数传入到 NewXXX 中;在这些 NewXXX 方法中又会遍历传入的 []Option 切片,然后调用每个 Option,在里面套一个 Init() 方法或者直接对变量进行赋值,来完成对参数 *Options 的部分变量的初始化。

service.go

// newService builds the concrete *service: it resolves the options via
// newOptions, then replaces the configured client with a clientWrapper that
// carries a "From-Service" metadata header set to the server's name.
func newService(opts ...Option) Service {
	options := newOptions(opts...)

	// Metadata attached to the wrapped client.
	fromService := metadata.Metadata{
		HeaderPrefix + "From-Service": options.Server.Options().Name,
	}
	options.Client = &clientWrapper{options.Client, fromService}

	return &service{opts: options}
}

// service is the concrete Service implementation returned by newService.
type service struct {
	// opts holds the resolved options; Init mutates it in place.
	opts Options

	// once guards the one-time command setup performed inside Init.
	once sync.Once
}

// Init applies opts to the service's options and, on the first call only,
// hooks the command-line app's Action and initialises the Cmd with pointers
// to the service's broker, registry, transport, client and server.
func (s *service) Init(opts ...Option) {
	// process options
	for _, o := range opts {
		o(&s.opts)
	}

	// sync.Once guarantees this block runs a single time, no matter which
	// function is handed to once.Do on later calls.
	s.once.Do(func() {
		// save user action
		action := s.opts.Cmd.App().Action
		// The commented-out excerpt below shows where cmd.App().Action gets
		// its initial value:
        // func newCmd(opts ...Option) Cmd {
        // 	options := Options{
        // 		Broker:    &broker.DefaultBroker,
        // 		Client:    &client.DefaultClient,
        // 		Registry:  &registry.DefaultRegistry,
        // 		Server:    &server.DefaultServer,
        // 		Selector:  &selector.DefaultSelector,
        // 		Transport: &transport.DefaultTransport,
        
        // 		Brokers:    DefaultBrokers,
        // 		Clients:    DefaultClients,
        // 		Registries: DefaultRegistries,
        // 		Selectors:  DefaultSelectors,
        // 		Servers:    DefaultServers,
        // 		Transports: DefaultTransports,
        // 	}
        
        //  ...
        
        // 	cmd := new(cmd)
        // 	cmd.opts = options
        // 	cmd.app = cli.NewApp()
        //  ...
        // 	cmd.app.Action = func(c *cli.Context) {}  <- set here
        //  ...

        // 	return cmd


		// set service action
		s.opts.Cmd.App().Action = func(c *cli.Context) {
			// set register interval; the flag value is interpreted as a
			// number of seconds
			if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
				s.opts.RegisterInterval = i * time.Second
			}

			// user action (the one captured before it was replaced above)
			action(c)
		}

		// Initialise the command flags, overriding new service.
		// The error returned by Cmd.Init is deliberately discarded here.
		_ = s.opts.Cmd.Init(
			cmd.Broker(&s.opts.Broker),
			cmd.Registry(&s.opts.Registry),
			cmd.Transport(&s.opts.Transport),
			cmd.Client(&s.opts.Client),
			cmd.Server(&s.opts.Server),
		)
	})
}

options.go

// newOptions returns an Options value seeded with the package defaults and a
// background context, then lets each supplied Option adjust it in order.
func newOptions(opts ...Option) Options {
	defaults := Options{
		Broker:    broker.DefaultBroker,
		Cmd:       cmd.DefaultCmd,
		Client:    client.DefaultClient,
		Server:    server.DefaultServer,
		Registry:  registry.DefaultRegistry,
		Transport: transport.DefaultTransport,
		Context:   context.Background(),
	}

	// Every Option is a function; calling it with a pointer to the options
	// is what applies the setting.
	for i := range opts {
		opts[i](&defaults)
	}

	return defaults
}

// The four functions below each return a closure; the closures are invoked
// in newOptions to carry out the actual initialisation.

// Name returns an Option that sets the name of the service by initialising
// the server with server.Name(n).
func Name(n string) Option {
	return func(o *Options) {
		o.Server.Init(server.Name(n))
	}
}

// RegisterTTL specifies the TTL to use when registering the service.
// The returned Option forwards t to the server via server.RegisterTTL.
func RegisterTTL(t time.Duration) Option {
	return func(o *Options) {
		o.Server.Init(server.RegisterTTL(t))
	}
}

// RegisterInterval specifies the interval on which to re-register.
// Unlike Name and RegisterTTL it writes straight to the Options field rather
// than going through the server.
func RegisterInterval(t time.Duration) Option {
	return func(o *Options) {
		o.RegisterInterval = t
	}
}

// WrapHandler returns an Option that converts each handler wrapper in w into
// a server option and applies them all to the server in a single Init call.
func WrapHandler(w ...server.HandlerWrapper) Option {
	return func(o *Options) {
		var serverOpts []server.Option
		for i := range w {
			serverOpts = append(serverOpts, server.WrapHandler(w[i]))
		}

		// One Init call for the whole batch of wrappers.
		o.Server.Init(serverOpts...)
	}
}

第二部分:订阅Topic

main.go

func subEvent(ctx context.Context, stats *proto.Event) error {
	golog.Logf("Received event %+v
", stats)
	return nil 
}

// main (excerpt) registers subEvent as the subscriber for "TOPIC" on the
// service's server.
func main() {
    // ...
    // subEvent is the user-defined handler function.
    // NOTE(review): RegisterSubscriber returns an error that is dropped here;
    // consider checking it.
    micro.RegisterSubscriber("TOPIC", service.Server(), subEvent)
    // ...
}

go-micro.go

// RegisterSubscriber creates a subscriber for topic from handler h via
// s.NewSubscriber and registers it on s via s.Subscribe, returning the
// error from Subscribe.
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
    // Both calls land in the server package.
	return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}

server.go

var (
	// DefaultServer is the package-level Server, created with newRpcServer().
	DefaultServer  Server = newRpcServer()
)

// NewSubscriber creates a Subscriber for topic using DefaultServer.
func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {
	return DefaultServer.NewSubscriber(topic, h, opts...)
}

// Subscribe registers s on DefaultServer.
func Subscribe(s Subscriber) error {
    // DefaultServer is created by newRpcServer(), so this ends up in the
    // rpcServer method shown in rpc_server.go.
	return DefaultServer.Subscribe(s)
}

rpc_server.go

// NewSubscriber builds a Subscriber for topic from sb by delegating to
// newSubscriber (subscriber.go). The receiver s is not used.
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
	return newSubscriber(topic, sb, opts...)
}

subscriber.go

// handler describes one callable subscription target discovered by
// newSubscriber through reflection.
type handler struct {
	// method is the function or method value to invoke.
	method  reflect.Value
	// reqType is the type of the request (message) parameter.
	reqType reflect.Type
	// ctxType is the type of the context parameter; it is left unset when
	// the function takes only the request.
	ctxType reflect.Type
}

// subscriber is the concrete Subscriber built by newSubscriber.
type subscriber struct {
	// topic is the topic name passed to newSubscriber.
	topic      string
	// rcvr and typ are the reflect.Value / reflect.Type of the user-supplied
	// subscriber object.
	rcvr       reflect.Value
	typ        reflect.Type
	// subscriber is the user-supplied object itself (a func or a value with
	// methods).
	subscriber interface{}
	// handlers lists the callable targets found on the subscriber object.
	handlers   []*handler
	// endpoints holds one registry endpoint per handler.
	endpoints  []*registry.Endpoint
	// opts holds the resolved SubscriberOptions.
	opts       SubscriberOptions
}

// newSubscriber builds a *subscriber for topic from sub, which is inspected
// through reflection. If sub is a function it becomes the single handler;
// otherwise every method in sub's method set becomes a handler. One registry
// endpoint, tagged with the topic, is produced per handler.
//
// NOTE(review): reflect.TypeOf(nil) returns a nil Type, so a nil sub would
// panic on typ.Kind() — confirm callers never pass nil.
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
	// Resolve the SubscriberOption functions into a SubscriberOptions value
	// (same functional-options pattern as in the first part).
	var options SubscriberOptions
	for _, o := range opts {
		o(&options)
	}

	var endpoints []*registry.Endpoint
	var handlers []*handler
    
	// Inspect the incoming interface{} via reflection; in the example above
	// the value passed in is the subEvent function.
	if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
		// A plain function: it is the one and only handler.
		h := &handler{
			// the function value to call
			method: reflect.ValueOf(sub),
		}
        
		// Record parameter types by position: a single parameter is the
		// request; with two, the first is the context and the second the
		// request. Any other count leaves both types unset.
		switch typ.NumIn() {
		case 1:
			h.reqType = typ.In(0)
		case 2:
			h.ctxType = typ.In(0)
			h.reqType = typ.In(1)
		}

		handlers = append(handlers, h)

		endpoints = append(endpoints, &registry.Endpoint{
			Name:    "Func",
			Request: extractSubValue(typ),
			Metadata: map[string]string{
				"topic":      topic,
				"subscriber": "true",
			},
		})
	} else {
		// Not a function: walk the method set of sub's type.
		hdlr := reflect.ValueOf(sub)
		// Indirect dereferences a pointer so the name is that of the
		// underlying type.
		name := reflect.Indirect(hdlr).Type().Name()

		for m := 0; m < typ.NumMethod(); m++ {
			method := typ.Method(m)
			h := &handler{
				method: method.Func,
			}

			// method.Type includes the receiver as parameter 0, hence the
			// positions are shifted by one compared with the function case.
			switch method.Type.NumIn() {
			case 2:
				h.reqType = method.Type.In(1)
			case 3:
				h.ctxType = method.Type.In(1)
				h.reqType = method.Type.In(2)
			}

			handlers = append(handlers, h)

			endpoints = append(endpoints, &registry.Endpoint{
				Name:    name + "." + method.Name,
				Request: extractSubValue(method.Type),
				Metadata: map[string]string{
					"topic":      topic,
					"subscriber": "true",
				},
			})
		}
	}

	return &subscriber{
		rcvr:       reflect.ValueOf(sub),
		typ:        reflect.TypeOf(sub),
		topic:      topic,
		subscriber: sub,
		handlers:   handlers,
		endpoints:  endpoints,
		opts:       options,
	}
}

rpc_server.go

// Subscribe records sb in the server's subscriber set.
//
// It returns an error when sb is not a *subscriber, when it has no handler
// functions, when validateSubscriber rejects it, or when the same subscriber
// has already been added.
func (s *rpcServer) Subscribe(sb Subscriber) error {
	sub, ok := sb.(*subscriber)
	if !ok {
		return fmt.Errorf("invalid subscriber: expected *subscriber")
	}
	if len(sub.handlers) == 0 {
		return fmt.Errorf("invalid subscriber: no handler functions")
	}

	// Check that the subscriber is well-formed.
	if err := validateSubscriber(sb); err != nil {
		return err
	}

	s.Lock()
	defer s.Unlock()

	if _, exists := s.subscribers[sub]; exists {
		// Report the duplicate subscriber itself; the original formatted the
		// server receiver here by mistake.
		return fmt.Errorf("subscriber %v already exists", sub)
	}

	// Store a nil value for now: the key's presence is what marks the
	// subscriber as known (a nil map value is still a value).
	s.subscribers[sub] = nil
	return nil
}

第三部分:运行 service

main.go

// Start the subscription service; abort the program if Run reports an error.
if err := service.Run(); err != nil {
	log.Fatal(err)
}

调用 service.go 中的 Run() 方法。

service.go

// Run starts the service, launches the periodic re-registration loop, and
// then blocks until SIGTERM, SIGINT or SIGQUIT is received or the service
// context is done. It then ends the loop and returns the result of Stop.
// An error from Start is returned immediately.
func (s *service) Run() error {
	if err := s.Start(); err != nil {
		return err
	}

	// Closing stopLoop tells the re-registration goroutine (see run) to exit.
	stopLoop := make(chan bool)
	go s.run(stopLoop)

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)

	// Block until a kill signal arrives or the context is cancelled.
	select {
	case <-signals:
	case <-s.opts.Context.Done():
	}

	close(stopLoop)

	return s.Stop()
}

// Start runs the BeforeStart hooks, starts the server, registers it, and
// finally runs the AfterStart hooks. The first error encountered at any step
// is returned and the remaining steps are skipped.
func (s *service) Start() error {
	// Hooks that must run before the server comes up.
	preHooks := s.opts.BeforeStart
	for i := 0; i < len(preHooks); i++ {
		if err := preHooks[i](); err != nil {
			return err
		}
	}

	// Start the server (rpc_server Start).
	if err := s.opts.Server.Start(); err != nil {
		return err
	}

	// Register the server (rpc_server Register).
	if err := s.opts.Server.Register(); err != nil {
		return err
	}

	// Hooks that run once the server is up and registered.
	postHooks := s.opts.AfterStart
	for i := 0; i < len(postHooks); i++ {
		if err := postHooks[i](); err != nil {
			return err
		}
	}

	return nil
}

// run re-registers the server every RegisterInterval until exit is closed
// (or receives a value). It returns immediately when the interval is not
// positive. Registration errors are logged and the loop keeps going.
func (s *service) run(exit chan bool) {
	interval := s.opts.RegisterInterval
	if interval <= 0 {
		return
	}

	ticker := time.NewTicker(interval)
	// Release the ticker when the loop ends.
	defer ticker.Stop()

	for {
		select {
		case <-exit:
			return
		case <-ticker.C:
			if err := s.opts.Server.Register(); err != nil {
				log.Log("service run Server.Register error: ", err)
			}
		}
	}
}

// Stop runs the BeforeStop hooks, deregisters and stops the server, then
// runs the AfterStop hooks.
//
// Hook errors do not interrupt the sequence; only the most recent one is
// kept and returned at the end. An error from Deregister or Server.Stop is
// returned immediately, in which case the remaining steps are skipped and
// any earlier hook error is not reported.
func (s *service) Stop() error {
	var lastHookErr error

	preHooks := s.opts.BeforeStop
	for i := 0; i < len(preHooks); i++ {
		if err := preHooks[i](); err != nil {
			lastHookErr = err
		}
	}

	// Remove the service from the registry.
	if err := s.opts.Server.Deregister(); err != nil {
		return err
	}

	// Stop the server.
	if err := s.opts.Server.Stop(); err != nil {
		return err
	}

	postHooks := s.opts.AfterStop
	for i := 0; i < len(postHooks); i++ {
		if err := postHooks[i](); err != nil {
			lastHookErr = err
		}
	}

	return lastHookErr
}

Start() 方法会调用 rpc_server.go 中 rpcServer 的 Start() 方法

rpc_server.go

// Start registers the debug handler, begins listening on the configured
// address through the configured transport, starts accepting connections in
// a goroutine, launches a goroutine that handles shutdown, and finally
// connects the broker, returning the error from that connect call.
func (s *rpcServer) Start() error {
	registerDebugHandler(s)
	config := s.Options()

	// Obtain a listener bound to the configured address.
	ts, err := config.Transport.Listen(config.Address)
	if err != nil {
		return err
	}

	log.Logf("Listening on %s", ts.Addr())
	// Store the address reported by the listener back into the options.
	s.Lock()
	s.opts.Address = ts.Addr()
	s.Unlock()

	// Accept connections in a separate goroutine; each socket is handed to
	// s.accept (shown below). Per the author's note, this uses net's Accept
	// internally.
	go ts.Accept(s.accept)

	go func() {
		// wait for exit; the value received is itself a channel on which the
		// result of closing the listener is sent back
		ch := <-s.exit

		// wait for requests to finish
		// (wait is a helper not shown here — presumably it reports whether
		// waiting was requested through the context; confirm)
		if wait(s.opts.Context) {
			s.wg.Wait()
		}

		// close transport listener
		ch <- ts.Close()

		// disconnect the broker
		// NOTE(review): the error from Disconnect is ignored.
		config.Broker.Disconnect()
	}()

	// TODO: subscribe to cruft
	return config.Broker.Connect()
}

// accept serves a single transport socket: it loops reading messages,
// selects a codec from the Content-Type header, builds a context from the
// remaining headers (with an optional timeout) and passes the request to
// s.rpc.serveRequest. The loop ends, and the socket is closed, on the first
// receive, codec or serve error. A panic while serving is recovered and
// logged.
func (s *rpcServer) accept(sock transport.Socket) {
	defer func() {
		// close socket
		sock.Close()

		if r := recover(); r != nil {
			log.Log("panic recovered: ", r)
			log.Log(string(debug.Stack()))
		}
	}()

	for {
		var msg transport.Message
		
		// Receive the next message into msg; any error ends the loop.
		if err := sock.Recv(&msg); err != nil {
			return
		}

		// we use this Timeout header to set a server deadline
		to := msg.Header["Timeout"]
		// we use this Content-Type header to identify the codec needed
		ct := msg.Header["Content-Type"]

		// Look up the codec constructor matching the Content-Type.
		cf, err := s.newCodec(ct)
		// TODO: needs better error handling
		if err != nil {
			// Reply with the error text as plain text and give up on this
			// socket. The error from Send is ignored.
			sock.Send(&transport.Message{
				Header: map[string]string{
					"Content-Type": "text/plain",
				},
				Body: []byte(err.Error()),
			})
			return
		}

		// Build a new codec for this message.
		codec := newRpcPlusCodec(&msg, sock, cf)

		// strip our headers (work on a copy so msg.Header stays untouched)
		hdr := make(map[string]string)
		for k, v := range msg.Header {
			hdr[k] = v
		}
		delete(hdr, "Content-Type")
		delete(hdr, "Timeout")

		ctx := metadata.NewContext(context.Background(), hdr)

		// set the timeout if we have it; the header value is converted
		// straight to a time.Duration, i.e. it is taken as nanoseconds
		// NOTE(review): the cancel function returned by WithTimeout is
		// discarded, so the timer is only released when it fires.
		if len(to) > 0 {
			if n, err := strconv.ParseUint(to, 10, 64); err == nil {
				ctx, _ = context.WithTimeout(ctx, time.Duration(n))
			}
		}

		// add to wait group
		// NOTE(review): this defer sits inside the loop, so every Done runs
		// only when accept returns, not after each request; the wait group
		// count therefore grows with each request on a live connection.
		s.wg.Add(1)
		defer s.wg.Done()

		// TODO: needs better error handling
		// Per the author's note, serveRequest covers several steps:
		// server.readRequest()  - read the request from the codec
		// server.sendResponse() - send the response back over the socket
		// service.call()
		if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
			log.Logf("Unexpected error serving request, closing socket: %v", err)
			return
		}
	}
}


// Register announces the server in the configured registry and, on the first
// successful registration only, subscribes every known subscriber to the
// broker.
//
// It builds a registry.Node from the advertise address (or the listen
// address when none is set), collects the endpoints of all non-internal
// handlers and subscribers, and registers the resulting service with the
// configured RegisterTTL. It returns the first error from address
// extraction, registry registration or broker subscription.
func (s *rpcServer) Register() error {
	// parse address for host, port
	config := s.Options()
	var advt, host string
	var port int

	// check the advertise address first
	// if it exists then use it, otherwise
	// use the address
	if len(config.Advertise) > 0 {
		advt = config.Advertise
	} else {
		advt = config.Address
	}

	// The last ":"-separated part is taken as the port and everything before
	// it is rejoined as the host. A port that fails to parse is silently
	// left at 0.
	parts := strings.Split(advt, ":")
	if len(parts) > 1 {
		host = strings.Join(parts[:len(parts)-1], ":")
		port, _ = strconv.Atoi(parts[len(parts)-1])
	} else {
		host = parts[0]
	}

	// Note: the local variable addr shadows the addr package from here on.
	addr, err := addr.Extract(host)
	if err != nil {
		return err
	}

	// register service
	node := &registry.Node{
		Id:       config.Name + "-" + config.Id,
		Address:  addr,
		Port:     port,
		Metadata: config.Metadata,
	}

	// node.Metadata is the same map as config.Metadata, so these writes are
	// visible through config as well.
	// NOTE(review): this panics if config.Metadata is nil — presumably it is
	// always initialised elsewhere; confirm.
	node.Metadata["transport"] = config.Transport.String()
	node.Metadata["broker"] = config.Broker.String()
	node.Metadata["server"] = s.String()
	node.Metadata["registry"] = config.Registry.String()

	s.RLock()
	// Maps are ordered randomly, sort the keys for consistency
	// Build the list of handler names.
	var handlerList []string
	for n, e := range s.handlers {
		// Only advertise non internal handlers
		if !e.Options().Internal {
			handlerList = append(handlerList, n)
		}
	}
	sort.Strings(handlerList)

	// Build the list of subscribers.
	var subscriberList []*subscriber
	for e := range s.subscribers {
		// Only advertise non internal subscribers
		if !e.Options().Internal {
			subscriberList = append(subscriberList, e)
		}
	}
	// Sorted by topic in descending order.
	sort.Slice(subscriberList, func(i, j int) bool {
		return subscriberList[i].topic > subscriberList[j].topic
	})

	// Handler endpoints come first, followed by subscriber endpoints.
	var endpoints []*registry.Endpoint
	for _, n := range handlerList {
		endpoints = append(endpoints, s.handlers[n].Endpoints()...)
	}
	for _, e := range subscriberList {
		endpoints = append(endpoints, e.Endpoints()...)
	}
	s.RUnlock()

	service := &registry.Service{
		Name:      config.Name,
		Version:   config.Version,
		Nodes:     []*registry.Node{node},
		Endpoints: endpoints,
	}

	// Snapshot the registered flag under the lock.
	s.Lock()
	registered := s.registered
	s.Unlock()

	// Log only on the first registration, not on periodic re-registration.
	if !registered {
		log.Logf("Registering node: %s", node.Id)
	}

	// create registry options
	rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}

	if err := config.Registry.Register(service, rOpts...); err != nil {
		return err
	}

	// already registered? don't need to register subscribers
	if registered {
		return nil
	}

	s.Lock()
	defer s.Unlock()

	s.registered = true

	for sb, _ := range s.subscribers {
		handler := s.createSubHandler(sb, s.opts)
		var opts []broker.SubscribeOption
		// Pass the queue name along only when one is configured.
		if queue := sb.Options().Queue; len(queue) > 0 {
			opts = append(opts, broker.Queue(queue))
		}
		// Subscribe to the topic on the broker.
		sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
		if err != nil {
			return err
		}
		// Replace the nil placeholder stored by Subscribe with the broker
		// subscription.
		s.subscribers[sb] = []broker.Subscriber{sub}
	}

	return nil
}

第四部分:初始化WebService

main.go

// serveHome is the user-defined handler for the home page. It logs the
// request URL, answers 404 "Not found" for any path other than "/", and
// otherwise serves ./view/login.html.
func serveHome(w http.ResponseWriter, r *http.Request) {
	log.Println(r.URL)

	reqPath := r.URL.Path
	if reqPath != "/" {
		http.Error(w, "Not found", http.StatusNotFound)
		return
	}

	// reqPath is exactly "/" at this point, so this logs an empty string.
	log.Println(reqPath[1:])
	http.ServeFile(w, r, "./view/login.html")
}

// main (excerpt) creates the web service on the address captured earlier,
// registers a static file handler and the home page handler, then runs it.
func main() {
    // ...
    // Initialisation mirrors the earlier micro.NewService call, so it is not
    // explained again.
    webService := web.NewService(
    	web.Name("xxx"),
    	web.RegisterTTL(30*time.Second),
    	web.RegisterInterval(10*time.Second),
    	web.Address(addr),
    	web.Registry(service.Options().Registry),
    )
    // Serve static resources (Handle is shown below).
    // http.FileServer returns a Handler; http.Dir("") makes it serve files
    // from the current directory.
    webService.Handle("/view/", http.FileServer(http.Dir("")))
    
    // Home page, served by the user-defined handler serveHome.
    webService.HandleFunc("/", serveHome)
    
    if err := webService.Run(); err != nil {
    	log.Fatal(err)
    }
    // ...
}

service.go

// Handle records pattern as a registry endpoint, unless an endpoint with
// that name is already present, and then registers handler for pattern on
// the service's mux.
func (s *service) Handle(pattern string, handler http.Handler) {
	known := false
	for i := 0; i < len(s.srv.Endpoints) && !known; i++ {
		known = s.srv.Endpoints[i].Name == pattern
	}

	if !known {
		endpoint := &registry.Endpoint{Name: pattern}
		s.srv.Endpoints = append(s.srv.Endpoints, endpoint)
	}

	// Delegates to the mux (net/http under the hood, per the author's note).
	s.mux.Handle(pattern, handler)
}

// HandleFunc is the function-valued counterpart of Handle: it records
// pattern as a registry endpoint when no endpoint of that name exists yet,
// then registers handler for pattern on the service's mux.
func (s *service) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
	known := false
	for i := 0; i < len(s.srv.Endpoints) && !known; i++ {
		known = s.srv.Endpoints[i].Name == pattern
	}

	if !known {
		endpoint := &registry.Endpoint{Name: pattern}
		s.srv.Endpoints = append(s.srv.Endpoints, endpoint)
	}

	s.mux.HandleFunc(pattern, handler)
}

net/http/server.go

// Handler is implemented by any type with a ServeHTTP method taking a
// ResponseWriter and a *Request.
type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}

// HandlerFunc is a function type with the ServeHTTP signature. Because it
// has the ServeHTTP method below, it satisfies the Handler interface.
type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
	f(w, r)
}

// HandleFunc registers the handler function for the given pattern.
// The function is converted to a HandlerFunc and passed on to Handle.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
	mux.Handle(pattern, HandlerFunc(handler))
}

// Handle registers handler for pattern in the mux's pattern table.
// It panics if pattern is empty, if handler is nil, or if pattern has
// already been registered.
func (mux *ServeMux) Handle(pattern string, handler Handler) {
	mux.mu.Lock()
	defer mux.mu.Unlock()

	if pattern == "" {
		panic("http: invalid pattern")
	}
	if handler == nil {
		panic("http: nil handler")
	}
	// Reading from a nil map is safe, so this check may run before the map
	// is allocated below.
	if _, exist := mux.m[pattern]; exist {
		panic("http: multiple registrations for " + pattern)
	}

	// Allocate the table lazily on first registration.
	if mux.m == nil {
		mux.m = make(map[string]muxEntry)
	}
	mux.m[pattern] = muxEntry{h: handler, pattern: pattern}

	// A pattern that does not begin with '/' sets the hosts flag.
	if pattern[0] != '/' {
		mux.hosts = true
	}
}

附无用Demo

sync.Once

package main

import (
	"fmt"
	"sync"
)

// main demonstrates sync.Once: Do is called ten times with a fresh closure
// each time, yet only the first closure runs, so a single line is printed.
func main() {
	var once sync.Once
	for i := 0; i < 10; i++ {
		report := func() {
			fmt.Println("once :", i)
		}
		once.Do(report)
	}
}

输出:
once : 0

type function

package main

import (
	"fmt"
)

// Fun is a function type taking one string; A, B, C and D all have this
// signature and can therefore be stored in a []Fun.
type Fun func(s string)

// printTagged prints the tag followed by " : " and then s, as two Println
// operands.
func printTagged(tag, s string) {
	fmt.Println(tag+" : ", s)
}

// A prints s with the tag "a".
func A(s string) {
	printTagged("a", s)
}

// B prints s with the tag "b".
func B(s string) {
	printTagged("b", s)
}

// C prints s with the tag "c".
func C(s string) {
	printTagged("c", s)
}

// D prints s with the tag "d".
func D(s string) {
	printTagged("d", s)
}

// main walks a slice of Fun values, printing each index and then calling the
// function stored at that index with "233".
func main() {
	funcs := []Fun{A, B, C, D}
	for idx := 0; idx < len(funcs); idx++ {
		fmt.Println(idx)
		funcs[idx]("233")
	}
}

输出:
0
a :  233
1
b :  233
2
c :  233
3
d :  233
原文地址:https://www.cnblogs.com/fightfordream/p/9083859.html