nsq源码-消息接收和发送完整流程

一、TCP Handler

nsqd里面的Main函数。

//nsqd.go
func (n *NSQD) Main() error {
	//...
	n.waitGroup.Wrap(func() {
			exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
		})
	//...
}

//tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	//...
	go func() {
			handler.Handle(clientConn)
			wg.Done()
		}()
	//...
}

n.tcpServer实现了TCPHandler接口
在nsqd.New()里面创建的tcpServer

//nsqd.go
func New(opts *Options) (*NSQD, error) {
	//...
	n.tcpServer = &tcpServer{}
	//...
}

看下tcpServer实现的Handle接口里做了什么

func (p *tcpServer) Handle(clientConn net.Conn) {

	//...
	//从socket中读取数据
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
		clientConn.Close()
		return
	}
	protocolMagic := string(buf)

	p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		//这里是关键,创建了一个protocolV2对象
		prot = &protocolV2{nsqd: p.nsqd}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	p.conns.Store(clientConn.RemoteAddr(), clientConn)

	//开启protocolV2的IOLoop,这是一个客户端连接的“守护”协程
	//接收消息和发送消息给客户端,都在这里面处理了
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	}

	p.conns.Delete(clientConn.RemoteAddr())
}
//protocol_v2.go
//因为这里是在Handler里启动的,所以这里其实是为每个客户端都启动了一个Loop
func (p *protocolV2) IOLoop(conn net.Conn) error {
	...
	clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.nsqd)
	p.nsqd.AddClient(client.ID, client)

	...
	// messagePump负责从channel的memoryMsgChan和
	//backend.ReadChan()中读取消息并将消息推送给client

	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	//下面这个for循环负责接收客户端消息,比如消费订阅,以及生产消息等
	//主要逻辑在p.Exec()里
	for {
		...
		//主要逻辑在这个Exec里面
		response, err = p.Exec(client, params)
		...
	}

	//...
}

二、接收消息

上面已经看到处理客户端消息主要在protocolV2.Exec()里
这段代码我觉得很好理解了,直接去protocolV2.PUB()
看客户端生产消息的逻辑,其他的指令先不看

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	if bytes.Equal(params[0], []byte("IDENTIFY")) {
		return p.IDENTIFY(client, params)
	}
	err := enforceTLSPolicy(client, p, params[0])
	if err != nil {
		return nil, err
	}
	switch {
	...
        //这里就是客户端生产消息的指令处理了
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	...
	case bytes.Equal(params[0], []byte("SUB")):
		return p.SUB(client, params)
	}
	...
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
	var err error
	...
	bodyLen, err := readLen(client.Reader, client.lenSlice)
	...
	//读取消息主体
	messageBody := make([]byte, bodyLen)
	_, err = io.ReadFull(client.Reader, messageBody)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
	}

	...
	//将消息丢到topic.PutMessage()
	//PutMessage直接将消息丢到msgChan或者diskqueue了
	//关于topic的部分,参考https://www.cnblogs.com/werben/p/14518283.html
	topic := p.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	err = topic.PutMessage(msg)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
	}

	client.PublishedMessage(topicName, 1)

	return okBytes, nil
}

三、发送消息

现在来搞清楚,服务器端又在哪里发送消息给consumer?
在这protocolV2.messagePump()

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var memoryMsgChan chan *Message
	var backendMsgChan <-chan []byte
	var subChannel *Channel
	var flusherChan <-chan time.Time
	var sampleRate int32

	//客户端执行Sub的时候,会将Channle丢到这个SubEventChan里
	//可以去看protocolV2.SUB()函数
	subEventChan := client.SubEventChan

	//鉴权Identify对应的chan,只能鉴权一次 
	identifyEventChan := client.IdentifyEventChan

	//flushChan赋值outputBufferTicker,默认是250ms时间间隔Flush一次数据
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)

	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	msgTimeout := client.MsgTimeout
	flushed := true
	close(startedChan)

	for {
		// 检查订阅状态和消息是否可处理状态
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			//这个memoryMsgChan是channel将消息存在内存中的地方
			memoryMsgChan = subChannel.memoryMsgChan
			//这个backendMsgChan是channel将消息存在磁盘中的地方
			backendMsgChan = subChannel.backend.ReadChan()

			flusherChan = outputBufferTicker.C
		}
		fmt.Printf("werben subChannel nil: %t
", subChannel == nil)

		select {
		case <-flusherChan:
			//这个flusherChan就是outputBufferTicker
			//250ms时间间隔Flush一次数据
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
		case subChannel = <-subEventChan:
			//客户端Sub的时候,会将channel传到这个subEventChan通道,
			//参考protocolV2.SUB()函数
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			//客户端提交identify时出发,只能提交一次identify,
			//参考函数protocolV2.IDENTIFY()

			//感觉这里就是在收到这个消息时
			//重新启动心跳和flush同步的ticker
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			//心跳处理
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			...
			//磁盘消息处理
			client.SendingMessage()
			...
			err = p.SendMessage(client, msg)
			...
			flushed = false
		case msg := <-memoryMsgChan:
			//将内存消息发送给客户端
			client.SendingMessage()
			...
			err = p.SendMessage(client, msg)
			...
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}
	exit:
		...
		//结束时候关闭心跳和flush同步的ticke
		heartbeatTicker.Stop()
		outputBufferTicker.Stop()
		...
}
原文地址:https://www.cnblogs.com/werben/p/14523875.html