golang使用rabbitmq正确姿势

go mod init  github.com/ichunt2019/go-rabbitmq

D:gocodego-rabbitmqutils abbitmq eceiver.go

主要是实现了rabbimq 生产者 消费者

消费者:实现失败尝试机制

package rabbitmq

import (
    //"errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
)


// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel

// 定义生产者接口
type Producer interface {
    MsgContent() string
}

// 定义生产者接口
type RetryProducer interface {
    MsgContent() string
}

// 定义接收者接口
type Receiver interface {
    Consumer([]byte)    error
    FailAction([]byte)  error
}

// 定义RabbitMQ对象
type RabbitMQ struct {
    connection *amqp.Connection
    Channel *amqp.Channel
    dns string
    QueueName   string            // 队列名称
    RoutingKey  string            // key名称
    ExchangeName string           // 交换机名称
    ExchangeType string           // 交换机类型
    producerList []Producer
    retryProducerList []RetryProducer
    receiverList []Receiver
}

// 定义队列交换机对象
type QueueExchange struct {
    QuName  string           // 队列名称
    RtKey   string           // key值
    ExName  string           // 交换机名称
    ExType  string           // 交换机类型
    Dns     string              //链接地址
}



// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){

    mqConn, err = amqp.Dial(r.dns)
    r.connection = mqConn   // 赋值给RabbitMQ对象

    if err != nil {
        fmt.Printf("关闭mq链接失败  :%s 
", err)
    }

    return
}

// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){

    err = r.connection.Close()
    if err != nil{
        fmt.Printf("关闭mq链接失败  :%s 
", err)
    }
    return
}

// 链接rabbitMQ
func (r *RabbitMQ)MqOpenChannel() (err error){
    mqConn := r.connection
    r.Channel, err = mqConn.Channel()
    //defer mqChan.Close()
    if err != nil {
        fmt.Printf("MQ打开管道失败:%s 
", err)
    }
    return err
}

// 链接rabbitMQ
func (r *RabbitMQ)CloseMqChannel() (err error){
    r.Channel.Close()
    if err != nil {
        fmt.Printf("关闭mq链接失败  :%s 
", err)
    }
    return err
}




// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
    return RabbitMQ{
        QueueName:q.QuName,
        RoutingKey:q.RtKey,
        ExchangeName: q.ExName,
        ExchangeType: q.ExType,
        dns:q.Dns,
    }
}

func (mq *RabbitMQ) sendMsg (body string)  {
    err :=mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s 
", err)
    }

    defer mq.Channel.Close()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s 
", err)
        }
    }


    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
    if err != nil {
        log.Printf("QueueDeclare err :%s 
", err)
    }
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s 
", err)
        }
    }

    if mq.ExchangeName != "" && mq.RoutingKey != ""{
        err = mq.Channel.Publish(
            mq.ExchangeName,     // exchange
            mq.RoutingKey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
            })
    }else{
        err = mq.Channel.Publish(
            "",     // exchange
            mq.QueueName, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
            })
    }

}


func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {
    err :=mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s 
", err)
    }
    defer mq.Channel.Close()

    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s 
", err)
        }
    }

    //原始路由key
    oldRoutingKey := args[0]
    //原始交换机名
    oldExchangeName := args[1]

    table := make(map[string]interface{},3)
    table["x-dead-letter-routing-key"] = oldRoutingKey
    if oldExchangeName != "" {
        table["x-dead-letter-exchange"] = oldExchangeName
    }else{
        mq.ExchangeName = ""
        table["x-dead-letter-exchange"] = ""
    }

    table["x-message-ttl"] = int64(20000)

    //fmt.Printf("%+v",table)
    //fmt.Printf("%+v",mq)
    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
    if err != nil {
        log.Printf("QueueDeclare err :%s 
", err)
    }
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s 
", err)
        }
    }

    header := make(map[string]interface{},1)

    header["retry_nums"] = retry_nums + int32(1)

    var ttl_exchange string
    var ttl_routkey string

    if(mq.ExchangeName != "" ){
        ttl_exchange = mq.ExchangeName
    }else{
        ttl_exchange = ""
    }


    if mq.RoutingKey != "" && mq.ExchangeName != ""{
        ttl_routkey = mq.RoutingKey
    }else{
        ttl_routkey = mq.QueueName
    }

    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s 
",ttl_exchange,ttl_routkey)
    err = mq.Channel.Publish(
        ttl_exchange,     // exchange
        ttl_routkey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:header,
        })
    if err != nil {
        fmt.Printf("MQ任务发送失败:%s 
", err)

    }

}


// 监听接收者接收任务 消费者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver,routineNum int) {
    err :=mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s 
", err)
    }
    defer mq.Channel.Close()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s 
", err)
        }
    }


    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
    if err != nil {
        log.Printf("QueueDeclare err :%s 
", err)
    }
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s 
", err)
        }
    }
    // 获取消费通道,确保rabbitMQ一个一个发送消息
    err =  ch.Qos(1, 0, false)
    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)
    if err != nil {
        log.Printf("Consume err :%s 
", err)
    }
    for msg := range msgList {
        retry_nums,ok := msg.Headers["retry_nums"].(int32)
        if(!ok){
            retry_nums = int32(0)
        }
        // 处理数据
        err := receiver.Consumer(msg.Body)
        if err!=nil {
            //消息处理失败 进入延时尝试机制
            if retry_nums < 3{
                fmt.Println(string(msg.Body))
                fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 
")
                retry_msg(msg.Body,retry_nums,QueueExchange{
                        mq.QueueName,
                        mq.RoutingKey,
                        mq.ExchangeName,
                        mq.ExchangeType,
                        mq.dns,
                    })
            }else{
                //消息失败 入库db
                fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 
")
                receiver.FailAction(msg.Body)
            }
            err = msg.Ack(true)
            if err != nil {
                fmt.Printf("确认消息未完成异常:%s 
", err)
            }
        }else {
            // 确认消息,必须为false
            err = msg.Ack(true)

            if err != nil {
                fmt.Printf("消息消费ack失败 err :%s 
", err)
            }
        }

    }
}

//消息处理失败之后 延时尝试
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
    //原始队列名称 交换机名称
    oldQName := queueExchange.QuName
    oldExchangeName := queueExchange.ExName
    oldRoutingKey := queueExchange.RtKey
    if oldRoutingKey == "" || oldExchangeName == ""{
        oldRoutingKey = oldQName
    }

    if queueExchange.QuName != "" {
        queueExchange.QuName = queueExchange.QuName + "_retry_3";
    }

    if queueExchange.RtKey != "" {
        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
    }else{
        queueExchange.RtKey = queueExchange.QuName + "_retry_3";
    }

//fmt.Printf("%+v",queueExchange)

    mq := NewMq(queueExchange)
    mq.MqConnect()

    defer func(){
        mq.CloseMqConnect()
    }()
    //fmt.Printf("%+v",queueExchange)
    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)


}


func Send(queueExchange QueueExchange,msg string){
    mq := NewMq(queueExchange)
    mq.MqConnect()

    defer func(){
        mq.CloseMqConnect()
    }()
    mq.sendMsg(msg)

}

/*
runNums  开启并发执行任务数量
 */
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int){
    mq := NewMq(queueExchange)
    mq.MqConnect()

    defer func(){
        mq.CloseMqConnect()
    }()

    forever := make(chan bool)
    for i:=1;i<=runNums;i++{
        go func(routineNum int) {
            defer mq.Channel.Close()
            // 验证链接是否正常
            mq.MqOpenChannel()
            mq.ListenReceiver(receiver,routineNum)
        }(i)
    }
    <-forever
}


type retryPro struct {
    msgContent   string
}
View Code

D:gocodego-rabbitmqdemo ecv.go

package main

import (
    "fmt"
    "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
    "time"
)

type RecvPro struct {

}

//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
/*
返回值 error 为nil  则表示该消息消费成功
否则消息会进入ttl延时队列  重复尝试消费3次
3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
 */
func (t *RecvPro) Consumer(dataByte []byte) error {
    //time.Sleep(500*time.Microsecond)
    //return errors.New("顶顶顶顶")
    fmt.Println(string(dataByte))
    time.Sleep(1*time.Second)
    //return errors.New("顶顶顶顶")
    return nil
}

//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
 */
func (t *RecvPro) FailAction(dataByte []byte) error {
    fmt.Println(string(dataByte))
    fmt.Println("任务处理失败了,我要进入db日志库了")
    fmt.Println("任务处理失败了,发送钉钉消息通知主人")
    return nil
}



func main() {
    t := &RecvPro{}



    //rabbitmq.Recv(rabbitmq.QueueExchange{
    //    "a_test_0001",
    //    "a_test_0001",
    //    "",
    //    "",
    //    "amqp://guest:guest@192.168.2.232:5672/",
    //},t,5)

    /*
        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
     */
    rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.2.232:5672/",
    },t,3)



}

D:gocodego-rabbitmqdemosend.go

package main

import (
    "fmt"
    _ "fmt"
    "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
)

func main() {


    for i := 1;i<10;i++{
        body := fmt.Sprintf("{"order_id":%d}",i)
        fmt.Println(body)

        /**
            使用默认的交换机
            如果是默认交换机
            type QueueExchange struct {
            QuName  string           // 队列名称
            RtKey   string           // key值
            ExName  string           // 交换机名称
            ExType  string           // 交换机类型
            Dns     string              //链接地址
            }
            如果你喜欢使用默认交换机
            RtKey  此处建议填写成 RtKey 和 QuName 一样的值
         */

        //queueExchange := rabbitmq.QueueExchange{
        //    "a_test_0001",
        //    "a_test_0001",
        //    "",
        //    "",
        //    "amqp://guest:guest@192.168.2.232:5672/",
        //}

        /*
         使用自定义的交换机
         */
        queueExchange := rabbitmq.QueueExchange{
            "a_test_0001",
            "a_test_0001",
            "hello_go",
            "direct",
            "amqp://guest:guest@192.168.2.232:5672/",
        }

        rabbitmq.Send(queueExchange,body)


    }


}

使用说明:

go get github.com/ichunt2019/go-rabbitmq

发送消息

package main

import (
    "fmt"
    _ "fmt"
    "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
)

func main() {


    for i := 1;i<10;i++{
        body := fmt.Sprintf("{"order_id":%d}",i)
        fmt.Println(body)

        /**
            使用默认的交换机
            如果是默认交换机
            type QueueExchange struct {
            QuName  string           // 队列名称
            RtKey   string           // key值
            ExName  string           // 交换机名称
            ExType  string           // 交换机类型
            Dns     string              //链接地址
            }
            如果你喜欢使用默认交换机
            RtKey  此处建议填写成 RtKey 和 QuName 一样的值
         */

        //queueExchange := rabbitmq.QueueExchange{
        //    "a_test_0001",
        //    "a_test_0001",
        //    "",
        //    "",
        //    "amqp://guest:guest@192.168.2.232:5672/",
        //}

        /*
         使用自定义的交换机
         */
        queueExchange := rabbitmq.QueueExchange{
            "a_test_0001",
            "a_test_0001",
            "hello_go",
            "direct",
            "amqp://guest:guest@192.168.2.232:5672/",
        }

        rabbitmq.Send(queueExchange,body)


    }


}

消费消息

package main

import (
    "fmt"
    "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
    "time"
)

type RecvPro struct {

}

//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
/*
返回值 error 为nil  则表示该消息消费成功
否则消息会进入ttl延时队列  重复尝试消费3次
3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
 */
func (t *RecvPro) Consumer(dataByte []byte) error {
    //time.Sleep(500*time.Microsecond)
    //return errors.New("顶顶顶顶")
    fmt.Println(string(dataByte))
    time.Sleep(1*time.Second)
    //return errors.New("顶顶顶顶")
    return nil
}
//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
 */
func (t *RecvPro) FailAction(dataByte []byte) error {
    fmt.Println(string(dataByte))
    fmt.Println("任务处理失败了,我要进入db日志库了")
    fmt.Println("任务处理失败了,发送钉钉消息通知主人")
    return nil
}



func main() {
    t := &RecvPro{}



    //rabbitmq.Recv(rabbitmq.QueueExchange{
    //    "a_test_0001",
    //    "a_test_0001",
    //    "",
    //    "",
    //    "amqp://guest:guest@192.168.2.232:5672/",
    //},t,5)

    /*
        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
     */
    rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.2.232:5672/",
    },t,3)



}

说明:

rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.2.232:5672/",
    },t,3)

第一个参数 QueueExchange说明

// 定义队列交换机对象
type QueueExchange struct {
    QuName  string           // 队列名称
    RtKey   string           // key值
    ExName  string           // 交换机名称
    ExType  string           // 交换机类型
    Dns     string              //链接地址
}

第二个参数 type Receiver interface说明

ConsumerFailAction
拿到消息后,用户可以处理任务,如果消费成功 返回nil即可,如果处理失败,返回一个自定义error即可 由于消息内部自带消息失败尝试3次机制,3次如果失败后就没必要一直存储在mq,所以此处扩展,可以用作消息补偿和告警
// 定义接收者接口
type Receiver interface {
    Consumer([]byte)    error
    FailAction([]byte)  error
}

第三个参数:runNusm

runNusm
消息并发数,同时可以处理多少任务 普通任务 设置为1即可 需要并发的设置成3-5即可
原文地址:https://www.cnblogs.com/sunlong88/p/12717820.html