Rabbitmq入门

1. 消息中间件概述

1.1. 什么是消息中间件

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

  • 为什么使用MQ

    在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

  • 开发中消息队列通常有如下应用场景:

    1、任务异步处理

    将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

    2、应用程序解耦合

    MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

    3、削峰填谷

    如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

    img

    消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。

    img

    但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

    img

1.2. AMQP 和 JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

1.2.1. AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

1.2.2. JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

1.2.3. AMQP 与 JMS 区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

1.3. 消息队列产品

市场上常见的消息队列有如下:

  • ActiveMQ:基于JMS
  • ZeroMQ:基于C语言开发
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品
  • Kafka:类似MQ的产品;分布式消息系统,高吞吐量

1.4. RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);

官网对应模式介绍:https://www.rabbitmq.com/getstarted.htmlimage-20200703010251719

2. 安装及配置RabbitMQ

可以直接用docker安装,或者看之前的博客,也有安装的记录。

1571888944991

3. RabbitMQ入门

3.1. 搭建示例工程

3.1.1. 创建工程

往pom.xml文件中添加如下依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

3.2. 编写代码

把创建链接抽取成方法

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址;默认为 localhost
        connectionFactory.setHost("192.168.1.119");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称;默认为 /
        connectionFactory.setVirtualHost("/wgr");
        //连接用户名;默认为guest
        connectionFactory.setUsername("wgr");
        //连接密码;默认为guest
        connectionFactory.setPassword("1qaz@WSX");

        //创建连接
        return connectionFactory.newConnection();
    }
}

生产者

public class Producer {

    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        /*
        参数一:队列名称
        参数二:是否定义持久化队列
        参数三:是否独占本次连接
        参数四:是否在不使用的时候自动删除队列
        参数五:队列其他参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //要发送的消息
        String message = "hello,小兔子,我来了!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("已发送消息:"+message);
        //释放资源
        channel.close();
        connection.close();
    }

}

发送效果图如下:

image-20200702194149059

消费者:

ublic class Consumer {

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //创建队列:并设置消息处理
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);

        DefaultConsumer s = new DefaultConsumer(channel){

        };


        //监听消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            /*
            consumerTag :消息者标签,在channel.basicConsume时候可以指定
            envelope: 消息包内容,可从中获取消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送)
            properties: 消息属性
            body: 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息:" + new String(body, "UTF-8"));
                System.out.println("");
                System.out.println("================================================================");
                System.out.println("");
            }
        };
        /*
        监听消息
        参数一:队列名称
        参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到回复后会删除消息;设置为false则需要手动确认
         */
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);

        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}
image-20200702194210902
原文地址:https://www.cnblogs.com/dalianpai/p/13228081.html