golang消息队列nsq

golang消息队列nsq

1、NSQ下载链接
http://nsq.io/deployment/installing.html

win下安装

追加I: sq-1.2.0.windows-amd64.go1.12.9in

2. 打开命令窗口,运行:nsqlookupd

3. 打开新的命令窗口,运行:nsqd --lookupd-tcp-address=127.0.0.1:4160 //此处4160是nsqd与lookupd进行tcp连接的端口

4. 打开新的命令窗口,运行: nsqadmin --lookupd-http-address=127.0.0.1:4161 //此处4161是nsqadmin与lookupd进行http连接的端口

  http://127.0.0.1:4171/ 后台查看

package main

import (
    "github.com/nsqio/go-nsq"
    "fmt"
)

var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "127.0.0.1:4150"
)

func main() {
    //初始化配置
    config := nsq.NewConfig()
    for i := 0; i < 100; i++ {
        //创建100个生产者
        tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
        if err != nil {
            fmt.Println(err)
        }
        //主题
        topic := "Insert"
        //主题内容
        tCommand := "new data!"
        //发布消息
        err = tPro.Publish(topic, []byte(tCommand))
        if err != nil {
            fmt.Println(err)
        }
    }

}

运行如上代码

 生产了100个

接下来消费

package main

import (
    "github.com/nsqio/go-nsq"
    "fmt"
    "sync"
    "time"
)

var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "127.0.0.1:4150"
)

//声明一个结构体,实现HandleMessage接口方法(根据文档的要求)
type NsqHandler struct {
    //消息数
    msqCount int64
    //标识ID
    nsqHandlerID string
}

//实现HandleMessage方法
//message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
    //没收到一条消息+1
    s.msqCount++
    //打印输出信息和ID
    fmt.Println(s.msqCount,s.nsqHandlerID)
    //打印消息的一些基本信息
    fmt.Printf("msg.Timestamp=%v, msg.nsqaddress=%s,msg.body=%s 
", time.Unix(0  , message.Timestamp).Format("2006-01-02 03:04:05") , message.NSQDAddress, string(message.Body))
    return nil
}

func main() {
     //这个是监听 队列
    //初始化配置
    config := nsq.NewConfig()
    //创造消费者,参数一时订阅的主题,参数二是使用的通道
    com, err := nsq.NewConsumer("Insert", "channel1", config)
    if err != nil {
        fmt.Println(err)
    }
    //添加处理回调
    com.AddHandler(&NsqHandler{nsqHandlerID: "One"})
    //连接对应的nsqd
    err = com.ConnectToNSQD(tcpNsqdAddrr)
    if err != nil {
        fmt.Println(err)
    }

    //只是为了不结束此进程,这里没有意义
    var wg = &sync.WaitGroup{}
    wg.Add(1)
    wg.Wait()
}

这个nsq 问题有点多 建议不要用

原文地址:https://www.cnblogs.com/newmiracle/p/12981033.html