RabbitMQ和Springboot集成RabbitMQ知识点

有关RabbitMQ的知识点

1.linux安装rabbitmq

rpm-ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force 
rpm-ivh erlang-22.1.8-1.el7.x86_64.rpm --force
然后安装rabbitmq
rpm -ivh rabbitmg-server-3.8.1-1.el7.noarch.rpm
主节点∶ hostnamectl --transient --static set-hostname mq1
从节点∶ hostnamectl --transient --static set-hostname mq2
主从节点都执行∶ vi/etc/hosts 追加192.168.1.2 mq1 192.168.1.3 mq2
主从都执行∶ chkconfig rabbitmq-server on
主从都执行∶ rabbitmq-plugins enable rabbitmq_management
主从都执行∶ vim /etc/security/limits.conf 追加rabbitmq-nofile 65536
主节点执行∶ scp /var/lib/rabbitmq/.erlang.cookie 从节点用户名@IP∶/var/lib/rabbitmq/
主从都退出终端重新登录,使主机名更改生效。
主节点执行∶ rabbitmqctl stop_app 
rabbitmqctl join_cluster --ram rabbit@mq2
主从都执行∶ /sbin/service rabbitmq-server start
主节点执行
rabbitmqctl add user admin admin
rabbitmqctl set_user tags admin administrator 
rabbitmqctl add_vhost /myHost
rabbitmqctl set_permissions -p /myHost admin "*" "*" ".*" 

2.Springboot整合RabbitMQ

(1).pom.xml引用RabbitMQ包

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

如果需要强制覆盖版本,则在<properties>加入以下代码,然后在引用amqp的地方将版本设置为${spring-boot-starter-amqp}

<spring-boot-starter-amqp>2.0.6.RELEASE</spring-boot-starter-amqp>

  (2).创建配置类

package com.controler;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    private static final String DIRECT_CHANGE_NAME = "CHX_CHANGE";
    private static final String DIRECT_QUEUE_NAME = "CHX_QUEUE";

    @Bean
    public Queue DirectQueue() {
        return new Queue(DIRECT_QUEUE_NAME, true);
    }

    @Bean
    DirectExchange DirectExchange() {
        return new DirectExchange(DIRECT_CHANGE_NAME, true, false);
    }

    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("routingkey");
    }
}

  (3).监听RabbitMQ

package com.controler;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_NAME)
public class DirectReceiver {
    @RabbitHandler
    public void process(Channel channel, Message msg) throws IOException {
        try {
            String tmp = new String(msg.getBody());
            System.out.println("消费者收到消息  : " + tmp);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

  需要注意的是,这个finally体里的方法要加上,否则有异常时会循环消费死信。这里也可以在application.yml里进行配置。

rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /chxhost
    listener:
        default-requeue-rejected: true #意思是,消息被拒后(即未消费),重新(true)放入队列
        retry:
          enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)

  (4).以上是单机版,如果需要配置集群,可用以下代码,即只更换配置类即可。

package com.chx.util;

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    private String hosts = "127.0.0.1:5672,127.0.0.2:5672";

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(hosts);
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        connectionFactory.setVirtualHost("");
    }
}

  (5).JAVA直接调用RabbitMQ API的代码,更多信息可查看RabbitMQ的web管理页ip:15672/api查看

@scheduled(cron = "* 0/2 * * * ?")
    public void startMonitor() throws CLientProtocolException, IOException {
        String rabbitmqMsg = getRabbitmqMsg("/api/queues", username.password, host, port);
        List<JSONObject> parseArray = JSONArray.parseArray(rabbitmqMsg, JSONObject.class);
        if (parseArray == null || parseArray.size() == 0) {
            return;
        }
        for (int i = 0; i < parseArray.size(); i++) {
            //获取队列里的消息数量
            BigDecimal messages = parseArray.get(i).getBigDecimal("messages");
            //获取虚拟机名
            String vhost = parseArray.get(i).getString("vhost");
            //获取队列名
            String name = parseArray.get(i).getString("name");
            System.out.println(vhost + name + "消息数量" + messages);
        }
    }

    /**
     * @param tailUrl  尾缀
     * @param userName 较大权限的用户名(可查看队列信息)
     * @param password 较大权限的用户的密码
     * @param hostIp   访问的主机IP
     * @param port     访问的rabbitmq端口(一定是web管理端口,默认15672,不是连接端口5672)
     * @return
     * @throws CLientProtocolException
     * @throws IOException
     */
    public String getRabbitmqMsg(String tailUrl, String userName, String password, String hostIp, int port)
            throws CLientProtocolException, IOException {
        HostPost host = new HostPost(hostIp, port);
        HttpGet httGet = new HttpGet(tailUrl);
        //开始基于Bisc模式认证用户名密码。basic由于是明文传输所以不安全,但是和tls/ssl协同仍然可以接受
        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(new AuthScope(hostIp.port),
                new UsernamePasswordCredentials(userName, password));
        DefaultHttpClient client = new DefaultHttpClient();
        client.setCredentialsProvider(credentialsProvider);
        HttpResponse execute = client.execute(host, httGet);
        HttpEntity entity = execute.getEntity();
        return EntityUtils.toString(entity);
    }

3.常见问题解决办法

  (1).springboot连接rabbitmq发生socket closed异常。

1.检查配置文件端口是否是5672,因为rabbitmq连接的不是web管理端,很多时候粗心出现这个错误。只有访问Web API时才设定15672.

2.不管是集群还是单机模式,一般都会配置下hosts文件的主机名。检查下host是否被篡改导致的。

3.在web管理页检查配置的用户是否有操作当前队列的权限。如未绑定虚拟机、交换机权限。设置如图所示,进入15672端口的页面管理。

以下图片从网络寻找。

4.扩展知识点

1.如何保证消息队列的顺序性:

首先要保证生产者发送消息到队列时,消息是有序的,这个可以通过发送时设置时间戳或者通过序号进行控制。

消息队列接收消息向下做分发是有序的,消费者接收消息这一块最容易乱序。比如A/B/C三条消息,向下分发:

(1).一个消息队列对多个消费者,此时消费者执行这三条消息的时间是不固定的也就是不受控制的。

(2).一个消息队列对一个消费者,但是消费者进行了多线程消费,理由和上述相似。

解决办法是:一个队列对应一个消费者,严格按照串行执行。

嫌弃串行慢或者只需要保证部分有序的话,可以进行业务分析,通过特定策略,把部分有序的消息发送到同一个队列里,并单线程消费。就类似于打包在一起,最后打包消费。

2.处理消费重复:只能通过业务代码进行筛查。或者通过消息内容计算唯一主键,消费端进行判断。

 

原文地址:https://www.cnblogs.com/chxwkx/p/14629502.html