RabbitMQ学习(二):RabbitMQ的基本概念

RabbitMQ相关概念

  • RabbitMQ是一个Erlang开发的AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的开源实现。是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在。
  • 主要特征
    1. 可靠性:持久化、传输确认、发布确认等机制来保证可靠性。
    2. 扩展性:支持动态扩展集群中的节点
    3. 高可用:队列可在集群中设置镜像,部分节点出现问题仍然可用
    4. 多协议:AMQP协议、STOMP、MOTT等多种消息中间件协议
    5. 多语言:java、Python、Ruby、PHP、C#、JavaScript、Go、Object-C等
    6. 支持插件:如web管理端。
  • 消息队列有三个基本概念: 发送方、消息队列、消费方。RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和队列之间, 加入了交换器 (Exchange)。这样发消息者和消息队列就没有直接联系,转而变成发消息者把消息发给交换器,交换器根据调度策略再把消息转发给消息队列。消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange。Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。

AMQP概念

Broker:

接收和分发消息的应用,我们在介绍消息中间件的时候所说的消息系统就是Message Broker。

Virtual host:

出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。

Connection:

publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。

Channel:

如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

Exchange:

message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。

  • direct (路由模式):这种类型的交换机的路由规则是根据一个routingKey的标识,交换机通过一个routingKey与队列绑定 ,在生产者生产消息的时候 指定一个routingKey 当绑定的队列的routingKey 与生产者发送的一样 那么交换机会吧这个消息发送给对应的队列。
  • fanout(发布订阅):这种类型的交换机路由规则很简单,只要与他绑定了的队列, 他就会吧消息发送给对应队列(与routingKey没关系)
  • topic(主题模式):这种类型的交换机路由规则也是和routingKey有关 只不过 topic他可以根据:星,#( 星号代表过滤一单词,#代表过滤后面所有单词, 用.隔开)来识别routingKey 我打个比方 假设 我绑定的routingKey 有队列A和B A的routingKey是:星.user B的routingKey是: #.user那么我生产一条消息routingKey 为: error.user 那么此时 2个队列都能接受到, 如果改为 topic.error.user 那么这时候 只有B能接受到了
  • headers(RPC):这个类型的交换机很少用到,他的路由规则 与routingKey无关 而是通过判断header参数来识别的, 基本上没有应用场景,因为上面的三种类型已经能应付了。

Queue:

消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。

Binding:

exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

RabbitMQ几种应用模式

在这里插入图片描述

    // 获取MQ的连接  下面代码获取连接都是使用的这个工具类
    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置连接地址
        factory.setHost("192.168.100.150");
        // 3.设置用户名称
        factory.setUsername("admin");
        // 4.设置用户密码
        factory.setPassword("admin");
        // 5.设置amqp协议端口号
        factory.setPort(5672);
        // 6.设置VirtualHost地址
        factory.setVirtualHost("adminDemo");
        Connection connection = factory.newConnection();
        return connection;
    }
  • 简单模式
    在这里插入图片描述
    • 消息产生消息,将消息放入队列
    • 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。
    • Demo演示
    private static String QUEUE_NAME = "demo_hello";
      //获取连接
        Connection connection = MQConnectionUtils.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        // 创建一个队列:队列名称,是否持久化,是否自动删除队列,是否排外
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 1; i <= 25; i++) {
            // 创建 msg
            String msg = "生成 ---" + i;
            // 生产者发送消息者     MessageProperties.PERSISTENT_TEXT_PLAIN 设置消息的持久化(消息没有接收到也不会丢失)
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
        }
        // 关闭通道和连接
        channel.close();
        connection.close();

在这里插入图片描述

    private static String QUEUE_NAME = "demo_hello";
     	 //获取连接
        Connection connection = MQConnectionUtils.newConnection();
       	 //创建通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // 监听获取消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
            }
        };
        // 设置应答模式 如果为true情况下 表示为自动应答模式 false 表示为手动应答
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

在这里插入图片描述

  • work queues(工作模式)
    在这里插入图片描述
    • 多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费
      者都收到所有的消息并处理。
    • 这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
    • Demo演示
      工作队列只需要将上面的消费者多复制几分就可以了。
      如果发现消息分发不均匀可以设置
        /**
         * 公平队列原理:队列服务器向消费者发送消息的时候,消费者采用手动应答模式,
         * 队列服务器必须要收到消费者发送ack结果通知,才会继续发送一下一个消息
         * 此处设置一次只消费1个
         */
        channel.basicQos(1);

默认情况下,rabbitmq开启了消息的自动应答。此时,一旦rabbitmq将消息分发给了消费者,就会将消息从内存中删除。这种情况下,如果正在执行的消费者被“杀死”或“崩溃”,就会丢失正在处理的消息。 如果想要确保消息不丢失,我们需要设置消息应答方式为手动应答。设置为手工应答后,消费者接受并处理完一个消息后,会发送应答给rabbitmq,rabbitmq收到应答后,会将该条消息从内存中删除。如果一个消费者在处理消息的过程中“崩溃”,rabbitmq没有收到应答,那么”崩溃“前正在处理的这条消息会重新被分发到别的消费者。

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                // 手动应答 模式 告诉给队列服务器 已经处理成功
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4.设置应答模式 如果为true情况下 表示为自动应答模式 false 表示为手动应答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
  • publish/subscribe(发布订阅)
    在这里插入图片描述
    • 1个生产者,多个消费者。每一个消费者都有自己的一个队列。生产者没有将消息直接发送到队列,而是发送到了交换机。每个队列都要绑定到交换机。生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
    • X(Exchanges)接收生产者发送的消息。知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。
    • 这种模式不需要RouteKey
    • Demo演示
//生产者
	private static final String DESTINATION_NAME = "rabbitMq_fanout";
	// 1. 建立mq连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.生产者绑定交换机 参数1:交换机名称 参数2:交换机类型
		channel.exchangeDeclare(DESTINATION_NAME, "fanout");
		// 4.创建消息
		String msg = "rabbitMq_fanout";
		System.out.println("生产者投递消息:" + msg);
		// 5.发送消息
		channel.basicPublish(DESTINATION_NAME, "",null , msg.getBytes());
		// 6.关闭通道 和连接
		channel.close();
		connection.close();
//消费者
 // 交换机名称
    private static final String DESTINATION_NAME = "rabbitMq_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立mq连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        System.out.println("短信消费者启动");
        FanoutConsumer.smsConsumer(channel);
        System.out.println("邮件消费启动");
        FanoutConsumer.emailConsumer(channel);
    }
    private static final String SMS_QUEUE = "Sms";
    public static void smsConsumer(Channel channel) throws IOException {
        // 3.消费声明队列
        channel.queueDeclare(SMS_QUEUE, false, false, false, null);
        // 4.消费者队列绑定交换机
        channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "");
        // 5.消费监听消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产消息:" + msg);
            }
        };
        channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
    }
    private static final String EMAIL_QUEUE = "Email";
    public static void emailConsumer(Channel channel) throws IOException {
        // 3.消费声明队列
        channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);
        // 4.消费者队列绑定交换机
        channel.queueBind(EMAIL_QUEUE, DESTINATION_NAME, "");
        // 5.消费监听消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产消息:" + msg);
            }
        };
        //自动应答
        channel.basicConsume(EMAIL_QUEUE, true, defaultConsumer);
    }
  • routing(路由模式)
    在这里插入图片描述
    • 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
    • 任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue
    • 一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
    • 这种模式下不需要将Exchange进行任何绑定(binding)操作。
    • 消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
    • 如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
    • Demo演示
//生产者
	// 交换机名称
	private static final String DESTINATION_NAME = "rabbitMq_direct";
		// 1. 建立mq连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
		channel.exchangeDeclare(DESTINATION_NAME, "direct");
		//这个是路由键的名称,方便测试
		String s1="sms";
		// 4.创建消息
		String msg = "rabbitMq_direct---:" +s1;
		System.out.println("生产者投递消息:" + msg);
		// 5.发送消息  routingKey:email
		channel.basicPublish(DESTINATION_NAME, s1, null, msg.getBytes());
		// 6.关闭通道 和连接
		channel.close();
		connection.close();
//消费者
   // 交换机名称
    private static final String DESTINATION_NAME = "rabbitMq_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        System.out.println("短信消费者启动");
        DirectConsumer.smsConsumer(channel);
        System.out.println("邮件消费者启动");
        DirectConsumer.emailConsumer(channel);
    }
    private static final String SMS_QUEUE = "Sms_msg";
    public static void smsConsumer( Channel channel) throws IOException {
        // 3.消费声明队列
        channel.queueDeclare(SMS_QUEUE, false, false, false, null);
        // 4.消费者队列绑定交换机 绑定路由键
        channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "sms");
        // 5.消费监听消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产消息--:" + msg);
            }
        };
        channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
    }
    
    private static final String EMAIL_QUEUE = "Email_msg";
    public static void emailConsumer( Channel channel) throws IOException {
        // 3.消费声明队列
        channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);
        // 4.消费者队列绑定交换机 绑定路由键,可以设置多个
        channel.queueBind(EMAIL_QUEUE, DESTINATION_NAME, "email");
        // 5.消费监听消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产消息-----:" + msg);
            }
        };
        channel.basicConsume(EMAIL_QUEUE, true, defaultConsumer);
    }

  • topic(主题模式)
    在这里插入图片描述
    • Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“ * ”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
    • 任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
    • 这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
    • 这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
    • 在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
    • “#”表示0个或若干个关键字,“ * ”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
    • 同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
    • Demo演示
// 交换机名称   生产者
	private static final String DESTINATION_NAME = "rabbitMq_topic";
		// 1. 建立mq连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
		channel.exchangeDeclare(DESTINATION_NAME, "topic");
		//routingKey
		String s1="log.sms.test.demo";
		// 4.创建消息
		String msg = "rabbitMq_msg_topic:"+s1 ;
		System.out.println("生产者投递消息:" + msg);
		// 5.发送消息  routingKey:email
		channel.basicPublish(DESTINATION_NAME, s1, null, msg.getBytes());
		// 6.关闭通道 和连接
		channel.close();
		connection.close();

 // 交换机名称
    private static final String DESTINATION_NAME = "rabbitMq_topic";
    public static void main(String[] args) throws Exception {
        // 1. 建立mq连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        System.out.println("短信消费者启动");
        TopicConsumer.smsConsumer(channel);
        System.out.println("邮件消费者启动");
        TopicConsumer.maileConsumer(channel);
    }
    private static final String SMS_QUEUE = "topic_sms";
    public static void smsConsumer(Channel channel) throws IOException {
        channel.queueDeclare(SMS_QUEUE, false, false, false, null);
        channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "log.sms");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产消息----:" + msg);
            }
        };
        channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
    }
    private static final String MAILE_QUEUE = "topic_email";
    public static void maileConsumer(Channel channel) throws IOException {
        // 3.消费声明队列
        channel.queueDeclare(MAILE_QUEUE, false, false, false, null);
        // *只要前缀相同都能收到
		//channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "log.*");
        //可以匹配后面所有的词
        channel.queueBind(MAILE_QUEUE, DESTINATION_NAME, "log.#");
        // 5.消费监听消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产消息--------:" + msg);
            }
        };
        channel.basicConsume(MAILE_QUEUE, true, defaultConsumer);
    }


  • RPC
    在这里插入图片描述
    这个模式听说用的很少,我也没有去了解这个模式。以后如果遇到会再来补充的。
原文地址:https://www.cnblogs.com/yangk1996/p/12663602.html