RabbitMQ

应用场景: 1.异步处理 : 同步阻塞的(会造成等待), 异步是非阻塞的(不会等待), 批量数据,就可以采用异步处理.

      2.系统解耦 : 多个系统之间, 不需要直接交互, 通过消息进行业务流转.

      3.流量削峰 : 高负载请求/任务缓冲处理.

消息队列中增加了交换器(Exchange):

    1.Direct Exchange 直连交换机, 根据路由键完全匹配进行路由消息队列;

    2.Topic Exchange 通配符交换机, #匹配多个单词, *匹配一个单词, 用.隔开的称为一个单词:

    3. Fanout Exchange 广播交换机, 投递到所有绑定的队列, 不需要规则.

    4. Headers Exchange 基于消息内容中的header属性进行匹配.

依赖pom.xml

        <!--RabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

属性配置application.properties

spring.rabbitmq.host=10.10.32.140
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#开启发送确认
spring.rabbitmq.publisher-confirms=true
#开启发送失败退回
spring.rabbitmq.publisher-returns=true
#开启ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

第一类, 直连交换机directExchage

发送者MessageSender

@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void send(String exchage, String routingKey) {
        String msg = "你好现在是 " + new Date();
        System.out.println("send content = " + msg);

        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);

        //发送消息
        this.rabbitTemplate.convertAndSend(exchage, routingKey, msg);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {

    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {

    }
}

消费者MessageReceiver

@Component
@RabbitListener(queues = "queue1")
public class MessageReceiver {

    public void process(String msg, Channel channel, Message message) throws IOException {

        try {
            Thread.sleep(3000);
            System.out.println("睡眠3s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            System.out.println("receiver fail");
        }

    }
}

配置类RabbitConfig

@Configuration
public class RabbitConfig {

    /**
     * 定义一个交换器 exchage: DirectExchage 直连交换机, 精确匹配
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        //创建一个直连交换器, 他就是在rabbitmq服务器上创建这么一个交换器
        return new DirectExchange("exchage1");
    }

    /**
     * 创建一个队列, 合规队列是用来存放exchage路由过来的消息
     * @return
     */
    @Bean
    public Queue Queuq1() {
        return new Queue("queue1", true);
    }

    /**
     * 建立起关系, 交换机 + 队列 绑定起来
     * @return
     */
    @Bean
    public Binding bindingDirectExchange(Queue queuq1, DirectExchange directExchange) {
        return BindingBuilder.bind(queuq1).to(directExchange).with("routingkey1");
    }


}

Controller访问

@RestController
public class MessageController {

    @Autowired
    private MessageSender helloSender;

    /**
     * 正常发送消息
     * @return
     */
    @RequestMapping("/boot/send")
    public String send () {
        helloSender.send("exchage1", "routingkey1");
        return "success";
    }
    
}

第二类,广播交换器 FanoutExchange, 不需要路由匹配

配置类RabbitConfig

  /**
     * 创建一个FanoutExchange交换器, 不需要路由匹配
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange2");
    }

    /**
     * 创建队列2
     * @return
     */
    public Queue queue2() {
        return new Queue("queue2", true);
    }

    /**
     * 创建队列3
     * @return
     */
    public Queue queue3() {
        return new Queue("queue3", true);
    }

    /**
     * 把队列2和FanoutExchage交换机绑定
     * @param queue2
     * @param fanoutExchange
     * @return
     */
    public Binding bindingFanoutExchage(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }

    /**
     * 把队列3和FanoutExchage交换机绑定
     * @param queue3
     * @param fanoutExchange
     * @return
     */
    public Binding bindingFanoutExchage2(Queue queue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue3).to(fanoutExchange);
    }

消息发送者, 与第一种一样

@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void send(String exchage, String routingKey) {
        String msg = "你好现在是 " + new Date();
        System.out.println("send content = " + msg);

        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);

        //发送消息
        this.rabbitTemplate.convertAndSend(exchage, routingKey, msg);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {

    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {

    }
}

消息接收者2

@Component
@RabbitListener(queues = "queue2")
public class MessageReceiver {

    public void process(String msg, Channel channel, Message message) throws IOException {

        try {
            Thread.sleep(3000);
            System.out.println("睡眠3s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            System.out.println("receiver fail");
        }

    }
}

消息接收者3

@Component
@RabbitListener(queues = "queue3")
public class MessageReceiver {

    public void process(String msg, Channel channel, Message message) throws IOException {

        try {
            Thread.sleep(3000);
            System.out.println("睡眠3s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            System.out.println("receiver fail");
        }

    }
}

Controller访问

@RestController
public class MessageController {

    @Autowired
    private MessageSender helloSender;

    /**
     * 正常发送消息
     * @return
     */
    @RequestMapping("/boot/send")
    public String send () {
        helloSender.send("exchage2", "");
        return "success";
    }
    
}

 

第三类,Topic Exchange通配符交换机, #匹配多个单词, *匹配一个单词, 用.隔开的称为一个单词

配置类RabbitConfig

/**
     * 创建交换器 TopicExchange, 模糊匹配
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("exchange3");
    }

    @Bean
    public Queue queue4() {
        return new Queue("queue4", true);
    }

    @Bean
    public Queue queue5() {
        return new Queue("queue5", true);
    }

    /**
     * 和queue4建立联系
     * @param queue4
     * @param topicExchange
     * @return
     */
    public Binding bindingTopicExchange(Queue queue4, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue4).to(topicExchange).with("#.k4.*");
    }

    /**
     * 和queue5建立联系
     * @param queue5
     * @param topicExchange
     * @return
     */
    public Binding bindingTopicExchange2(Queue queue5, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue5).to(topicExchange).with("#.K5.*");
    }

消息发送者, 与第一种一样

@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void send(String exchage, String routingKey) {
        String msg = "你好现在是 " + new Date();
        System.out.println("send content = " + msg);

        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);

        //发送消息
        this.rabbitTemplate.convertAndSend(exchage, routingKey, msg);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {

    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {

    }
}

消息接收者4

@Component
@RabbitListener(queues = "queue4")
public class MessageReceiver {

    public void process(String msg, Channel channel, Message message) throws IOException {

        try {
            Thread.sleep(3000);
            System.out.println("睡眠3s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            System.out.println("receiver fail");
        }

    }
}

消息接收者5

@Component
@RabbitListener(queues = "queue5")
public class MessageReceiver {

    public void process(String msg, Channel channel, Message message) throws IOException {

        try {
            Thread.sleep(3000);
            System.out.println("睡眠3s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            System.out.println("receiver fail");
        }

    }
}

Controller访问

@RestController
public class MessageController {

    @Autowired
    private MessageSender helloSender;

    /**
     * 正常发送消息
     * @return
     */
    @RequestMapping("/boot/send")
    public String send () {
        //helloSender.send("exchage2", "xy.k4.z");
    helloSender.send("exchage2", "xy.h.k4.z");
return "success"; } }

总结:

   RabbitMQ应用十分广泛, 程序员必备.

原文地址:https://www.cnblogs.com/goujh/p/10933544.html