rabbitmq 笔记

 

 先记录一下rabbitmq的基础后面再写用docker安装rabbitmq

相关概念

     生产者

              Producer:投递消息的一方,它创建消息并发布到rabbitmq中。 消息包含2部分:消息体(payload)和标签(label),消息体是带有业务逻辑结构的数据比如json字符串。消息标签用来表述这条消息, 比如可以是一个交换器的名称和一个路由键,rabbitmq会根据标签吧消息发给感兴趣的消费者。

     消费者

              Consumer:接受消息的一方。 只用到消息的消息体。因为存入队列的消息只有消息体。

    Broker:消息中间件的服务节点

              可以看作是一个rabbitmq服务节点或者rabbitmq服务实例,多数情况下,可看作一台rabbitmq服务器, 生产者将消息发布到broker中, 消费者从broker中订阅消息

      

     RoutingKey: 路由键

    生产者将消息发送到交换器的时候会指定一个RoutingKey,用来指定路由规则,且Routingkey要与交换器类型和BindingKey(绑定键) 联合使用才能生效

     Binding:绑定

     通过绑定将交换器和队列关联起来,绑定的时候一般会指定一个绑定键,这样rabbitmq就知道如何正确将消息路由到队列了。在绑定多个队列到同一个交换器的时候,允许使用相同的绑定键。绑定键的生效依赖于交换器类型,比如fanout类型的交换器会忽视绑定键,将消息路由到所有绑定该交换器的队列中

     队列

              Queue :rabbitmq的内部对象用于存储消息。Rabbimq中的消息只能存在队列中多个消费者可订阅同一个队列,队列将消息平均分摊(Round-Robin)给多个消费者进行处理。 而不是每个消费者收到所有消息并处理

     Exchange 交换器

              生产者将消息发送到Exchange,Exchange将消息路由到一个或者多个队列中,路由不到可能会返回给生产者或直接丢弃。常用交换器有四种类型各自有不同的路由策略

          类型: fanout,direct,topic,header ,AMQP中还提供了System和自定义这两种类型

l  Fanout: 会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

l  Direct:会把消息路由到哪些BindingKey和  RoutingKey完全匹配的队列中

l  Topic:与direct类型交换器类似 , 不同之处在于在匹配BindingKey和  RoutingKey的规则上做了扩展

               匹配规则:

                        RoutingKey为一个“.” 号分割的字符串,比如:” com.rabbitmq.client “, ”java.util.current“

                        BindingKey 和RoutingKey也是点号“.” 分割的字符串

                        BindingKey中可以存在两种特殊字符”*” 和“#” 用以进行模糊匹配。 其中“*” 用于匹配一个单词,“#”用于匹配多个或者零个单词, 比如:“*.*.client”,”com.#” 等

l  Headers:根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时定制一组键值对,,当消息发送到交换器的时候Rabbitmq会取到该消息的headers(也是键值对类型) 然后比对其中的键值对是否完全匹配队列和交换器绑定时定制的键值对。完全匹配则路由到该队列,否则不路由。 Headers的性能差不实用,基本不会看到它的存在

Rabbitmq运转流程

Rabbitmq处理的每一条amqp的指令都是通过Channel完成的

Rabbitmq采用类似NIO(Non-Blocking I/O)的做法,选择tcp连接复用,不仅可以减少性能开销,同时便于管理。

     生产者发送消息

  1. 生产者连接到rabbitmq broker,建立一个Connection,开启一个Channel
  2. 生产者申明一个交换器并设置属性,比如交换器类型,是否持久化等
  3. 生产者申明一个队列并设置属性, 比如是否持久化,是否排他等
  4. 生产者通过路由键将交换器和队列绑定起来
  5. 生产者发送消息到Rabbitmq Broker,其中包含路由键,交换器等信息
  6. 相应的交换器根据接收到的路由键查找匹配的队列
  7. 如果找到,则将生产者发送过来的消息存入队列中
  8. 没有找到,则根据生产者设置的属性将消息选择丢弃还是退回给生产者
  9. 关闭channel
  10. 关闭Connection

消费者接受消息

  1. 消费者连接到rabbitmq broker,建立一个Connection,开启一个Channel
  2. 消费者确认(ack) 收到消息
  3. Rabbitmq队列中删除对应的已经被确认被收到的消息
  4. 关闭channel
  5. 关闭Connection

连接rabbitmq

     给定参数连接

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;
 

ConnectionFactory factory = new ConnectionFactory();

 factory.setUsername("admin");

 factory.setPassword("admin");

// 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通

 factory.setVirtualHost("/tets_virtual_host");

 factory.setHost("127.0.0.1");

 factory.setPort(5672);

 Connection conn = factory.newConnection();

     通过uri方式连接

ConnectionFactory factory = new ConnectionFactory();

factory.setUri("amqp://username:password@ipAdress:portNum/virtualhost");

Connection conn = factory.newConnection();

Connection可以创建多个channel实例,但是channel 实例不能在线程间共享,channel的共享是非线程安全的。应用程序 应该为每一个线程开辟一个channel

交换器和队列

String exchangeName = "exchangeName";

String routingKey = "com.test";

channel.exchangeDeclare(exchangeName, "direct", true);

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, exchangeName, routingKey);

以上代码创建了一个持久化的,非自动删除的,绑定类型为direct的交换器,同时也创建了一个非持久化的,非自动删除的,排他的队列,队列名称由rabbitmq自动生成。

也展示了如何使用路由键将队列和交换器绑定

Channel的api方法都是可以重载的

exchangeDeclare方法详解

exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成

DeclareOk exchangeDeclare(String exchange,String type, boolean durable,boolean aotudelete,boole internal,Map<String,Object> arguments)

返回值DeclareOk 用来标识成功声明了一个交换器

参数说明:

  •   Exchange: 交换器名称
  •   Type: 交换器类型,如:fanout,topic
  •   Durable: 设置是否持久化,持久化可将交换器存盘,服务器重启不会丢失相关信息
  •   Internal:是否内置,内置的交换器客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到这个交换器这种方式
  •   Autodelete:是否自动删除,自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后解绑
  •   Arguments:其他一些结构化参数,比如alternate-exchange

删除交换器的方法:

DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;



void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;



DeleteOk exchangeDelete(String exchange) throws IOException;
 

Exchange是交换器的名称

IfUnused 是否在交换器没有使用的情况下删除,true为是,fale则无论如何都会删除

QueueDeclare方法详解

       QueueDeclare只有两个重载方法:

com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;
com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autodelete, Map<String, Object> arguments) throws IOException;

不带参数的方法默认创建一个由rabbitmq命名的(称之为匿名队列),排他的,自动删除的非持久化的队列

参数说明:

  • Queue: 队列名称
  • Durable:是否持久化,true会存盘消息,重启服务器相关消息不会丢失
  • Exclusive:是否排他,如果一个队列声明成排他队列,该队列仅对首次声明他的连接可见,并在连接断开时自动删除,需要注意三点:排他队列基于连接(connection) 可见,同一个连接的不同通道(channel)可以同时访问同一个连接创建的排他队列,“首次”是指如果一个连接已经申明了一个排他队列,其他连接是不允许建立同名的排他队列的,这和普通队列不同。即使该队列是持久化的,一旦连接关闭了或者客户端推出该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景
  • Autodelete: 是否自动删除,自动删除的前提是至少有一个消费者连接这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
  • Arguments :设置队列的其他一些参数: 比如x-message-ttl

queueDeclarePassive 方法也是一个比较常用的声明队列的方法,用来检测相应的队列是否存在, 存在则正常返回,不存在抛出404 channel exception异常同时channel也会关闭。

com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String var1) throws IOException;

删除队列的方法:

com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;

com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, boolean isUnused, boolean isEmpty) throws IOException;

void queueDeleteNoWait(String queue, boolean isUnused, boolean isEmpty) throws IOException;

PurgeOk queuePurge(String var1) throws IOException;

queue是队列名称

ifUnused参考交换器的删除方法的参数

isEmpty :  true表示在队列为空(队列没有任何消息堆积)的情况下才能删除

queuePurge 方法区别于queueDelete , 它用来清空队列中的内容而不删除队列本身

queueBind 方法详解

该方法将队列和交换器绑定, 以及将已经绑定的队列和交换器进行解绑

以下是绑定的api

com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String var1, String var2, String var3) throws IOException;



com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String var1, String var2, String var3, Map<String, Object> var4) throws IOException;



void queueBindNoWait(String var1, String var2, String var3, Map<String, Object> var4) throws IOException;

参数说明:

  • Queue: 队列名称
  • Exchange: 交换器
  • RoutingKey: 用来绑定队列和交换器的路由键
  • Argument: 定义绑定的一些参数

以下是解绑的api

com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String var1, String var2, String var3) throws IOException;



com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String var1, String var2, String var3, Map<String, Object> var4) throws IOException;

exchangeBind 方法详解

该方法将交换器与交换器绑定以及解绑,此法与将交换器和队列绑定如出一辙, api如下

BindOk exchangeBind(String queue, String exchange, String routingKey) throws IOException;



BindOk exchangeBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;



void exchangeBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;



UnbindOk exchangeUnbind(String queue, String exchange, String routingKey) throws IOException;



UnbindOk exchangeUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;



void exchangeUnbindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

方法中的参数参考exchangeDeclare方法

交换器绑定交换器示例:

channel.exchangeDeclare("source", "direct", false, true, null);

channel.exchangeDeclare("destination", "fanout", false, true, null);

channel.exchangeBind("destination", "source", "exKey");

channel.queueBind("queue", "destination", "");

channel.basicPublish("source","exKey",null,"exToExDemo".getBytes());

生产者发送消息给交换器source,source根据路由键找到与其匹配的另一个交换器destination并把消息转发到destination中进而存储在destination中绑定的队列中

RabbitMQ的消息存储在队列中所以交换器的使用并不真正消耗服务器的性能,而队列会

发送消息

       使用channel的basicChannel方法发送消息,例如发送一条hello world消息:

channel.basicPublish(exchangeName,routingKey,null,"hello world".getBytes());

为了更好的控制发送,可以使用mandatory这个参数,或者可以发送一些特顶顶属性的信息:

channel.basicPublish(exchangeName,routingKey, new AMQP.BasicProperties.Builder()

        .contentType("text/plain") //设置contentType

        .deliveryMode(2) //投递模式设置为2,消息会被持久化(存盘)到服务器中

        .priority(2)//优先级为0

        .userId("hidden")

        .build(),"hello world ".getBytes());

发送带header的消息

Map<String, Object> header = new HashMap<>();

header.put("test", "testText");

header.put("testAgain", "agian");

channel.basicPublish(exchangeName,routingKey, new AMQP.BasicProperties.Builder()

        .headers(header)

        .build(),"hello world".getBytes());

发送过期时间的消息

channel.basicPublish(exchangeName,routingKey, new AMQP.BasicProperties.Builder()

        .expiration("60000")

        .build(),"hello world ".getBytes());

等等。。。。

BasicPublish的api:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;



void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;



void basicPublish(String var1, String var2, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

参数说明:

  • Exchange:交换器名称,指明消息发送到哪一个交换器中,若设置为空字符串则被发送到rabbitmq的默认交换器中
  • RoutingKey:路由键,交换器根据路由键将消息村粗到对应的队列中
  • Props:消息的基本属性,其中包括14个属性成员
  • Body: 消息体(payload) 真正需要发送的消息
  • Mandatory  和 immediate 在后面写
原文地址:https://www.cnblogs.com/jxlsblog/p/10085298.html