// lookup.go

package nsqd

import (
    "bytes"
    "encoding/json"
    "net"
    "os"
    "strconv"
    "time"

    "github.com/nsqio/go-nsq"
    "github.com/nsqio/nsq/internal/version"
)

func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) func(*lookupPeer) {
    return func(lp *lookupPeer) {
        ci := make(map[string]interface{})
        ci["version"] = version.Binary
        ci["tcp_port"] = n.RealTCPAddr().Port
        ci["http_port"] = n.RealHTTPAddr().Port
        ci["hostname"] = hostname
        ci["broadcast_address"] = n.getOpts().BroadcastAddress

        cmd, err := nsq.Identify(ci)
        if err != nil {
            lp.Close()
            return
        }
        resp, err := lp.Command(cmd)
        if err != nil {
            n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err)
        } else if bytes.Equal(resp, []byte("E_INVALID")) {
            n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp)
        } else {
            err = json.Unmarshal(resp, &lp.Info)
            if err != nil {
                n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp)
            } else {
                n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info)
            }
        }

        go func() {
            syncTopicChan <- lp
        }()
    }
}

func (n *NSQD) lookupLoop() {
    var lookupPeers []*lookupPeer
    var lookupAddrs []string
    syncTopicChan := make(chan *lookupPeer)
    connect := true

    hostname, err := os.Hostname()
    if err != nil {
        n.logf("FATAL: failed to get hostname - %s", err)
        os.Exit(1)
    }

    // for announcements, lookupd determines the host automatically
    ticker := time.Tick(15 * time.Second)
    for {
        if connect {
            for _, host := range n.getOpts().NSQLookupdTCPAddresses {
                if in(host, lookupAddrs) {
                    continue
                }
                n.logf("LOOKUP(%s): adding peer", host)
                lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.getOpts().Logger,
                    connectCallback(n, hostname, syncTopicChan))
                lookupPeer.Command(nil) // start the connection
                lookupPeers = append(lookupPeers, lookupPeer)
                lookupAddrs = append(lookupAddrs, host)
            }
            n.lookupPeers.Store(lookupPeers)
            connect = false
        }

        select {
        case <-ticker:
            // send a heartbeat and read a response (read detects closed conns)
            for _, lookupPeer := range lookupPeers {
                n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)
                cmd := nsq.Ping()
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                }
            }
        case val := <-n.notifyChan:
            var cmd *nsq.Command
            var branch string

            switch val.(type) {
            case *Channel:
                // notify all nsqlookupds that a new channel exists, or that it's removed
                branch = "channel"
                channel := val.(*Channel)
                if channel.Exiting() == true {
                    cmd = nsq.UnRegister(channel.topicName, channel.name)
                } else {
                    cmd = nsq.Register(channel.topicName, channel.name)
                }
            case *Topic:
                // notify all nsqlookupds that a new topic exists, or that it's removed
                branch = "topic"
                topic := val.(*Topic)
                if topic.Exiting() == true {
                    cmd = nsq.UnRegister(topic.name, "")
                } else {
                    cmd = nsq.Register(topic.name, "")
                }
            }

            for _, lookupPeer := range lookupPeers {
                n.logf("LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                }
            }
        case lookupPeer := <-syncTopicChan:
            var commands []*nsq.Command
            // build all the commands first so we exit the lock(s) as fast as possible
            n.RLock()
            for _, topic := range n.topicMap {
                topic.RLock()
                if len(topic.channelMap) == 0 {
                    commands = append(commands, nsq.Register(topic.name, ""))
                } else {
                    for _, channel := range topic.channelMap {
                        commands = append(commands, nsq.Register(channel.topicName, channel.name))
                    }
                }
                topic.RUnlock()
            }
            n.RUnlock()

            for _, cmd := range commands {
                n.logf("LOOKUPD(%s): %s", lookupPeer, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                    break
                }
            }
        case <-n.optsNotificationChan:
            var tmpPeers []*lookupPeer
            var tmpAddrs []string
            for _, lp := range lookupPeers {
                if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
                    tmpPeers = append(tmpPeers, lp)
                    tmpAddrs = append(tmpAddrs, lp.addr)
                    continue
                }
                n.logf("LOOKUP(%s): removing peer", lp)
                lp.Close()
            }
            lookupPeers = tmpPeers
            lookupAddrs = tmpAddrs
            connect = true
        case <-n.exitChan:
            goto exit
        }
    }

exit:
    n.logf("LOOKUP: closing")
}

// in reports whether s is equal to any element of lst.
// It returns false for a nil or empty list.
func in(s string, lst []string) bool {
    for i := 0; i < len(lst); i++ {
        if lst[i] == s {
            return true
        }
    }
    return false
}

func (n *NSQD) lookupdHTTPAddrs() []string {
    var lookupHTTPAddrs []string
    lookupPeers := n.lookupPeers.Load()
    if lookupPeers == nil {
        return nil
    }
    for _, lp := range lookupPeers.([]*lookupPeer) {
        if len(lp.Info.BroadcastAddress) <= 0 {
            continue
        }
        addr := net.JoinHostPort(lp.Info.BroadcastAddress, strconv.Itoa(lp.Info.HTTPPort))
        lookupHTTPAddrs = append(lookupHTTPAddrs, addr)
    }
    return lookupHTTPAddrs
}

// Original article: https://www.cnblogs.com/zhangboyu/p/7457331.html