RabbitMq简单学习记录

链接:https://pan.baidu.com/s/1NQvPDhItv8YEKWZAtfAtjg 提取码:1234

Docker下安装使用Rabbitmq

https://www.cnblogs.com/angelyan/p/11218260.html 这个链接安装下载如下:

一、获取镜像
#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management
回到顶部
二、运行镜像
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

 

进入/bin/bash后使用:rabbitmqctl help查看指令,里面有很多基本指令供我们使用

Rabbitmq参考文档

当然我们需要做好事前准备,也就是在ip:15672登陆进去后,我们在Admin下创建一个虚拟主机,然后创建一个用户,给用户绑定这个虚拟主机,也就是用户可以访问到(虚拟主机前面要加'/')

消息是首先建立连接,然后通过通道进行传输

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>

pom中引入

1.充当端到端消息中间件

一对一,不经过交换机

#这种模型下,生产者不经过交换机直接将消息放入队列,实现端到端的消息传输
//生产者代码
package helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

public class Provider {
   public static void main(String[] args) throws IOException, TimeoutException {
       new Provider().testSendMessage();
  }
   //生产者发送消息
   public void testSendMessage() throws IOException, TimeoutException {
       //创建连接mq的工厂对象
       ConnectionFactory connectionFactory=new ConnectionFactory();
       //设置连接mq主机
       connectionFactory.setHost("47.98.189.228");
       //设置端口号
       connectionFactory.setPort(5672);
       //设置连接到哪个虚拟主机
       connectionFactory.setVirtualHost("/jd");
       //设置访问虚拟主机的用户名和密码
       connectionFactory.setUsername("nxj");
       connectionFactory.setPassword("123");

       //获取连接对象
       Connection connection = connectionFactory.newConnection();
       //创建连接通道
       Channel channel = connection.createChannel();
       //队列声明
       /*
       * 参数1:队列名称
       * 参数2:durable,定义队列是否要持久化
       * 参数3:exclusive,定义是否独占队列
       * 参数4:autoDelete,定义是否在消费完成后自动删除队列
       * 参数5:额外的附加参数
       */
       channel.queueDeclare("hello",false,false,false,null);
       //发布消息
       /*
       * 参数1:交换机名称
       * 参数2:路由主键
       * 参数3:消息(需要byte[])
        */
       channel.basicPublish("","hello",null, String.valueOf("hello rabbitmq"+ new Random().nextInt()).getBytes());

       channel.close();
       connection.close();
  }
}
//消费者
package helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consume {
   public static void main(String[] args) throws IOException, TimeoutException {
       new Consume().testConsumeMessage();
  }
   public void testConsumeMessage() throws IOException, TimeoutException {
       //创建连接mq的工厂对象
       ConnectionFactory connectionFactory=new ConnectionFactory();
       //设置连接mq主机
       connectionFactory.setHost("47.98.189.228");
       //设置端口号
       connectionFactory.setPort(5672);
       //设置连接到哪个虚拟主机
       connectionFactory.setVirtualHost("/jd");
       //设置访问虚拟主机的用户名和密码
       connectionFactory.setUsername("nxj");
       connectionFactory.setPassword("123");

       //获取连接对象
       Connection connection = connectionFactory.newConnection();
       //创建连接通道
       Channel channel = connection.createChannel();


       //队列声明
       /*
       * 参数1:队列名称
       * 参数2:durable,定义队列是否要持久化
       * 参数3:exclusive,定义是否独占队列
       * 参数4:autoDelete,定义是否在消费完成后自动删除队列
       * 参数5:额外的附加参数
       */
       channel.queueDeclare("hello",false,false,false,null);
       //发布消息
       /*
       * 参数1:交换机名称
       * 参数2:开启消息自动确认机制(如果为true,使用完则删除,否则使用完消息仍然在队列中)
       * 参数3:消费时的回调接口
        */
       channel.basicConsume("hello",true,new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               System.out.println("getMsg:-->"+new String(body));
          }
      });

       //这里不能关,否则我们消费完还没输出,线程就被杀掉了
       //channel.close();
       //connection.close();
  }
}
  • queueDeclare中第二个参数,设置为true,代表的队列被持久化,但是队列中的消息并不会

  • 当发送消息时,设置第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN 代表消息被持久化,这样重启消息才会有

  • 生产者和消费者在写queueDeclare时需要完全一致

 

代码优化,将重复代码提取:

package util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMqUtils {
   //创建连接mq的工厂对象
   private   static ConnectionFactory connectionFactory;
   static {
       connectionFactory=new ConnectionFactory();
  }
   //获取连接对象
   public static Connection getConnection(){
       try {

           //ConnectionFactory connectionFactory=new ConnectionFactory();
           //设置连接mq主机
           connectionFactory.setHost("47.98.189.228");
           //设置端口号
           connectionFactory.setPort(5672);
           //设置连接到哪个虚拟主机
           connectionFactory.setVirtualHost("/jd");
           //设置访问虚拟主机的用户名和密码
           connectionFactory.setUsername("nxj");
           connectionFactory.setPassword("123");
           return connectionFactory.newConnection();
      } catch (IOException e) {
           e.printStackTrace();
      } catch (TimeoutException e) {
           e.printStackTrace();
      }
       return null;
  }
   //关闭连接
   public static void close(Channel channel,Connection connection){
       try {
           if(channel!=null)channel.close();
           if(connection!=null) connection.close();
      } catch (Exception e) {
           e.printStackTrace();
      }
  }
}

2.工作队列

多个消费者共享一个队列,不经过交换机

此队列下的消息时平均分配的,也就是如果有3个消费者,第一个消息给消费者1,第二个消息给消费者2,第三个消息给消费者3,第四个呢,给消费者1这样循环,这样如果某一个消费者处理的慢,而其他的消费者都处理完成闲置了,那么就会造成消息堆积。

//生产者,和上面一样的
private void testSendMsg() throws IOException {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   //String queue, boolean durable, boolean exclusive, boolean autoDelete,
   channel.queueDeclare("work",true,false,false,null);
   //String exchange, String routingKey, BasicProperties props, byte[] body
   for (int i = 1; i <= 11; i++) {
       channel.basicPublish("","work", MessageProperties.PERSISTENT_TEXT_PLAIN,("消息"+i).getBytes());
  }
   RabbitMqUtils.close(channel,connection);
}
//消费者1
package workqueue;

import com.rabbitmq.client.*;
import util.RabbitMqUtils;

import java.io.IOException;

public class Consumer1 {
   public static void main(String[] args) throws IOException {
       new Consumer1().testConsumeMsg();
  }
   private void testConsumeMsg() throws IOException {
       Connection connection = RabbitMqUtils.getConnection();
       Channel channel = connection.createChannel();
       channel.queueDeclare("work",true,false,false,null);
       channel.basicConsume("work",true,new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               System.out.println("1:-->"+new String(body));
          }
      });
  }
}

消费者2和消费者1是一样的,我们运行消费者1和消费者2,然后启动生产者往其中丢入消息,结果如下:

#消费者2
2:-->消息1
2:-->消息3
2:-->消息5
2:-->消息7
2:-->消息9

#消费者1
1:-->消息0
1:-->消息2
1:-->消息4
1:-->消息6
1:-->消息8

过程大致是,消息队列将消息平均分配给各个消费者,消息消费者需要逐个处理,但是对于整个消息如果消费者开启了自动确认,那么这些消息就会交给这个消费者处理了,并且消息队列讲这些消息清除掉,如果此时消费者当机了,那么消息就丢失了,而且如果某个消费者处理的很慢,其他的消费者已经闲置了,显然这样的方式也不好,为了实现能者多劳,处理一个确认一个,我们做出如下调整:

  1. 将channel.basicConsume 方法中第二个参数即自动确认改为false

  2. 增加channel.basicQos(1);//每次处理一条消息

  3. 在方法最后加上channel.basicAck(envelope.getDeliveryTag(), false);//手动确认,否则不继续接收了 (手动处理是必要的,否则消息队列不删除这条消息,会造成消息被重复消费)

调整后消费者如下:

//消费者1
private void testConsumeMsg() throws IOException {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   channel.basicQos(1);
   channel.queueDeclare("work",true,false,false,null);
   channel.basicConsume("work",false,new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           System.out.println("1:-->"+new String(body));
           channel.basicAck(envelope.getDeliveryTag(), false);//手动确认,否则不继续接收了
      }
  });
}

//消费者2
private void testConsumeMsg() throws IOException {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   channel.basicQos(1);//每次处理一条消息
   channel.queueDeclare("work",true,false,false,null);
   channel.basicConsume("work",false,new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           System.out.println("2:-->"+new String(body));
           try {
               Thread.sleep(1000);
          } catch (InterruptedException e) {
               e.printStackTrace();
          }
           //参数一:确认的是哪个消息
           //参数二:是否开启多个消息同时确认
           channel.basicAck(envelope.getDeliveryTag(), false);//手动确认,否则不继续接收了
      }
  });
}

3.发布/订阅模型

fanout广播,不同消费者拿到相同信息,经过交换机

  • 我们只需要将信息发送给交换机即可

  • 交换机负责发送到各个队列

  • 每个消费者绑定自己的队列

这样消费者都能拿到消息,实现一条消息被多个消费者消费

//生产者将消息发给交换机,由交换机进行中转
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   channel.basicQos(1);//通道每次只接受一条消息
   //通道声明指定的交换机
   //String exchange, String type(这里固定位fanout,即广播类型)
   channel.exchangeDeclare("logs","fanout");
   //String exchange, String routingKey, BasicProperties props, byte[] body
   for (int i = 0; i < 10; i++) {
       channel.basicPublish("logs","",null,("第"+i+"个消息").getBytes());
  }
   RabbitMqUtils.close(channel,connection);
} catch (IOException e) {
   e.printStackTrace();
}
//消费者使用临时队列和交换机绑定起来,具体如上图所示
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   //创建一个临时队列(因为没消息这个队列我就不想要了)
   String queue = channel.queueDeclare().getQueue();
   //将队列绑定到交换机
   //String queue, String exchange, String routingKey
   channel.queueBind(queue,"logs","");
   channel.basicConsume(queue,true,new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           System.out.println("消费者1-->"+new String(body));
           //channel.basicAck(envelope.getDeliveryTag(), false);//手动确认,否则不继续接收了
      }
  });

} catch (Exception e) {
   e.printStackTrace();
}

//其他n个消费者和上方代码相同

 

4.路由

direct消费者只能拿到rountKey标识相同的消息

fanout模式下,一条消息可以被所有消费者消费,在某些情况下我们并不希望我的消息被所有消费者消费,而只是那些满足我要求的消费者消费,这时候我们进入direct模式。

在Direct模式下:

  • 队列与交换机的绑定需要指定一个RountingKey(路由Key)

  • 消息的发送方在向交换机发送消息时,也必须指定消息的RountKey

  • 交换机不在把消息给每一个和他绑定的队列了,而是根据RountKey进行判断,只有队列的路由key和消息的路由key完全一致,才会接受到消息

//生产者生产消息
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   //String exchange, String type)
   channel.exchangeDeclare("dEx","direct"); //必须指定为direct,rountkey才会生效
   //String exchange, String routingKey, AMQP.BasicProperties props, byte[] body
   for (int i = 0; i < 3; i++) {
       //生产ms标识的消息
       channel.basicPublish("dEx","ms",null,("ms消息"+i).getBytes());
       //生产mail标识的消息
       channel.basicPublish("dEx","mail",null,("mail消息"+i).getBytes());
       //生产no标识的消息(没队列标识绑定,将不传递给任何队列)
       channel.basicPublish("dEx","no",null,("no消息"+i).getBytes());
  }
   RabbitMqUtils.close(channel,connection);

} catch (IOException e) {
   e.printStackTrace();
}
//ms消费者
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   String queue = channel.queueDeclare().getQueue();//声明临时队列
   channel.exchangeDeclare("dEx","direct");
   //String queue, String exchange, String routingKey
   channel.queueBind(queue,"dEx","ms");//绑定交换机,标识为ms
   channel.basicConsume(queue,true,new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           System.out.println("ms消费者接受:"+new String(body));
      }
  });
} catch (IOException e) {
   e.printStackTrace();
}

//mail消费者
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   String queue = channel.queueDeclare().getQueue();
   channel.exchangeDeclare("dEx","direct");
   //String queue, String exchange, String routingKey
   //同时绑定两个
   channel.queueBind(queue,"dEx","mail");
   channel.queueBind(queue,"dEx","ms");
   channel.basicConsume(queue,true,new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           System.out.println("mail消费者接受:"+new String(body));
      }
  });
} catch (IOException e) {
   e.printStackTrace();
}

结果:

mail消费者接受:ms消息0
mail消费者接受:mail消息0
mail消费者接受:ms消息1
mail消费者接受:mail消息1
mail消费者接受:ms消息2
mail消费者接受:mail消息2

ms消费者接受:ms消息0
ms消费者接受:ms消息1
ms消费者接受:ms消息2

 

5.Topics动态路由

topic动态绑定多个,解决一个个绑定造成代码冗余复杂,可以写通配符
主要针对于我们rountKey为了明确定义,写的精确,如item.door.field1

*:匹配一个词(词,并不是单词)

#:匹配0~多个词

如:

a.#:可以匹配:a.bc ; a.bc.ca 等

a.*:只能匹配形如a.bvc

//生产者还是一样
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   //(String exchange, String type
   channel.exchangeDeclare("topEx","topic");
   //String exchange, String routingKey, BasicProperties props, byte[] body
   channel.basicPublish("topEx","item.a1.b1",null,("-->item.a1.b1").getBytes());
   channel.basicPublish("topEx","item.a1.b1.c1",null,("-->item.a1.b1.c1").getBytes());
   channel.basicPublish("topEx","item.a1.b1.c1.d1",null,("-->item.a1.b1.c1.d1").getBytes());
   RabbitMqUtils.close(channel,connection);
} catch (IOException e) {
   e.printStackTrace();
}
//消费者通配符匹配
//消费者1
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   String exchange="topEx";
   channel.exchangeDeclare(exchange,"topic");
   //声明一个临时队列
   String queue = channel.queueDeclare().getQueue();
   //String queue, String exchange, String routingKey
   channel.queueBind(queue,exchange,"item.*.b1");//能匹配a1.b1
   channel.basicConsume(queue,true,new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           System.out.println("item.*.b1--》收到消息:"+new String(body));
      }
  });
} catch (IOException e) {
   e.printStackTrace();
}

//消费者2
try {
   Connection connection = RabbitMqUtils.getConnection();
   Channel channel = connection.createChannel();
   String exchange="topEx";
   channel.exchangeDeclare(exchange,"topic");
   //声明一个临时队列
   String queue = channel.queueDeclare().getQueue();
   //String queue, String exchange, String routingKey
   channel.queueBind(queue,exchange,"#.c1");//能匹配"item.a1.b1.c1" 和
   channel.basicConsume(queue,true,new DefaultConsumer(channel){
       @Override
       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           System.out.println("#.c1--》收到消息:"+new String(body));
      }
  });
} catch (IOException e) {
   e.printStackTrace();
}

结果:

#item.*.b1--》收到消息:第1-->item.a1.b1

#.c1--》收到消息:第1-->item.a1.b1.c1

 

6.SpringBoot整合(强力推荐尚硅谷老师的springboothttps://www.bilibili.com/video/BV1Et411Y7tQ?p=90 这里面整合rabbitmq讲的更好!!!)

#配置
spring:
rabbitmq:
  host: 47.98.189.228
  port: 5672
  virtual-host: /jd
  username: nxj
  password: 123
6.1helloworld
//消费者(注意这里必须先写消费者(即使现在不运行),否则只有生产者,队列将不会创建出来的)
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))//代表消费者的监听需要消费
public class Consume {

   @RabbitHandler//代表接收消息后的回调
   public void receiveMsg(String msg){
       System.out.println(msg);
  }
}
//生产者
@Test
void helloWorld(){
   rabbitTemplate.convertAndSend("hello","hello JD!");
}

启动生产者,发现消费者也接受到了消息(消费者不需要显示启动)

 

6.2workqueue

和上方一样,只不过绑定多个消费者

//消费者1
@Component
@RabbitListener(queuesToDeclare = @Queue("work"))
public class Consume01 {
   @RabbitHandler
   public void receiveMsg(String msg){
       System.out.println("consume1++:"+msg);
  }
}

//消费者2
@Component
@RabbitListener(queuesToDeclare = @Queue("work"))
public class Consume02 {
   @RabbitHandler
   public void receiveMsg(String msg){
       System.out.println("consume2--:"+msg);
  }
}
//生产者
@Test
void workQueue(){
   for(int i=0;i<10;i++){
       rabbitTemplate.convertAndSend("work","hello JD!"+i);
  }
}
6.3fanout
//消费者
package com.nxj.fanout;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class ConsumeFanout {

   @RabbitListener(bindings = {
       @QueueBinding(value = @Queue,//创建临时队列
                     exchange = @Exchange(name = "logs",type = "fanout"))
  })
   @RabbitHandler
   public void receiveMsg1(String msg){
       System.out.println("consume1-->"+msg);
  }

   @RabbitListener(bindings = {
       @QueueBinding(value = @Queue,
                     exchange = @Exchange(name = "logs",type = "fanout"))
  })
   @RabbitHandler
   public void receiveMsg2(String msg){
       System.out.println("consume2-->"+msg);
  }

}
//生产者
@Test
void fanout(){
   rabbitTemplate.convertAndSend("dEx","","fanout-->hello JD!");
}
6.4rount(direct)
//消费者
package com.nxj.direct;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class ConsumeDirect {
   @RabbitListener(bindings = {
       @QueueBinding(value = @Queue,
                     exchange = @Exchange(name = "dEx",type = "direct"),
                     key = {"info"}
                    )
  })
   @RabbitHandler
   public void c1msg(String msg){
       System.out.println("@1#"+msg);
  }

   @RabbitListener(bindings = {
       @QueueBinding(value = @Queue,
                     exchange = @Exchange(name = "dEx",type = "direct"),
                     key = {"error"}
                    )
  })
   @RabbitHandler
   public void c2msg(String msg){
       System.out.println("@2#"+msg);
  }
}
//生产者 
@Test
void direct(){
   rabbitTemplate.convertAndSend("logs","info","fanout-->hello JD!");
   rabbitTemplate.convertAndSend("logs","error","fanout-->!@EWR#$!");
}
6.5topic
//消费者
package com.nxj.topic;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class ConsumeTopic {
   @RabbitListener(bindings = {
       @QueueBinding(value = @Queue,
                     exchange = @Exchange(name = "topicEx",type = "topic"),
                     key = "*.*")
  })
   @RabbitHandler
   public void c1msg(String msg){
       System.out.println(msg);
  }

   @RabbitListener(bindings = {
       @QueueBinding(value = @Queue,
                     exchange = @Exchange(name = "topicEx",type = "topic"),
                     key = "user.#")
  })
   @RabbitHandler
   public void c2msg(String msg){
       System.out.println(msg);
  }
}
//生产者
@Test
void topic(){
   rabbitTemplate.convertAndSend("topicEx","user.a1","topic*.*");
   rabbitTemplate.convertAndSend("topicEx","user.a2.gg","topic*.*.*!!");
}

 

 

原文地址:https://www.cnblogs.com/ningxinjie/p/13916398.html