SpringBoot整合RabbitMQ(一)

      在RabbitMQ的环境搭建成功后,创建SpringBoot项目,通过一个简单的案例来详细的说明下RabbitMQ

的生产者消费者的模式。下面结合SpringBoot项目,来具体的说明下这部分的具体应用。

一、pom引入RabbitMQ

         创建项目成功后,我们需要在pom.xml的文件里面来引入rabbitmq的jar,pom.xml文件详细的内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

二、生产者思路

        RabbitMQ是MQ中核心的组件技术栈,生产者消费者的模型中也是非常重要的部分,在RabbitMQ中生产者

的应用程序并不关心队列,生产者的任务只需要把MQ的消息发送到Exchange,而不关心并且也不知道Queue的

存在,至于Queue这部分是需要消费者来进行关心的。所以在操作RabbitMQ上,它的步骤具体总结如下:

  • 先创建连接工厂的对象,也就是ConnectionFactory
  • 然后配置连接MQ的地址,端口,账户以及密码,和虚拟主机这部分
  • 连接工厂接着创建连接对象,也就是Connection
  • 接着使用连接对象来创建信道,也就是Channel
  • 最后通过生产者的模式把MQ的消息发送到Exchange

生产者涉及到的源代码具体如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer
{
    public static void main(String[] args) throws  Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        //配置连接mq的地址信息
        connectionFactory.setHost("101.43.158.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        //连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过connection来创建Channel
        Channel channel = connection.createChannel();

        //通过channel来发送具体的数据信息
        String msg = "Hello RabbitMQ";
        for (int i = 0; i < 5; i++) {
            channel.basicPublish("saas", "", null, msg.getBytes());
            System.out.println("发送数据成功:"+i);
        }
        //发送消息成功后,关闭具体的连接
        channel.close();
        connection.close();
    }
}

三、消费者思路

         在消费者中,和生产者前面的代码基本是一致的,但是这部分需要特别强调的是在消费者的设计中,

它不需要关心Exchange,而消费者核心需要关注的是Queue这部分,也就是消费者的应用程序主要是通过

Queue里面读取到MQ的数据,这部分代码具体如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;

public class Consumer
{
    //定义exchange
    private static final String EXCHANGE = "saas";
    //定义队列
    private  static  final String queueName="saas";

    public static void main(String[] args) throws  Exception
    {
        try{
            //创建连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //配置连接mq的地址信息
            connectionFactory.setHost("101.43.158.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
//        connectionFactory.setVirtualHost("/");

            //连接工厂创建连接
            Connection connection=connectionFactory.newConnection();

            //通过connection来创建Channel
            Channel channel=connection.createChannel();

            //设置exchange类型为fanout
            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);

        /*
         定义一个队列
         * 一个队列来接收数据后,消费端才可以从队列里面来接收具体的数据
         * param1:队列名称
         * param2:是否持久化
         * param3:队列是否独占此连接
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数
         * */
            channel.queueDeclare(queueName,true,false,true,null);

            channel.queueBind(queueName,EXCHANGE,"");

            //创建一个消费者来消费数据
//        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
            //2.0以后的版本修改为DefaultConsumer
            DefaultConsumer consumer=new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        com.rabbitmq.client.Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte [] body) throws  java.io.IOException
                {

                    String message=new String(body);
                    System.out.println("接收到的消息为:"+message);
                };
            };
            // 监听队列,从队列中获取数据
            System.out.println("消费者程序启动成功,准备接收生产者的数据:\n");
            channel.basicConsume(queueName,consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

四、程序验证

        完成代码成功后,下来执行具体的程序来进行验证。这部分需要特别的强调下,在执行消费者程序前

不需要单独的再创建exchange和queue,要不就会出现消费者的程序无法启动,这点需要特别的注意,另外

一个就是被操作的账户必须就具备管理员的权限。如上代码执行后,消费者以及生产者输出结果信息如下:

如上,就可以看到消费者接收到生产者发送的数据。程序执行后,可以在RabbitMQ的WEB控制台可以看到具体

的数据,主要是Queue的部分,具体如下:

如上,可以看到控制台中消费者的数据。感谢您的阅读,或许会持续更新RabbitMQ的技术栈的体系文章。

欢迎关注微信公众号“Python自动化测试”
原文地址:https://www.cnblogs.com/weke/p/15811570.html