nsq源码-topic

topic的入口在哪里:GetTopic()

GetTopic如果存在则直接返回,不存在则NewTopic()

个人觉得Topic里面有两个重要的变量和一个函数,搞清楚这三个东西就差不多了

  1. memoryMsgChan: 这是存放消息的内存,就是一个通道,通道的大小MemQueueSize,
    默认配置是10000,也就是如果堆积的消息超过10000就会使用磁盘了

  2. backend :就是diskqueue,这个就是磁盘存储消息的地方了,这个diskqueue一定要搞懂,因为后面channel也会用到这个queue,关于这个diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html

  3. messagePump : 这是topic的一个“守护”协程,看源码里的英文注释, messagePump selects over the in-memory and backend queue and writes messages to every channel for this topic,它将topic收到的消息,分发给每个channel。

func (t *Topic) messagePump() {
	var msg *Message
	var buf []byte
	var err error
	var chans []*Channel
	var memoryMsgChan chan *Message
	var backendChan <-chan []byte

	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
	//这里就是要等到startChan完成后才能往下走,
	for {
		select {
		case <-t.channelUpdateChan:
			continue
		case <-t.pauseChan:
			continue
		case <-t.exitChan:
			goto exit
		case <-t.startChan:
		//也就是要等到topic执行完GetChannel()之后才会接着往下走
		}
		break
	}
	t.RLock()
	//将所有channel通道放在chans中
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()
	if len(chans) > 0 && !t.IsPaused() {
		memoryMsgChan = t.memoryMsgChan
		//backendChan就是backend暴露给外部的readChan
		//参考: https://www.cnblogs.com/werben/p/14517781.html
		backendChan = t.backend.ReadChan()
	}

	// main message loop
	//这里是守护协程的主体了,也就是这个for会一直跑
	for {
		select {
		case msg = <-memoryMsgChan:
			//如果topic有收到新消息
		case buf = <-backendChan:
			//如果消息是从diskqueue里来的,还要解码反序列化成msg
			msg, err = decodeMessage(buf)
			if err != nil {
				t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:
			//如果有新的channel通道
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:
			//如果channel通道暂停
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			goto exit
		}

		//遍历每一个channel通道,将消息投递过去
		for i, channel := range chans {
			chanMsg := msg
			// copy the message because each channel
			// needs a unique instance but...
			// fastpath to avoid copy if its the first channel
			// (the topic already created the first copy)
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				//如果是延时消息则将延时消息丢给channel
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			//将消息则将延时消息丢给channel
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}

exit:
	t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
原文地址:https://www.cnblogs.com/werben/p/14518283.html