golang使用rabbitmq多个消费者

生产者.

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)

func main()  {

    //链接mq
    conn, err := amqp.Dial("amqp://guest:guest@192.168.2.232:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //通道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    //设置数据
    q, err := ch.QueueDeclare(
        "a_test", // name
        true,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    for i:=1;i<=1000;i++{
        body := fmt.Sprintf("{"order_id":%d}",i)
        fmt.Println(body)
        err = ch.Publish(
            "",     // exchange
            q.Name, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
            })
    }



    failOnError(err, "Failed to publish a message")
}



func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

消费者:

package main

import (
    "github.com/streadway/amqp"
    "log"
    "time"
)



func main()  {


    conn, err := amqp.Dial("amqp://guest:guest@192.168.2.232:5672/")
    if err != nil{
        log.Printf("err %s", err)
    }

    defer conn.Close()

    forever := make(chan bool)
    for  i:=1;i<4;i++{
        go func(routineNum int) {
            ch, err := conn.Channel()
            if err != nil{
                log.Printf("err %s", err)
            }
            defer ch.Close()
            q, err := ch.QueueDeclare(
                "a_test", // name
                true,   // durable
                false,   // delete when unused
                false,   // exclusive
                false,   // no-wait
                nil,     // arguments
            )
            if err != nil{
                log.Printf("err %s", err)
            }

            Msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
            )

            if err != nil{
                log.Printf("err %s", err)
            }

            for msg := range Msgs {


                log.Printf("协程 %d    Received a message: %s",routineNum ,msg.Body)
                time.Sleep(5*time.Second)
                msg.Ack(true)


            }

        }(i)
    }

    <-forever






}

返回结果:

2020/04/15 11:11:22 协程 3    Received a message: {"order_id":69}
2020/04/15 11:11:22 协程 1    Received a message: {"order_id":601}
2020/04/15 11:11:22 协程 2    Received a message: {"order_id":75}
2020/04/15 11:11:27 协程 1    Received a message: {"order_id":602}
2020/04/15 11:11:27 协程 3    Received a message: {"order_id":70}
2020/04/15 11:11:27 协程 2    Received a message: {"order_id":76}
2020/04/15 11:11:32 协程 1    Received a message: {"order_id":603}
2020/04/15 11:11:32 协程 3    Received a message: {"order_id":71}
2020/04/15 11:11:32 协程 2    Received a message: {"order_id":77}
原文地址:https://www.cnblogs.com/sunlong88/p/12703987.html