RabbitMQ快速入门

1.MQ的基本概念

1.1 MQ概述

MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。发送方成为生产者,接收方称为消费者。

1.2MQ的优势和劣势

优势:应用解耦,异步提速,削峰填谷
劣势:系统可用性降低,系统复杂度提高,一致性问题

1.3 常见的MQ产品

目前业界有很多MQ产品,例如RabbitMQ,RocketMQ,ActiveMQ,Kafka,ZeroMQ,MetaMQ等,也有直接使用Redis充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。

1.4.JMS概念

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
JMS是JavaEE规范中的一种,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有。

2.RabbitMQ简介

它是基于AMQP协议的,即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开发标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件限制。2006年,QMQP规范发布。类比HTTP。2007年Rabbit技术公司基于AMQP标准开发RabbitMQ1.0发布,RabbitMQ采用Erlang语言开发。

  • RabbitMQ相关概念:
  • Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去,常用的类型有,direct(point-to-point),topic(publish-subscribe)and fanout(multicast)
  • Queue:消息队列被送到这里等待consumer取走
  • Binding:exchange和queue之前的虚拟连接,binding中可以包含routing key。Binding信息保存到exchange中的查询表中,用于message的分发依据。

RabbitMQ提供了6种不同的工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式。

3.应用场景

  以商品订单场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。RabbitMQ就是这样一款消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。

4.消息队列作用

异步处理。把消息放入消息中间件中,等到需要的时候再去处理。

流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。

日志处理

应用解耦

5.安装RabbitMQ

使用docker进行安装

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

然后访问 http://192.168.1.11:15672/#/ 用户名密码都是guest

6.rabbitmq入门程序

创建maven过程 引入依赖

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
        <!--操作文件的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>

6.1 生产者

package com.gh.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @Author Eric
 * @Date 2021/7/17 15:42
 * @Version 1.0
 * 生产者,发消息
 */
public class Producer {

    //队列名称
    public static final String QUEUE_NAME = "hello";
    //发消息
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置工厂ip,连接rabbitmq队列
        factory.setHost("192.168.1.11");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化,默认存储在内存中
         * 3.该队列是否只供一个消费者进行消费,是否进行消息的共享,true表示可以多个消费者共享
         * 4.是否自动删除,最后一个消费者断开连接后,是否自动删除,true表示自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //发消息
        String message="hello world";
        /**
         * 发布消息
         * 1.发送到哪个交换机
         * 2.路由的key值是哪个,本次是队列的名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");

    }
}
View Code

6.2 消费者

package com.gh.rabbitmq.one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Eric
 * @Date 2021/7/17 16:13
 * @Version 1.0
 * 消費者,接收消息
 */
public class Consumer {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        //设置ip 用户名 密码
        factory.setHost("192.168.1.11");
        factory.setUsername("guest");
        factory.setPassword("guest");
        //创建连接
        Connection connection=factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //声明接收消息
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断了");
        };
        /**
         * 消费者消费消息
         *  1.消费哪个队列
         *  2.消费成功之后是否要自动应答,true代表自动应答
         *  3.消费者未成功消费的回调
         *  4.消费者取消回调
         *
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
View Code

6.3 消息应答

消息在手动应答时不丢失 放回队列中重新消费 设置两个消费者轮询消费 当其中一个宕机时 未消费的消息将由另一个消费者消费

生产者
package com.gh.rabbitmq.three;

import com.gh.rabbitmq.utils.RabbitmqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @Author Eric
 * @Date 2021/7/17 18:26
 * @Version 1.0
 * 消息在手动应答时不丢失 放回队列中重新消费
 */
public class Task02 {

    public static final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        Scanner scanner=new Scanner(System.in);
        while (scanner.hasNext()){
            String message=scanner.nextLine();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
            System.out.println("生产者发送消息:"+message);
        }
    }
}

消费者01

package com.gh.rabbitmq.three;

import com.gh.rabbitmq.utils.RabbitmqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Author Eric
 * @Date 2021/7/17 18:39
 * @Version 1.0
 */
public class Work03 {
    public static final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");
        //声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 1.消息的tag
             * 2.是否批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("接收到的消息"+new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费消息被中断了");
        };
        //手动应答
        channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

消费者02

package com.gh.rabbitmq.three;

import com.gh.rabbitmq.utils.RabbitmqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Author Eric
 * @Date 2021/7/17 18:39
 * @Version 1.0
 */
public class Work04 {
    public static final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较长");
        //声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 1.消息的tag
             * 2.是否批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("接收到的消息"+new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费消息被中断了");
        };
        //手动应答
        channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

7.springboot集成RabbitMQ

7.1 安装依赖

在pom.xml引入如下依赖

        <!--rabbitmq消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>

7.2 配置rabbitMQ信息

在application.properties中配置如下内容

spring.rabbitmq.host=192.168.136.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

7.3 代码实现

(1) 配置mq消息转换器

package com.gh.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置mq消息转换器
 * @Author Eric
 * @Date 2021/6/26 22:47
 * @Version 1.0
 */
@Configuration
public class MQConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
View Code

(2) 配置常量

package com.gh.constant;

/**
 * @Author Eric
 * @Date 2021/6/26 23:06
 * @Version 1.0
 */
public class MqConst {
    /**
     * 预约下单
     */
    public static final String EXCHANGE_DIRECT_ORDER
            = "exchange.direct.order";
    public static final String ROUTING_ORDER = "order";
    //队列
    public static final String QUEUE_ORDER  = "queue.order";
    /**
     * 短信
     */
    public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
    public static final String ROUTING_MSM_ITEM = "msm.item";
    //队列
    public static final String QUEUE_MSM_ITEM  = "queue.msm.item";
}
View Code

(3) 封装RabbitService方法

package com.gh.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Author Eric
 * @Date 2021/6/26 22:44
 * @Version 1.0
 */
@Service
public class RabbitService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  发送消息
     * @param exchange 交换机
     * @param routingKey 路由键
     * @param message 消息
     */
    public boolean sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }
}
View Code

(4) 封装mq监听器

package com.gh.receiver;

import com.gh.constant.MqConst;
import com.gh.service.MsmService;
import com.gh.vo.msm.MsmVo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Author Eric
 * @Date 2021/6/26 23:23
 * @Version 1.0
 */
@Component
public class MsmReceiver {

    @Autowired
    private MsmService msmService;
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_MSM_ITEM, durable = "true"),
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
            key = {MqConst.ROUTING_MSM_ITEM}
    ))
    public void send(MsmVo msmVo, Message message, Channel channel) {
        msmService.send(msmVo);
    }
}
View Code

 (5) 发送短信调用rabbitMQ核心代码

        //发送mq消息 号源更新和短信通知
            OrderMqVo orderMqVo=new OrderMqVo();
            orderMqVo.setScheduleId(scheduleId);
            orderMqVo.setAvailableNumber(availableNumber);
            orderMqVo.setReservedNumber(reservedNumber);
            //短信提示
            MsmVo msmVo=new MsmVo();
            msmVo.setPhone(patient.getPhone());
            String reserveDate =
                    new DateTime(orderInfo.getReserveDate()).toString("yyyy-MM-dd")
                            + (orderInfo.getReserveTime()==0 ? "上午": "下午");
            Map<String,Object> param = new HashMap<String,Object>(){{
                put("title", orderInfo.getHosname()+"|"+orderInfo.getDepname()+"|"+orderInfo.getTitle());
                put("amount", orderInfo.getAmount());
                put("reserveDate", reserveDate);
                put("name", orderInfo.getPatientName());
                put("quitTime", new DateTime(orderInfo.getQuitTime()).toString("yyyy-MM-dd HH:mm"));
            }};
            msmVo.setParam(param);

            orderMqVo.setMsmVo(msmVo);
            rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_ORDER, MqConst.ROUTING_ORDER, orderMqVo);

一点点学习,一丝丝进步。不懈怠,才不会被时代所淘汰!

原文地址:https://www.cnblogs.com/fqh2020/p/14939625.html