RabbitMQ 公平分发模式

在上一篇文章的基础上进行改动

创建两个消费者,分别是 ConsumerAa、ConsumerBb

package com.example.demo;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author: DevanYan
 * @create: 2019-05-28 11:46
 */

//消息的消费者
@Component
@RabbitListener(queues = "hello")
public class ConsumerAa {

    @RabbitHandler
    public void use(String message) {
        try {
            Thread.sleep(1000);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("AA消费了一条消息 : " + message);
    }

}
package com.example.demo;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author: DevanYan
 * @create: 2019-05-28 11:46
 */

//消息的消费者
@Component
@RabbitListener(queues = "hello")
public class ConsumerBb {

    @RabbitHandler
    public void use(String message) {
        try {
            Thread.sleep(2000);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("BB消费了一条消息 : " + message);
    }

}

修改消息生产者 Creater

package com.example.demo;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: DevanYan
 * @create: 2019-05-28 11:47
 */

// 消息的生产者
@Component
public class Creater {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void  create(){

        for(int i = 1; i <=10 ;i++){
            String message = "Hello RabbitMQ !";
            message = "[ "+i+" ] "+message;
            amqpTemplate.convertAndSend("hello", message);
            System.out.println("创建了一条消息 : " + message);

            try {
                if(i==10){
                    Thread.sleep(60*5*1000);//睡眠5分钟,让程序有足够的时间处理消息
                }
               } catch (InterruptedException e) { e.printStackTrace(); }

        }


    }

}

运行测试类,进行测试

创建了一条消息 : [ 1 ] Hello RabbitMQ !
创建了一条消息 : [ 2 ] Hello RabbitMQ !
创建了一条消息 : [ 3 ] Hello RabbitMQ !
创建了一条消息 : [ 4 ] Hello RabbitMQ !
创建了一条消息 : [ 5 ] Hello RabbitMQ !
创建了一条消息 : [ 6 ] Hello RabbitMQ !
创建了一条消息 : [ 7 ] Hello RabbitMQ !
创建了一条消息 : [ 8 ] Hello RabbitMQ !
创建了一条消息 : [ 9 ] Hello RabbitMQ !
创建了一条消息 : [ 10 ] Hello RabbitMQ !
AA消费了一条消息 : [ 1 ] Hello RabbitMQ !
BB消费了一条消息 : [ 2 ] Hello RabbitMQ !
AA消费了一条消息 : [ 3 ] Hello RabbitMQ !
AA消费了一条消息 : [ 5 ] Hello RabbitMQ !
BB消费了一条消息 : [ 4 ] Hello RabbitMQ !
AA消费了一条消息 : [ 7 ] Hello RabbitMQ !
AA消费了一条消息 : [ 9 ] Hello RabbitMQ !
BB消费了一条消息 : [ 6 ] Hello RabbitMQ !
BB消费了一条消息 : [ 8 ] Hello RabbitMQ !
BB消费了一条消息 : [ 10 ] Hello RabbitMQ !

记得注意观察   http://localhost:15672

扩展:

查看控制台输出,当时消息没消费完毕时,关掉程序

然后再次启动,这时再看输出,看看会输出什么结果,

其中的原理自己体会……

原文地址:https://www.cnblogs.com/devan/p/10937120.html