golang操作Rabbit

GOLANG操作rabbitmq

简单模式

一个生产者对应一个消费者!!!

生产者

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func main() {
	url := "amqp://guest:guest@localhost:5672/"

	// 创建链接
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 获取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 声明队列,没有则创建
	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
	/*
		name:queue的名称
		durable:是否持久化
		autoDelete: 是否自动删除(当没有customer时会自动删除)
		exclusive:
			1.是否独占,只有一个消费者监听这个队列
			2.当coon关闭的时候,是否删除队列
		noWait: 是否等待
		args: 额外的参数
	*/
	// 发布消息
	message := amqp.Publishing{
		Body: []byte("hello world"),
	}
	if err := channel.Publish("", "hello", false, false, message); err != nil {
		fmt.Println(err)
	}
	/*
		exchange:交换机的名称,简单模式下使用默认的""
		routerKey: 路由名称, 简单模式下使用和队列名称一样
	*/
}

消费者

func main() {
	url := "amqp://guest:guest@localhost:5672/"

	// 创建链接
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 获取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 声明队列,没有则创建
	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)

	delivery, err := channel.Consume("hello", "", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Println(string(message.Body))
	}
}

工作模式

一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"strconv"
)

const (
	url   = "amqp://guest:guest@localhost:5672/"
	queue = "workMode"
)

func Consume(name string) {

	// 创建链接
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 获取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 声明队列,没有则创建
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s   body:%s
", name, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 声明队列,没有则创建
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	for i := 0; i < 10; i++ {
		message := amqp.Publishing{
			Body: []byte("hello world" + strconv.Itoa(i)),
		}
		if err := channel.Publish("", queue, false, false, message); err != nil {
			fmt.Println(err)
		}
	}
}

func main() {
	// 开启两个消费者
	for i := 0; i < 2; i++ {
		go Consume(fmt.Sprintf("name%d", i))
	}
	// 开启一个生产者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

测试结果

name:name1   body:hello world0
name:name1   body:hello world2
name:name1   body:hello world4
name:name1   body:hello world6
name:name1   body:hello world8
name:name0   body:hello world1
name:name0   body:hello world3
name:name0   body:hello world5
name:name0   body:hello world7
name:name0   body:hello world9

发布订阅模式

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	url          = "amqp://guest:guest@localhost:5672/"
	exchangeName = "TestFanout"
	queue1       = "TestFanoutQueue1"
	queue2       = "TestFanoutQueue2"
)

func Consume(queue string, action string) {

	// 创建链接
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 获取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 注意:这里一定要写上,否则有异常,ioException
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)

	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s  action:%s body:%s
", queue, action, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 创建交换机
	if err := channel.ExchangeDeclare(
		exchangeName,        // name
		amqp.ExchangeFanout, // kind
		false,               // durable
		true,                // autoDelete
		false,               // internal 是否rabbitmq内部使用
		true,                // noWait
		nil,                 // args
	); err != nil {
		log.Fatal(err)
	}

	// 声明队列,没有则创建
	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)

	if err := channel.QueueBind(queue1, "", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, "", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	message := amqp.Publishing{
		Body: []byte("ExchangeFanout"),
	}
    // 使用FANOUT时,routeingKey设置为空字符串
	if err := channel.Publish(exchangeName, "", false, false, message); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 开启两个消费者
	go Consume(queue1, "记录日志")
	go Consume(queue2, "保存信息")
	// 开启一个生产者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

路由模式

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	url             = "amqp://guest:guest@localhost:5672/"
	exchangeName    = "direct"
	errorRouteKey   = "error"
	infoRouteKey    = "info"
	warningRouteKey = "warning"
	queue1          = "directQueue1"
	queue2          = "directQueue2"
)

func Consume(queue string, action string) {

	// 创建链接
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 获取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 注意:这里一定要写上,否则有异常,ioException
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)

	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s  action:%s body:%s
", queue, action, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 创建交换机
	if err := channel.ExchangeDeclare(
		exchangeName,        // name
		amqp.ExchangeDirect, // kind
		false,               // durable
		true,                // autoDelete
		false,               // internal 是否rabbitmq内部使用
		true,                // noWait
		nil,                 // args
	); err != nil {
		log.Fatal(err)
	}

	// 声明队列,没有则创建
	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)

	// 队列1绑定error
	if err := channel.QueueBind(queue1, errorRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	// 队列2绑定error,info,warning
	if err := channel.QueueBind(queue2, errorRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, infoRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, warningRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}

	message := amqp.Publishing{
		Body: []byte("ExchangeDirect"),
	}
	// 发送消息指定交换机和路由key
	if err := channel.Publish(exchangeName, infoRouteKey, false, false, message); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 开启两个消费者
	go Consume(queue1, "记录日志")
	go Consume(queue2, "保存信息")
	// 开启一个生产者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

此时生产者发送的路由key为info,只有queue2才能收到,如果把路由key改为error,则两个队列都能收到。

通配符模式

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	url          = "amqp://guest:guest@localhost:5672/"
	exchangeName = "topic"
	queue1       = "directQueue1"
	queue2       = "directQueue2"
)

func Consume(queue string, action string) {

	// 创建链接
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 获取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 注意:这里一定要写上,否则有异常,ioException
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)

	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s  action:%s body:%s
", queue, action, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 创建交换机
	if err := channel.ExchangeDeclare(
		exchangeName,       // name
		amqp.ExchangeTopic, // kind
		false,              // durable
		true,               // autoDelete
		false,              // internal 是否rabbitmq内部使用
		true,               // noWait
		nil,                // args
	); err != nil {
		log.Fatal(err)
	}

	// 声明队列,没有则创建
	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)

	// 队列1绑定error
	if err := channel.QueueBind(queue1, "*.*", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	// 队列2绑定error,info,warning
	if err := channel.QueueBind(queue2, "*.error", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, "#.info", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}

	message := amqp.Publishing{
		Body: []byte("ExchangeDirect"),
	}
	// 发送消息指定交换机和路由key
	if err := channel.Publish(exchangeName, "test.info", false, false, message); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 开启两个消费者
	go Consume(queue1, "记录日志")
	go Consume(queue2, "保存信息")
	// 开启一个生产者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

通配符模式是路由模式的进阶版,它能够让一个队列监听动态的路由规则。

原文地址:https://www.cnblogs.com/ivy-blogs/p/14201907.html