RabbitMQ入门

安装与环境配置

SpringBoot整合使用

不使用交换机

使用DirectExchange交换机

交换机和路由不会因为我们的代码删除而删除!

使用TopicExchange交换机

使用FanoutExchange交换机

生产者回调

消费者重回队列

 

安装与环境配置

本文为个人记录RabbitMq的学习笔记。

由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下erlang。

下载安装RabbitMq

1、2两个步骤的安装包,我是在本机Windows10安装的。如果你看到这里可以百度找一下安装博客,其实就是傻瓜式下一步 

链接:https://pan.baidu.com/s/1mUyOdBKcvoW3Lv8f4y8NAQ
提取码:41fm

配置erlang和RabbitMq的环境变量

 

 

在cmd命令界面安装RabbitMq网页版控制台 rabbitmq-plugins enable rabbitmq_management

浏览器 http://localhost:15672/ 进入登录页面,账号密码都是 guest

SpringBoot整合使用

注意:当前我把生产者和消费者放到了两个项目中,是因为刚好搭建了springcloud项目,一步到位。你也可以当到同一个项目中。

需要先引入依赖,如果是多个项目则都需要引入

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
View Code

不使用交换机

package com.dang.springcloud.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class Send1 {

    //创建一个队列
    @Bean(value = "mq1")
    public Queue mq1() {
        return new Queue("mq1");
    }

    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g1")
    public String send() {
        //发送消息,将消息放入mq1的队列中
        amqpTemplate.convertAndSend("mq1", "我是一个字符串");
        System.out.println("1已发送消息");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive1 {
    //    监听 mq1消息队列
    @RabbitListener(queues = "mq1")
    public void receive(String msg) {
        System.out.println("1我接受到的消息是:" + msg);
    }
}
View Code

这种写法,发送者直接将消息投递到队列中,消费者从队列获取。思考一个问题,如果我们需要将一条消息投递到多个队列是不是这么写?

package com.dang.springcloud.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class Send2 {

    //创建一个队列
    @Bean(value = "mq2")
    public Queue mq2() {
        return new Queue("mq2");
    }


    //创建一个队列
    @Bean(value = "mq3")
    public Queue mq3() {
        return new Queue("mq3");
    }

    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g2")
    public String send() {
        //发送消息,将消息放入mq1的队列中
        amqpTemplate.convertAndSend("mq2", "我是一个字符串");
        amqpTemplate.convertAndSend("mq3", "我是一个字符串");
        System.out.println("23已发送消息");
        return "发送成功!";
    }

}
View Code 
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive2 {
    //    监听 mq2消息队列
    @RabbitListener(queues = "mq2")
    public void receive(String msg) {
        System.out.println("2我接受到的消息是:" + msg);
    }

    //    监听 mq3消息队列
    @RabbitListener(queues = "mq3")
    public void receive2(String msg) {
        System.out.println("3我接受到的消息是:" + msg);
    }
}
View Code

RabbitMQ在消息发送者和队列之间在抽象一个概念,交换机。消息发送者,不关心消息到底发送给谁,而是将消息投递给交换机,并且指定路由地址,交换机通过路由标记决定消息投递到哪个队列中。

使用DirectExchange交换机

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send2 {

    //创建一个队列
    @Bean(value = "mq2")
    public Queue mq2() {
        return new Queue("mq2");
    }


    //创建一个队列
    @Bean(value = "mq3")
    public Queue mq3() {
        return new Queue("mq3");
    }


    //Direct交换机
    @Bean(value = "ex1")
    DirectExchange ex1() {
        return new DirectExchange("ex1");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi1")
    Binding binding1() {
        return BindingBuilder.bind(mq2()).to(ex1()).with("k1");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi2")
    Binding binding2() {
        return BindingBuilder.bind(mq3()).to(ex1()).with("k1");
    }



    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g2")
    public String send() {
        amqpTemplate.convertAndSend("ex1", "k1", "我是一条消息");
        System.out.println("消息已投递给交换机ex1");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive2 {
    //    监听 mq2消息队列
    @RabbitListener(queues = "mq2")
    public void receive(String msg) {
        System.out.println("2我接受到的消息是:" + msg);
    }

    //    监听 mq3消息队列
    @RabbitListener(queues = "mq3")
    public void receive2(String msg) {
        System.out.println("3我接受到的消息是:" + msg);
    }
}
View Code

这种写法,我们还是需要创建多个队列,然后创建交换机,将队列和交换机绑定,此时我们需要填入一个路由地址。消息发送者直接将消息投递给交换机,并且指定路由。由交换机根据路由查询绑定的队列。上边代码片段,只有唯一的一个交换机,但是从这个交换机中查到了两个不同的路由,将消息投递给绑定的队列中。消费者则不改变写法。

交换机和路由不会因为我们的代码删除而删除!

 

 

 

使用TopicExchange交换机

一下代码片段和上边的十分类似,只是使用了TopicExchange交换机。第二个绑定器使用的是 * 通配符。消息是可以被投递到mq5的。

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send3 {

    //创建一个队列
    @Bean(value = "mq4")
    public Queue mq4() {
        return new Queue("mq4");
    }


    //创建一个队列
    @Bean(value = "mq5")
    public Queue mq5() {
        return new Queue("mq5");
    }


    //Topic交换机
    @Bean(value = "ex2")
    TopicExchange ex2() {
        return new TopicExchange("ex2");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi3")
    Binding binding1() {
        return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi4")
    Binding binding2() {
        return BindingBuilder.bind(mq5()).to(ex2()).with("student.age.*");
    }



    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g3")
    public String send() {
        amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");
        System.out.println("消息已投递给交换机ex2");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive3 {
    //    监听 mq4消息队列
    @RabbitListener(queues = "mq4")
    public void receive(String msg) {
        System.out.println("4我接受到的消息是:" + msg);
    }

    //    监听 mq5消息队列
    @RabbitListener(queues = "mq5")
    public void receive2(String msg) {
        System.out.println("5我接受到的消息是:" + msg);
    }
}
View Code

 

接下来我们删除 student.age.* 的路由,换成 student.* 这样消息就不能转发到mq5了。可见 * 只能匹配一个词。

删除掉所有绑定到mq5队列的路由,重新绑定。下面代码片段使用的 # 通配符,# 匹配多个词。

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send3 {

    //创建一个队列
    @Bean(value = "mq4")
    public Queue mq4() {
        return new Queue("mq4");
    }


    //创建一个队列
    @Bean(value = "mq5")
    public Queue mq5() {
        return new Queue("mq5");
    }


    //Topic交换机
    @Bean(value = "ex2")
    TopicExchange ex2() {
        return new TopicExchange("ex2");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi3")
    Binding binding1() {
        return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi4")
    Binding binding2() {
        return BindingBuilder.bind(mq5()).to(ex2()).with("student.#");
    }



    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g3")
    public String send() {
        amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");
        System.out.println("消息已投递给交换机ex2");
        return "发送成功!";
    }

}
View Code

使用FanoutExchange交换机

FanoutExchange没有路由的概念,到像是点对点的直接发送。

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send4 {

    //创建一个队列
    @Bean(value = "mq6")
    public Queue mq6() {
        return new Queue("mq6");
    }


    //创建一个队列
    @Bean(value = "mq7")
    public Queue mq7() {
        return new Queue("mq7");
    }


    //Fanout交换机
    @Bean(value = "ex3")
    FanoutExchange ex3() {
        return new FanoutExchange("ex3");
    }

    //绑定 将队列和交换机绑定
    @Bean(value = "bi5")
    Binding binding1() {
        return BindingBuilder.bind(mq6()).to(ex3());
    }

    //绑定 将队列和交换机绑定
    @Bean(value = "bi6")
    Binding binding2() {
        return BindingBuilder.bind(mq7()).to(ex3());
    }


    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g4")
    public String send() {
        //此处第二个路由参数必须给 null,否则不能成功将消息投递到队列
        amqpTemplate.convertAndSend("ex3", null,"我是一条消息");
        System.out.println("消息已投递给交换机ex3");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive4 {
    //    监听 mq6消息队列
    @RabbitListener(queues = "mq6")
    public void receive(String msg) {
        System.out.println("6我接受到的消息是:" + msg);
    }

    //    监听 mq7消息队列
    @RabbitListener(queues = "mq7")
    public void receive2(String msg) {
        System.out.println("7我接受到的消息是:" + msg);
    }
}
View Code

生产者回调

在Spring配置文件中配置如下

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
View Code

注意看回调函数的写法

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Configuration
public class Send5 {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm-----"+correlationData);
                System.out.println("confirm-----"+b);
                System.out.println("confirm-----"+s);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("returnedMessage-----"+message);
                System.out.println("returnedMessage-----"+s);
                System.out.println("returnedMessage-----"+s1);
                System.out.println("returnedMessage-----"+s2);
            }
        });

        return rabbitTemplate;
    }

    //创建一个队列
    @Bean(value = "mq8")
    public Queue mq8() {
        return new Queue("mq8");
    }


    //Topic交换机
    @Bean(value = "ex4")
    TopicExchange ex4() {
        return new TopicExchange("ex4");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi7")
    Binding binding1() {
        return BindingBuilder.bind(mq8()).to(ex4()).with("HaHaHa");
    }

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping(value = "g5")
    public String g5() {
        rabbitTemplate.convertAndSend("ex4", "HaHaHa", "我是一条消息");
        System.out.println("消息已投递给交换机ex4");
        return "发送成功";
    }

}
View Code

回调结果

交换机不对
confirm-----null
confirm-----false
confirm-----channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'ex' in vhost '/', class-id=60, method-id=40)


找不到路由
returnedMessage-----(Body:'我是一条消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
returnedMessage-----NO_ROUTE
returnedMessage-----ex4
returnedMessage-----HaHaHaww
confirm-----null
confirm-----true
confirm-----null

成功
confirm-----null
confirm-----true
confirm-----null
View Code

消费者重回队列

package com.datang.springcloud.mq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class Receive5 {
    //    监听 mq8消息队列
    @RabbitListener(queues = "mq8")
    public void receive(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("8我接受到的消息是:" + msg);
            int a = 1 / 0;
            // 第一个参数为队列的ID,第二个参数批处理,手动提交比当前ID小的。如果这个队列为正确的,那就可以把之前的提交
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            System.out.println("消费消息确认" + message.getMessageProperties().getConsumerQueue() + ",接收到了回调方法");
        } catch (Exception e) {
            //其实重发也没多大意义,一般都是做个日志,或者其他补偿。
            System.out.println("尝试重发:" + message.getMessageProperties().getConsumerQueue());
            //前两个参数和 basicAck 一样,最后一个为是否重回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);


        }
    }
}
View Code

 

原文地址:https://www.cnblogs.com/zumengjie/p/12359084.html