Springboot集成RabbitMQ

前面我们已经了解了RabbitMQ的一些基本概念和原理,今天进入实战篇,在springboot框架中集成RabbitMQ,默认已经创建一个Springboot项目。

添加pom依赖

在pom.xml文件中引入以下依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>${springboot.version}</version>
</dependency>

修改application.yml文件

在application.yml文件中添加以下配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: lzm
    password: lzm
    virtual-host: test

address是rabbitmq server的请求地址,5672为其默认的端口号,集群多个地址时可以使用逗号隔开
username:rabbitmq用户名
password: rabbitmq密码
virtual-host: 虚拟主机

以上配置是集成rabbitmq最基本的配置,如果要使用更多的配置,比如配置手动ack,消息发送confirm等属性,可以参考org.springframework.boot.autoconfigure.amqp.RabbitProperties类中的默认配置

声明一个队列

我们写一个配置类,在配置类中声明一个队列,也可以通过访问RabbitMQ的管理页面http://IP:15672手动创建一个队列,此处使用代码去声明一个队列,底层使用RabbitAdmin在Rabbit Server中创建队列。

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * @author lzm
 * @date 2020/7/22 22:45
 */
@Component
public class RabbitConfig {

    /**
     * 定义一个可持久化的队列
     * @return
     */
    @Bean
    public Queue firstQueue(){
        return new Queue(Constant.MY_FIRST_QUEUE_NAME,true,false,false);
    }
}

上述代码声明了一个可持久化的队列,创建队列时的四个参数解释如下:

name: 队列名称
durable: 为true时表示是可持久化的队列,即使rabbitmq服务重启以后,队列依然是存在的
exclusive: 为true时表示只能被定义该queue的连接使用,一旦这个连接关闭,队列也会跟着删除
autoDelete: 为true时表示当没有消费者再使用queue时,queue自动删除

添加以上代码后,启动springboot项目后就可以在RabbitMQ的管理界面看到我们定义的队列:

生产者

生产者向消息队列中发送消息,我们创建一个service,在service中实现发送数据的逻辑:

import com.lzm.rabbitmq.constant.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

/**
 * @author lzm
 * @date 2020/7/22 23:00
 */
@Service
public class ProducerService {

    /**
     * 注入rabbit模板
     */
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void produceMessage(){

        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(Constant.MY_FIRST_QUEUE_NAME, "我是消息====>"+i);
        }
    }
}

上述代码中,首先向service中注入了RabbitTemplate,然后调用template的convertAndSend方法,闯入队列名称和消息内容后即可将消息发送到前面创建的队列中。

我们写一个controller用以调用service代码生产消息:

@RestController
@Slf4j
@RequestMapping("/")
public class IndexController {

    @Resource
    private ProducerService producerService;

    @GetMapping(value = "index")
    public String sendMessage(){
        producerService.produceMessage();
        return "消息已发送,请查看消息队列";
    }
}

启动tomcat,访问http://127.0.0.1:79999/index方法后,查看rabbitmq管理界面中队列中的数据如下所示:

可以看到此时队列中共有10条数据,且10条数据都处理ready状态,添加消费者后即可消息处于ready状态的数据,可以点进队列中,查看队列中的详细数据,如下所示:

通过Get Message面板可以查看当前队列中的消息,不仅可以看到消息体,还可以看到消息头,比如Wxchange是默认的交换机,Routing Key与队列名称相同,以及消息的优先级(priority),是否持久化(delivery_model)等各项内容

消费者

前面生产者已经将消息发送到消息队列中,此时需要创建一个消费者对队列中的数据进行消费,创建消费者的代码如下所示:

import com.lzm.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者示例代码
 * @author lzm
 * @date 2020/7/22 23:26
 */
@Component
@Slf4j
public class MyConsumer {

    @RabbitHandler
    @RabbitListener(queues = { Constant.MY_FIRST_QUEUE_NAME})
    public void consumeMessage(String msg, Channel channel, Message message){
        log.debug(msg);
    }
}

启动项目后控制台打印效果:

可以看到此时消费端将队列中的全部消息都消费掉了,此时队列中是没有消息的。

注解解释:

@RabbitHandler 添加到方法上,用来处理特定类型的消息
@RabbitListener 可以添加到方法上,也可以添加到类上,该注解方法中最重要的属性是queues,即当前消费者从哪些队列中消费消息,该注解还有很多属性,比如绑定(bindings)、containerFactory、优先级(priority)等多个属性,可以根据实际需要进行配置

总结

通过以上配置即可在springboot中初步集成Rabbitmq,但是上述的集成只是简单的能够发送消息,但是并不能保证生产端发送的消息消费端一定可以接收到,对于分布式消息队列需要的消息投递的可靠性一点都没有涉及,下一篇将介绍Rabbitmq是如何实现消息的可靠投递的。

原文地址:https://www.cnblogs.com/ybyn/p/13690965.html