RabbitMQ-1 基本概念和实现简单生产消费者

前言

RabbitMQ是基于AMQP协议(Advanced Message Queue Protocol)的消息中间件

什么是消息队列

      消息队列属于进程间通信的一种方式,使用消息队列可以通过异步方式处理数据,借此可以提高系统性能。我们可以把消息当作存放数据的容器,消息的消费者可以从队列中获取数据,进行处理。常见的消息队列有:ActiveMQ,RabbitMQ,Kafka,RocketMQ等。

下图为生产者发送消息到rabbitMq 最终消息被消费者的流程图

一  各角色介绍

1  生产者
Producer: 生产者,就是投递消息的一方。
生产者创建消息,然后发布到 RabbitMQ 中。消息一般可以包含 2 个部分:消息体和标签 (Label)。消息体也可以称之为 payload ,在实际应用中,消 息体一般是一个带有业务逻辑结构 的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息 , 比如 一个交换器的名称和一个路由键 。 生产者把消息交由 RabbitMQ , RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者(Consumer ) 。

2 消费者
Consumer: 消费者 ,就是接收消息的一方。
消费者连接到 RabbitMQ 服务器,并订阅到队列上 。当消费者消费一 条消息时 ,只是消费消息的消息体(payload )。在消息路由的过程中 ,消息的标签会丢弃 ,存入到队列中的消息只有消息体,消费者也只会消费到消息体 ,也就不知道消息的生产者是谁,当然消费者也不需要知道 。

3  队列
Queue: 队列,是 RabbitMQ 的内部对象,用 于存储消息。

4   交换器,

Exchange: 交换器在上图中我们暂时可以理解成生产者将消息投递到队列中,实际上 这个在 RabbitMQ 中不会发生。真实情况是,生产者将消息发送到 Exchange (交换器),由交换器将消息路由到一个或者多个队列中。如果路由不到,或 许会返回给生产者,或许直接丢弃。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。

RabbitMQ 中的 交换器有四种类型,四种类型分别是 fanout、direct、topic 、headers,不同的类型有着不 同的路由策略,我们后面会详细介绍这四种交换器

5  路由键
RoutingKey : 路由键
      生产者将消息发给交换器 的时候, 一般会指定 一个 RoutingKey ,用 来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。
在交换器类型和绑定键 (BindingKey) 固定的情况下,生产者可以在发送消息给交换器时, 通过指定 RoutingKey 来决定消息流向哪里。

6   绑定 

 Binding / Binding key

Binding(绑定)RabbitMQ中通过绑定,以路由键作为桥梁将Exchange与Queue关联起来(Exchange—>Routing Key—>Queue),这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了

二 纯界面设置绑定队列和路由键

1  生产者发送消息

1.1 队列绑定


1.1.1 创建队列
在 RabbitMQ 的后台管理界面中创建一个队列 , 指定队列名称。

1.1.2 创建交换器 Exchange

在 RabbitMQ 的后台管理界面中创建一个交换器,指定交换器的名称, 并且指定交换器类型。

1.1.3 绑定队列与交换器

在交换器列表点击对应的交换器 , 进入到绑定界面 , 指定队列名称 queue , 指 定 RoutingKey,通过该 RoutingKey 来绑定该队列与交换器 Exchange 。

之后,在发送消息时, 指定了 Exchange ,及 RoutingKey, 就可以将该消息路由 到该队列 queue 中。

 代码实现简单发送和接收消息

消息生产者

package cn.enjoyedu.exchange.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *类说明:direct类型交换器的消息生产者
 */
public class DirectProducer {

    public final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args)
            throws IOException, TimeoutException {
        /* 创建连接,连接到RabbitMQ*/
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();

        /*创建信道*/
        Channel channel = connection.createChannel();
        /*创建交换器*/
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        /*日志消息级别,作为路由键使用*/
        String[] serverities = {"error","info","warning"};
        for(int i=0;i<3;i++){
            String severity = serverities[i%3];
            String msg = "Hellol,RabbitMq"+(i+1);
            /*发布消息,需要参数:交换器,路由键,其中以日志消息级别为路由键*/
            channel.basicPublish(EXCHANGE_NAME,severity,null,
                    msg.getBytes());
            System.out.println("Sent "+severity+":"+msg);
        }
        channel.close();
        connection.close();

    }

}
View Code

消费者

package cn.enjoyedu.exchange.direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类说明:普通的消费者
 */
public class NormalConsumer {

    public static void main(String[] argv)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打开连接和创建频道,与发送端一样
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                "direct");

        /*声明一个队列*/
        String queueName = "focuserror";
        channel.queueDeclare(queueName, false, false,
                false, null);

        /*绑定,将队列和交换器通过路由键进行绑定*/
        String routekey = "error";/*表示只关注error级别的日志消息*/
        channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routekey);

        System.out.println("waiting for message........");

        /*声明了一个消费者  通知模式*/
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received[" + envelope.getRoutingKey()
                        + "]" + message);
            }
        };
        /*消费者正式开始在指定队列上消费消息,自动确认(ACK)*/
        channel.basicConsume(queueName, true, consumer);


    }

}
View Code
原文地址:https://www.cnblogs.com/hup666/p/13227859.html