Spring Boot RabbitMQ 快速入门上手(实用)

1.添加依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>2.3.11.RELEASE</version>
</dependency>

2.添加一个直连型交换机的配置类  DirectRabbitConfig


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 直连型交换机 * @author 执笔coding * @version 1.0 * @date 2021/6/9 15:03 */ @Configuration public class DirectRabbitConfig { /**队列 起名:TestDirectQueue*/ @Bean public Queue TestDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("TestDirectQueue",true); } /**Direct交换机 起名:TestDirectExchange*/ @Bean DirectExchange TestDirectExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("TestDirectExchange",true,false); } /**绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting*/ @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } @Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); } }

3.新建一个controller用来测试发送消息和接收消息


import cn.ushowtime.admin.service.message.IRabbitmqService;
import cn.ushowtime.core.util.object.JsonUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* @author 执笔coding * @version 1.0 * @date 2021/6/9 15:50 */ @RestController @RequestMapping("/rabbit") public class RabbitmqController { /**使用RabbitTemplate,这提供了接收/发送等等方法*/ @Resource private RabbitTemplate rabbitTemplate; @Resource private IRabbitmqService rabbitmqService; /** * rabbitmq消息发送测试 * @author songmin * @date 2021/6/9 15:07 * @return: java.lang.String */ @GetMapping("/send") public String sendDirectMessage(String msg) { String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageData",msg); map.put("createTime",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", JsonUtil.map2Json(map)); return "ok"; } /** * 获取rabbitmq消息 * @author songmin * @date 2021/6/9 15:55 * @return: java.lang.String */ @GetMapping("/get") public String getDirectMessage() throws IOException, TimeoutException { return rabbitmqService.getRabbitmq(); } }

4. service和实现

IRabbitmqService
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Service contract for reading a message from the message queue.
 *
 * <p>Created: 2021/6/9 15:36
 *
 * @author 执笔coding
 * @version 1.0
 */
public interface IRabbitmqService {

    /**
     * Gets a message from the message queue.
     *
     * @return the message as a string
     * @throws IOException declared by the contract; the exact conditions depend on the
     *     implementation
     * @throws TimeoutException declared by the contract; the exact conditions depend on the
     *     implementation
     */
    String getRabbitmq() throws IOException, TimeoutException;

}
RabbitmqServiceImpl
import cn.ushowtime.admin.service.message.IRabbitmqService;
import cn.ushowtime.core.exception.OriginalException;
import cn.ushowtime.core.util.logger.LoggerFactory;
import cn.ushowtime.core.util.logger.ShowLogger;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * {@link IRabbitmqService} implementation that pulls a single message from the queue
 * {@code TestDirectQueue} using the RabbitMQ Java client directly.
 *
 * <p>Each call opens its own connection and channel and closes both before returning.
 *
 * <p>Created: 2021/6/9 15:39
 *
 * @author 执笔coding
 * @version 1.0
 */
@Service("rabbitmqService")
public class RabbitmqServiceImpl implements IRabbitmqService {

    /** Broker host, injected from {@code spring.rabbitmq.host}. */
    @Value("${spring.rabbitmq.host}")
    String host;

    /** Broker port, injected from {@code spring.rabbitmq.port}. */
    @Value("${spring.rabbitmq.port}")
    Integer port;

    /** Login user, injected from {@code spring.rabbitmq.username}. */
    @Value("${spring.rabbitmq.username}")
    String username;

    /** Login password, injected from {@code spring.rabbitmq.password}. */
    @Value("${spring.rabbitmq.password}")
    String password;

    /** Name of the queue this service reads from. */
    private static final String QUEUE_NAME = "TestDirectQueue";

    private static final ShowLogger logger = LoggerFactory.getLogger(RabbitmqServiceImpl.class);

    /**
     * Pulls one message from {@code TestDirectQueue} and returns its body decoded as UTF-8.
     *
     * <p>The queue is declared (durable, non-exclusive, not auto-deleted) before reading. The
     * message is fetched with auto-acknowledge enabled.
     *
     * @return the message body as a string
     * @throws OriginalException with code {@code "200"} when the queue has no message
     * @throws IOException if communicating with the broker, or closing the connection, fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    @Override
    public String getRabbitmq() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker address and credentials.
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        // try-with-resources guarantees the channel and then the connection are closed on every
        // path, including the "queue is empty" exception below. Without it each call would leave
        // an open connection behind.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            boolean durable = true;      // keep the queue across broker restarts
            boolean exclusive = false;   // not restricted to this connection
            boolean autoDelete = false;  // do not delete when the last consumer goes away
            channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);

            // Second argument is autoAck: the message is acknowledged as soon as it is fetched.
            GetResponse resp = channel.basicGet(QUEUE_NAME, true);
            if (resp == null) {
                throw new OriginalException("200", QUEUE_NAME + " 队列无消息");
            }

            // Fully-qualified so that no additional import is needed in this file.
            String message = new String(resp.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            logger.info("取到消息 {}", message);
            return message;
        }
    }

}

5.测试

5.1 发送消息

5.2 接收消息

 

 6. 消息监听自动接收

import cn.ushowtime.core.util.logger.LoggerFactory;
import cn.ushowtime.core.util.logger.ShowLogger;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener that receives messages from the queue {@code TestDirectQueue} and logs them.
 *
 * <p>Because {@code @RabbitListener} is placed on the class, the handler method is chosen by the
 * type of the converted message payload, so one {@code @RabbitHandler} is provided per payload
 * type this class expects.
 *
 * <p>Created: 2021/6/9 15:12
 *
 * @author 执笔coding
 * @version 1.0
 */
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {

    private static final ShowLogger logger = LoggerFactory.getLogger(DirectReceiver.class);

    /**
     * Handles messages whose payload was converted to a {@link Map}.
     *
     * @param testMessage the received payload
     */
    @RabbitHandler
    public void process(Map<?, ?> testMessage) {
        logger.info("DirectReceiver消费者收到消息  : {}" ,testMessage);
    }

    /**
     * Handles messages whose payload arrives as plain text.
     *
     * <p>NOTE(review): the sender in this tutorial publishes the result of
     * {@code JsonUtil.map2Json(...)}, which is presumably a JSON {@code String} - confirm.
     * With only a {@code Map} handler such a payload has no matching handler method and the
     * listener fails, so a text handler is provided as well.
     *
     * @param testMessage the received text payload
     */
    @RabbitHandler
    public void processText(String testMessage) {
        logger.info("DirectReceiver消费者收到消息  : {}" ,testMessage);
    }
}

这个监听类会在消息发送到 TestDirectQueue 之后立即自动消费该消息并打印日志。注意:监听器与第 4 步中 /rabbit/get 的手动拉取消费的是同一个队列,启用监听器后消息通常已被监听器取走,此时手动拉取一般会得到“队列无消息”。

原文地址:https://www.cnblogs.com/ushowtime/p/14867578.html