rabbitmq 使用心得

公司业务需要,要和其他公司进行协同开发,需要使用mq进行通信,所以才会有此文章,总结踩过的坑。

什么是rabbitmq

既然要使用rabbitmq,我们首先要知道什么是rabbitmq。rabbitmq是一个消息代理,核心就是接受和发送消息。

为什么选择rabbiymq

目前市面上可选择的mq有很多,ActiveMQ、ZeroMQ、Appche Qpid,为什么rabbitmq可以等到众多人的青睐呢?

  • 1.除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器
  • 2.可靠性,RabbitMQ的持久化支持,保证了消息的稳定性
  • 3.高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性
  • 4.集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单
  • 5.社区活跃度高,根据网上资料来看,RabbitMQ也是首选

rabbitmq的工作机制

  • 生产者(Producer):消息的创建者,负责创建和推送数据到消息服务器
  • 消费者(Consumer):消息的接收方,用于处理数据和确认消息
  • 代理:RabbitMQ本身,用于扮演“邮局”的角色,本身不生产消息,只是扮演“邮局”的角色
消息发送原理
  • 应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,和连接数据库相似,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。

  • 信道是创建在TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的

  • ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器

  • Channel(信道):消息推送使用的连接通道

  • Exchange(交换器):用于接受、分配消息

  • Queue(队列):用于储存生产者的消息

  • RoutingKey(路由键):用于把生成者的数据分配到交换器上

  • BindingKey(绑定键):用于把交换器的消息绑定到队列上

消息的长度限制

  • 上面已经基本介绍了rabbitmq,下面就让我说说踩过的坑,我们根据业务需要将推送时间指定为整点推送每小时进行一次,想象总是美好的,当我十分充满自信的开始进行推送测试的时候,开始是没有什么问题的,但是随着数据量逐渐增加,问题就出现了。
  • 接收到的消息和发送的消息是对不上的,发挥出自己百度程序员的资深实例,终于发现是因为mq的消息长度是有限制的,当你超过他的长度时,你只会接收到长度允许范围内的字节数,超过长度的数据都会丢失
  • 所以在我们使用mq推送消息时一定要提前计划好消息的长度,避免超出长度限制丢失数据,及时处理

消息数量

  • 既然长度是有限制的,数量当然也是有的
  • 在我发现长度超出最大限制时,我改变了策略,逐条推送,这样肯定是不会超出长度限制的,当我调整好之后,和我想象的一样,果然没有问题,就这样运行了一天之后,我发现消息总会堆在mq无法消费,但是在测试的时候又没有问题,于是我用了大批量数据测试,果然复现了这个问题,消息条数也是可以设置的
  • 当我将消息数量调整之后就不再有这个问题了

消息应答ack机制

执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。基于现在的代码,一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息。

但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会不丢失任何消息了。

没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

消息应答是默认打开的。我们明确地把它们关掉了(autoAck=true)。现在将应答打开,一旦我们完成任务,消费者会自动发送消息应答。

channel.basicConsume(QUEUE_NAME, autoAck, consumer);

自动应答: 不在乎消费者对消息处理是否成功,都会告诉队列删除消息。如果处理消息失败,实现自动补偿(队列投递过去 重新处理)。

public class Consumer {
        //队列名称
        private static final String QUEUE_NAME = "test_queue";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("消费者启动..........");
            //创建新的连接
            Connection connection = MQConnectionUtils.newConnection();
           //创建Channel
            Channel channel = connection.createChannel();
            // 消费者关联队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
             DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
                  //监听获取消息
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg =new String(body,"UTF-8");
                        System.out.println("消费者获取生产者消息:"+msg);
                    }
              };
            //牵手模式设置  默认自动应答模式  true:自动应答模式  
              channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);//    fanse手动应答          
              
            //关闭通道和连接
             channel.close();
             connection.close();
        }
}

手动应答: 消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。

public class Consumer {
       //队列名称
        private static final String QUEUE_NAME = "test_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("消费者启动..........");
            //创建新的连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建Channel
            final Channel channel = connection.createChannel();
            // 消费者关联队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
              DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
                  //监听获取消息
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg =new String(body,"UTF-8");
                        System.out.println("消费者获取生产者消息:"+msg);
                        channel.basicAck(envelope.getDeliveryTag(), false);  //手动应答 告诉消息队列服务器 消费成功
                    }
              };
              //模式设置  默认自动应答模式  true:自动应答模式  
              channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);//    fanse手动应答          
              
             //关闭通道和连接
             channel.close();
             connection.close();
        }
}
原文地址:https://www.cnblogs.com/xiaoxiaoliu/p/10820699.html