go activeMQ的简单封装

虽然个人也不怎么推荐activeMQ, 只是由于项目需要, 所以也做一个简单的整理, 在订阅的时候 ,一般我们的业务都是处理字符串,但有时候AckMode 设置为AckAuto不可以,客服端处理完业务后在发回确认,所以订阅封装了2个方法

utils.go如下:

//Usage:
//
//    //Send
//    if err := utils.NewActiveMQ("localhost:61613").Send("/queue/test-1", "test from 1"); err != nil {
//    fmt.Println("AMQ ERROR:", err)
//
//  //this func will handle the messges get from activeMQ server.
//    handler := func(msg string,err error) { fmt.Println("AMQ MSG:", err, msg) }
//    if err := utils.NewActiveMQ("localhost:61613").Subscribe("/queue/test-1", handler); err != nil {
//      fmt.Println("AMQ ERROR:", err)
//    }
//
package utils
 
import (
    "time"
 
    "github.com/go-stomp/stomp"
)
 
type ActiveMQ struct {
    Addr string
}
 
var options = []func(*stomp.Conn) error{
    //设置读写超时,超时时间为1个小时
    stomp.ConnOpt.HeartBeat(7200*time.Second, 7200*time.Second),
    stomp.ConnOpt.HeartBeatError(360 * time.Second),
}
 
//New activeMQ with addr[eg:localhost:61613] as host address.
func NewActiveMQ(addr string) *ActiveMQ {
    if addr == "" {
        addr = "localhost:61613"
    }
    return &ActiveMQ{addr}
}
 
// Used for health check
func (this *ActiveMQ) Check() error {
    conn, err := this.Connect()
    if err == nil {
        defer conn.Disconnect()
        return nil
    } else {
        return err
    }
}
 
// Connect to activeMQ
func (this *ActiveMQ) Connect() (*stomp.Conn, error) {
    return stomp.Dial("tcp", this.Addr, options...)
}
 
// Send msg to destination
func (this *ActiveMQ) Send(destination string, msg string) error {
    conn, err := this.Connect()
    if err != nil {
        return err
    }
    defer conn.Disconnect()
    return conn.Send(
        destination,  // destination
        "text/plain", // content-type
        []byte(msg))  // body
}
 
// Subscribe Message from destination
// func handler handle msg reveived from destination
func (this *ActiveMQ) Subscribe(destination string, ack stomp.AckMode, handler func(msg *stomp.Message, con *stomp.Conn, err error)) error {
    conn, err := this.Connect()
    if err != nil {
        return err
    }
    //sub, err := conn.Subscribe(destination, stomp.AckAuto)
    sub, err := conn.Subscribe(destination, ack)
    if err != nil {
        return err
    }
    defer conn.Disconnect()
    defer sub.Unsubscribe()
    for {
        m := <-sub.C
        handler(m, conn, m.Err)
    }
    return err
}
 
//
func (this *ActiveMQ) SubscribeAuto(destination string, handler func(msg string, err error)) error {
    conn, err := this.Connect()
    if err != nil {
        return err
    }
    sub, err := conn.Subscribe(destination, stomp.AckAuto)
    if err != nil {
        return err
    }
    defer conn.Disconnect()
    defer sub.Unsubscribe()
    for {
        m := <-sub.C
        handler(string(m.Body), m.Err)
    }
    return err
}

调用就非常简单了:

package main
 
import (
    "fmt"
    "main/utils"
    "strconv"
 
    "github.com/go-stomp/stomp"
)
 
func main() {
    //生产者
    go func() {
        mq := utils.NewActiveMQ("localhost:61613")
        for i := 0; i < 100; i++ {
            mq.Send("main", "demo"+strconv.Itoa(i+1))
        }
    }()
    //消费者
    go func() {
        mq := utils.NewActiveMQ("localhost:61613")
        //mq.SubscribeAuto("main", handler)
        mq.Subscribe("main", stomp.AckClient, handler2)
    }()
    fmt.Println("activeMQ test")
    var s string
    fmt.Scan(&s)
}
func handler(msg string, err error) {
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println("AMQ MSG:", msg)
    }
}
func handler2(msg *stomp.Message, con *stomp.Conn, err error) {
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println("AMQ MSG:", string(msg.Body))
        con.Ack(msg)
    }
}
原文地址:https://www.cnblogs.com/majiang/p/14185683.html