RabbitMQ 入门

官网:https://www.rabbitmq.com/

参考:https://blog.csdn.net/hellozpc/article/details/81436980#52_204

一.消息中间件的作用

异步处理

应用解耦

流量削峰

日志处理

二.rabbitmq 安装与配置

下载:https://www.rabbitmq.com/download.html

在cmd 窗口输入:

rabbitmq-service start
rabbitmq-plugins enable rabbitmq_management

浏览器输入:localhost:15672   guest/guest

三. Java 操作 rabbitmq

  (1) simple 简单队列

添加依赖

     <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.4.1</version>
        </dependency>

 定义连接工具类:

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("testhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}
 
View Code

定义生产者:

public class Send {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
View Code

定义消费者:

public class Recv {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
View Code

(2)work queue  工作队列

 

一个生产者,多个消费者

消费者01:

public class Consumer01 {

    private static final  String queue = "my_queue";

     public static void main(String []args) throws Exception{

        Connection connection = MqConnection.getConnect();
        Channel channel = connection.createChannel();

        channel.queueDeclare(queue,false,false,false,null);

       // channel.basicQos(1);


        QueueingConsumer consumer = new QueueingConsumer(channel);

       // channel.basicConsume(queue,false,consumer);
         channel.basicConsume(queue,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());

            System.out.println("Consum-01-[Receive]:"+msg);

            Thread.sleep(10);

            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }
    }
}
View Code

消费者02:

public class Consumer02 {

    private static final  String queue = "my_queue";

     public static void main(String []args) throws Exception{

        Connection connection = MqConnection.getConnect();
        Channel channel = connection.createChannel();

        channel.queueDeclare(queue,false,false,false,null);

        //channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);

        //channel.basicConsume(queue,false,consumer);
         channel.basicConsume(queue,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());

            System.out.println("Consum-02-[Receive]:"+msg);

            Thread.sleep(500);

            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
View Code

生产者:

public class Producer {
    private static final  String queue = "my_queue";

    public static void main(String []args) throws  Exception{

        Connection connection = MqConnection.getConnect();

        Channel channel = connection.createChannel();

        channel.queueDeclare(queue,false,false,false,null);


        for(int i=0;i<50;i++){
            String msg = "第"+i+"条消息。。。。。";
            channel.basicPublish("",queue,null,msg.getBytes());
            System.out.println("[发送"+i+"消息]:"+msg);
            Thread.sleep(10);
        }

        channel.close();
        connection.close();

    }
}
View Code

结果:

1.消费者01 与消费者02 获取的内容不同,同一个消息只能被一个消费者获取

2.消费者01 与消费者02 获取的消息的相等的

不合理: 消费者01 处理的时间更短,可以获取更多的消息

机质:

轮询分发(round-robin):使用任务队列可以并行的工作。默认的情况下,rabbitmq 将诸葛发送到序列中的消费者,不考虑每个任务的时间,且是提前一次性分配。每个

消费者获取相等数量的消息,这种方式分发消息机制成为 Round-Robin 轮询

虽然上面的分配方法可行,但如果某个任务时间长,别的消费者比较闲

解决方法: 

使用basicQos(prefetchCount = 1) ,限制 RabbitMQ 只发送不超过 1条的消息给同一个消费者。当消息处理完毕后才发送第二条。

能者多劳:

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);

(3)publish/subscribe  发布/订阅

1.一个生产者,多个消费者

2.每个消费者都有自己的一个队列

3.每个队列都要绑定到交换机

4.生产者将消息发送到交换机

5.当消息发送到没有队列绑定的交换机时,消息丢失。因为交换机没有存储消息的能力,下拍戏只能存在队列种。

生产者:向交换机发送消息

public class Product {

    private static final String EXCHANGE_Name = "text_exchange_fanout";

    public static void main(String []args) throws Exception{

        Connection connection = MqConnection.getConnect();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_Name,"fanout");

        String msg = "hello,world";

        channel.basicPublish(EXCHANGE_Name,"",null,msg.getBytes());

        System.out.println("[Send]:"+msg);

        channel.close();

        connection.close();

    }

}
View Code

消费者01:

public class Consumer01 {
    
    private static final String EXCHANGE_Name = "text_exchange_fanout";
    private static final String QUEUE_Name = "my_queue01";

    public static void main(String []args) throws Exception{

        Connection connection = MqConnection.getConnect();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_Name,false,false,false,null);

        channel.queueBind(QUEUE_Name,EXCHANGE_Name,"");

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(QUEUE_Name,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("[Recive01]:"+msg);
            Thread.sleep(10);

        }
    }
}

消费者02:

public class Consumer02 {
    private static final String EXCHANGE_Name = "text_exchange_fanout";
    private static final String QUEUE_Name = "my_queue02";

    public static void main(String []args) throws Exception{

        Connection connection = MqConnection.getConnect();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_Name,false,false,false,null);

        channel.queueBind(QUEUE_Name,EXCHANGE_Name,"");



        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(QUEUE_Name,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("[Recive02]:"+msg);
            Thread.sleep(100);

        }
    }
}

结果:当生产者发送一条消息时,多个消费者可以获取到消息。一个消费者队列可以有多个消费者实例,只要其中一个消费者实例会消费到消息。  

(4)routing 路由选择 

 

生产者:

public class Producer {

    private static final String EXCHANGE_NAME = "test_exchange_direct";

     public static void main(String []args) throws Exception{

        Connection connection = MqConnection.getConnect();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        String msg = "hello,你哈";

        channel.basicPublish(EXCHANGE_NAME,"update",null,msg.getBytes());
        System.out.println("[send]:"+msg);

        channel.close();
        connection.close();
    }
}
View Code

消费者01:

 1 public class Consumer01 {
 2     private static final String EXCHANGE_NAME = "test_exchange_direct";
 3     private static final String Queue_name = "queue_01";
 4 
 5      public static void main(String []args) throws Exception{
 6         Connection connection = MqConnection.getConnect();
 7         Channel channel = connection.createChannel();
 8 
 9         channel.queueDeclare(Queue_name,false,false,false,null);
10 
11         channel.queueBind(Queue_name,EXCHANGE_NAME,"select");
12         channel.queueBind(Queue_name,EXCHANGE_NAME,"delete");
13 
14         channel.basicQos(1);
15 
16         QueueingConsumer consumer = new QueueingConsumer(channel);
17 
18          channel.basicConsume(Queue_name,true,consumer);
19 
20          while(true){
21              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
22              String msg = new String(delivery.getBody());
23              System.out.println("[queue_01]:"+msg);
24              Thread.sleep(10);
25 
26          }
27     }
28 }
View Code

消费者02:

 1 public class Consumer02 {
 2     private static final String EXCHANGE_NAME = "test_exchange_direct";
 3     private static final String Queue_name = "queue_02";
 4 
 5      public static void main(String []args) throws Exception{
 6         Connection connection = MqConnection.getConnect();
 7         Channel channel = connection.createChannel();
 8 
 9         channel.queueDeclare(Queue_name,false,false,false,null);
10 
11         channel.queueBind(Queue_name,EXCHANGE_NAME,"select");
12         channel.queueBind(Queue_name,EXCHANGE_NAME,"update");
13 
14         channel.basicQos(1);
15 
16         QueueingConsumer consumer = new QueueingConsumer(channel);
17 
18          channel.basicConsume(Queue_name,true,consumer);
19 
20          while(true){
21              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
22              String msg = new String(delivery.getBody());
23              System.out.println("[queue_02]:"+msg);
24              Thread.sleep(10);
25 
26          }
27     }
28 }
View Code

结果:生产者产生的消息会附带key, 消费者也会附带key,当生产者的key == 消费者的key 才能让消费者获得消息。

 

(5)Topics  主题

 

同一个消息被多个消费者获取。

消息生产者:

 1 public class Send {
 2 
 3     private final  static String EXCHANGE_NAME = "test_exchange_topic";
 4 
 5     public static void main(String []args) throws Exception{
 6 
 7         Connection connection = MqConnection.getConnect();
 8         Channel channel = connection.createChannel();
 9 
10         channel.exchangeDeclare(EXCHANGE_NAME,"topic");
11 
12         String msg = "Hello,World";
13 
14         channel.basicPublish(EXCHANGE_NAME,"emp.update.del",null,msg.getBytes());
15         System.out.println("[Send]:"+msg);
16 
17         channel.close();
18         connection.close();
19     }
20 
21 }
View Code

消费者01:

 1 public class Rec01 {
 2     private final  static String EXCHANGE_NAME = "test_exchange_topic";
 3     private final  static String QUEUE_NAME = "test_queue_topic01";
 4 
 5 
 6     public static void main(String []args) throws Exception{
 7         Connection connection = MqConnection.getConnect();
 8         Channel channel = connection.createChannel();
 9 
10         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
11 
12         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"emp.*");
13 
14         channel.basicQos(1);
15 
16         QueueingConsumer consumer = new QueueingConsumer(channel);
17         channel.basicConsume(QUEUE_NAME,false,consumer);
18 
19         while(true){
20             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
21             String msg = new String(delivery.getBody());
22             System.out.println("[rec]01:"+msg);
23             Thread.sleep(100);
24             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
25         }
26     }
27 }
View Code

消费者02:

 1 public class Rec02 {
 2     private final  static String EXCHANGE_NAME = "test_exchange_topic";
 3     private final  static String QUEUE_NAME = "test_queue_topic02";
 4 
 5 
 6     public static void main(String []args) throws Exception{
 7         Connection connection = MqConnection.getConnect();
 8         Channel channel = connection.createChannel();
 9 
10         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
11 
12         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"emp.#");
13 
14         channel.basicQos(1);
15 
16         QueueingConsumer consumer = new QueueingConsumer(channel);
17         channel.basicConsume(QUEUE_NAME,false,consumer);
18 
19         while(true){
20             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
21             String msg = new String(delivery.getBody());
22             System.out.println("[rec]02:"+msg);
23             Thread.sleep(500);
24             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
25         }
26     }
27 }
View Code

* :一个,#一个的或者多个

原文地址:https://www.cnblogs.com/bytecodebuffer/p/11366278.html