01 Rabbit MQ 的初步使用

Rabbit MQ 的初步使用

一、先来说说 Java 代码中的初步集成吧

0. 业务场景模拟

  1. 在默认的虚拟主机(virtual-host)/下,创建一个名为testTopic类型的交换机(exchange),并设置其属性为不持久化(durable)、不自动删除(autoDelete)。

  2. 同时创建了一个名为hello的队列(Queue),并且设置不持久化(durable)、不排外的(exclusive)、不自动删除(autoDelete)。

  3. 将这个Queueexchange绑定(binding)起来。并同时指定一个routingKey。这个key的作用就是让流进Exchange的消息流进指定的Queue。topic类型的exchange使用全名routingKey的话,就相当于点对点的发送接收了(初步认识rabbit mq感觉是这样子,后续继续深入学习时,发现这个观点不对的话,会改正的)。

1. 项目依赖

Maven pom文件里添加坐标。

	<!-- Rabbit MQ 依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

2. 在application.yml文件里配置一些参数

# spring配置
spring:
  rabbitmq:
    host: 10.0.43.27
    port: 5673
    username: guest
    password: guest
    virtual-host: /
    # 这里配置消费者接收消息后,需要进行 ack 确认,这个消息才算被消费掉
    listener:
      simple:
        acknowledge-mode: manual

3. 写配置类

package com.unidata.cloud.logservice.manager.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 
 * @author demo@demo.com
 * @date 2019/11/19 15:21
 */
@Configuration
public class RabbitmqConfig {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello",false,false,false);
    }

    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("test", false, false);
    }

    @Bean
    public Binding bind(Queue queue,TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("demo_key");
    }
}

4. 发送消息

package com.unidata.cloud.logservice.manager.rabbitmq;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author demo@demo.com
 * @date 2019/11/19 17:51
 */
@RestController
@Slf4j
@AllArgsConstructor
public class ProduceController {

    private AmqpTemplate rabbitTemplate;

    @RequestMapping(value = "/send")
    public String sendMsg(String msg){
        rabbitTemplate.convertAndSend("hello", "{"name":"demo01"}");
        return "ok";
    }
}

5. 监听消息

package com.unidata.cloud.logservice.manager.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author demo@demo.com
 * @date 2019/11/19 17:52
 */
@Component
@Slf4j
public class ReceiveListener {

    @Autowired
    RabbitTemplate template;

    @RabbitHandler
    @RabbitListener(queues = "hello")
    public void processMsg(Message message, Channel channel) throws IOException {
        try {
            byte[] body = message.getBody();
            String result = new String(body, StandardCharsets.UTF_8);
            JSONObject jsonObject = JSON.parseObject(result);
            System.out.println(jsonObject);
            // you can do something here.
				
            // 进行消费确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("消息消费成功");
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            log.error(e.getLocalizedMessage());
        }
    }
}

6. 测试

  1. 如图,发送消息

  1. 查看控制台,已经消费成功

7. 初体验总结

上边六个步骤,进行了一个简单的,生产者消息发送,消费者消息消费的过程。

当运行这个项目时,会根据RabbitmqConfig类里的配置信息自动创建出 exchange 和 queue 。

题外:

原文地址:https://www.cnblogs.com/kjgym/p/11900792.html