用Go实现RabbitMQ消息收发

// amqp.Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth.

// amqp://user:pass@hostname:port/vhost

Receiver:

package main

import (
    "log"

    "github.com/streadway/amqp"
)

// failOnError terminates the program via log.Fatalf when err is non-nil,
// printing msg followed by the error. A nil err is a no-op.
func failOnError(err error, msg string) {
    if err == nil {
        return
    }
    log.Fatalf("%s: %s", msg, err)
}

// main is the receiver: it connects to RabbitMQ, declares the durable topic
// exchange and the durable queue, binds them together with the routing key,
// and then logs the body of every delivery it receives.
//
// Deliveries are consumed on the main goroutine so that the process exits
// (running the deferred Close calls) once the delivery channel is closed,
// rather than blocking forever on a channel nobody ever signals.
func main() {
    // Names shared by the declare, bind and consume calls below.
    const (
        exchangeName = "topic_message"
        queueName    = "test"
        routingKey   = "test"
    )

    conn, err := amqp.Dial("amqp://ishowfun:123456@dev.corp.wingoht.com:5672/cd")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        exchangeName, // name
        "topic",      // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, exchangeName, routingKey)
    err = ch.QueueBind(
        q.Name,       // queue name
        routingKey,   // routing key
        exchangeName, // exchange
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto ack
        false,  // exclusive
        false,  // no local
        false,  // no wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

    // Range directly over the deliveries: the loop ends when the delivery
    // channel is closed, which lets main return instead of hanging.
    for d := range msgs {
        log.Printf(" [x] %s", d.Body)
    }

    log.Printf(" [!] Delivery channel closed, shutting down")
}

Sender:

package main

import (
    "github.com/streadway/amqp"
    "log"
)

// failOnError logs msg together with err and exits the program through
// log.Fatalf, but only when err is non-nil; otherwise it returns immediately.
func failOnError(err error, msg string) {
    if err == nil {
        return // nothing failed, keep going
    }
    log.Fatalf("%s: %s", msg, err)
}

// main is the sender: it connects to RabbitMQ, declares the durable topic
// exchange "topic_message", and publishes a single persistent text/plain
// message with body "hello" using the routing key "test".
func main() {
    conn, err := amqp.Dial("amqp://ishowfun:123456@dev.corp.wingoht.com:5672/cd")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "topic_message", // name
        "topic",         // type
        true,            // durable
        false,           // auto-deleted
        false,           // internal
        false,           // no-wait
        nil,             // arguments
    )
    // Check the declaration result before err is reused by Publish below;
    // otherwise a failed declaration would go unnoticed.
    failOnError(err, "Failed to declare an exchange")

    body := "hello"
    err = ch.Publish(
        "topic_message", // exchange
        "test",          // routing key
        false,           // mandatory
        false,           // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}
原文地址:https://www.cnblogs.com/lucifer1997/p/9447300.html