RabbitMQ
上图是rabbitmq的图形管理界面:
rabbitmq的基本组件:
- ConCnections:客户端连接rabbitmq服务器都需要和服务器建立连接(connections)
- Channels:通道,客户端与服务器发送接收消息都需要通过通道传输。建立连接后就可以创建通道,通道可以绑定交换机或队列来发送生产者消息,可以绑定交换机和队列来消费消息。
- Eqxchanges:交换机,相比于只用队列来交换信息,交换机可以实现更多种消息消费模式。
- Queues:消息队列,消息放在队列中,等消费者来消费。
虚拟主机
为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。(就是在建立连接的时候还要添加一个VirtualHost的参数,不同的程序使用不同的虚拟主机就可以相互之间的交换机,队列都互不影响)
可以通过下图步骤添加虚拟主机:
建立连接
public class RabbitMqUtils {
private static ConnectionFactory connectionFactory;
static {
//新建一个连接工程
connectionFactory=new ConnectionFactory();
//设置ip
connectionFactory.setHost("172.18.1.53");
//设置端口
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置用户名
connectionFactory.setUsername("ems");
//设置密码
connectionFactory.setPassword("123");
}
//定义提供连接对象的方法
public static Connection getConnection(){
try {
//通过newConnection()方法就可以创建一个连接
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}
//关闭连接方法
public static void closeConnectionAndChanel(Channel channel,Connection conn){
try {
if(channel!=null){
channel.close();
}
if (conn!=null){
conn.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
RabitMQ的消息模型
1,hello模型(直连)
在上图的模型中,有以下概念:
-
P:生产者,也就是要发送消息的程序。
-
C:消费者:消息的接受者,会一直等待消息到来。
-
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
provider.java
package com.example.demo.helloworld;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author zdl
* @create 2020/8/6 11:41
*/
public class Provider {
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
//获取链接通道
Channel channel = connection.createChannel();
//通道绑定对应的消息队列
//参数1,队列名称,不存在队列将自动创建队列
//参数2,用来定义队列是否启动持久化
//参数3,exclusive 是否独占队列,只能被当前通道绑定
//参数4,autoDelete,是否在消费完成后并且消费者断开连接将自动删除队列,被消费者消费完,队列没有其他元素就删除队列
//参数5,额外参数
channel.queueDeclare("hello",false,false,false,null);
//发布消息
//参数1:交换机名称(连队列就为"") 参数2:队列名称(如果连交换级就为"") 参数3:传递消息额外设置 参数4:消息的具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
consumer.java
package com.example.demo.helloworld;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException
/**
* @author zdl
* @create 2020/8/6 13:40
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = RabbitMqUtils.getConnection();
//获取链接通道
Channel channel = connection.createChannel();
//通道绑定对应的消息队列
//参数1,队列名称,不存在队列将自动创建队列
//参数2,用来定义队列是否启动持久化(仅队列,不包含消息)
//参数3,exclusive 是否独占队列
//参数4,autoDelete,是否在消费完成后自动删除队列
//参数5,额外参数
channel.queueDeclare("hello",false,false,false,null);
//消费消息
//参数1:消费队列名称
//参数2:开启消息自动确认机制
//参数3:消费时的回调接口(当有消息可以消费时就会调用该消费者的handleDelivery方法来进行消费)
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override //最后一个参数;消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body)+"=====================");
}
});
}
}
2,Work queues(任务模型)
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
- P:生产者:任务的发布者
- C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
- C2:消费者-2:领取任务并完成任务,假设完成速度快
生产者
package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 14:52
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for (int i=0;i<20;i++){
channel.basicPublish("","work",null,(i+"hollo work quene").getBytes());
}
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消费者1
package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 15:07
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
//每次只消费一条信息
channel.basicQos(1);
channel.queueDeclare("work",true,false,false,null);
//参数2:是否自动确认,不自动确认的话就处理完再确认然后才能再消费
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("消费者-1:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
消费者2
package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 15:31
*/
public class Consumer2 {
public static void main(String[] args) throws IOException{
Connection connection = RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
IOException {
System.out.println("消费者-2:"+new String(body));
//参数1:确认队列中具体那个消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
在面两个消费者中,更改了1:每次只消费一条消费,2:取消消息自动确认。这么做是为了实现按劳分配。
如果是自动确认的话,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。但是我们的消费者1的消费能力是比较差的,消费者2都消费完10条了,消费者1一条还没消费完,所以这是不符合我们的期望的,我们希望消费能力强的可以多消费点信息。
所以我们需要关闭消息自动确认,并在消费者消费完后调用channel.basicAck(envelope.getDeliveryTag(),false);来手动确认消息,这样才会在消费完一条消息后才会进行下一条消息的消费。
fanout(广播)
在广播模式下,消息发送流程是这样的:
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者
package com.example.demo.fanout;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 16:12
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//将通道声明指定交换机 参数1:交换机名称 参数2:交换机类型 fanout:广播,一条消息多个消费者同时消费
channel.exchangeDeclare("logs","fanout");
//发布消息
channel.basicPublish("logs","",null,"hello".getBytes());
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消费者1
package com.example.demo.fanout;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 16:22
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare("logs","fanout");
//临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换级和队列
channel.queueBind(queue,"logs","");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1"+new String(body));
}
});
}
}
消费者2和消费者3与消费者1类似
结果:
Direct(订阅模型)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
角色
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
生产者:
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 17:13
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
String routingKey="error";
//通过参数2指定routingKey
channel.basicPublish("logs_direct",routingKey,null,("这是directm模型发布的基于route key:["+routingKey+"] 发送的消息").getBytes());
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消费者1:
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 17:22
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName="logs_direct";
channel.exchangeDeclare(exchangeName,"direct");
//获取临时队列
String queue = channel.queueDeclare().getQueue();
//该队列接收routingKey为"info","error","warning"其中任何一个的消息
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","warning");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
消费者2:
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 17:28
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
String queue = channel.queueDeclare().getQueue();
//基于route_key帮定队列和交换机,第三个参数为routingKey,则该队列只接收routingKey为"error"的消息
channel.queueBind(queue,exchangeName,"error");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
测试生产者发送Route key为error的消息时
测试生产者发送Route key为info的消息时
Topic 模式 通配符订阅
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!这种模型Routingkey
一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# 统配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配一个或多个词
# 如:
audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配 audit.irs
生产者
package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 18:11
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//指定交换机及路由模式
channel.exchangeDeclare("topic","topic");
//动态路由key
String routekey = "user.save.fjie";
//发布消息
channel.basicPublish("topic",routekey,null,("这是路由中的动态订阅模型,route key: ["+routekey+"]").getBytes());
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消费者1:
package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 18:14
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("topic","topic");
//临时队列
String queue = channel.queueDeclare().getQueue();
//使用通配符绑定routingKey
channel.queueBind(queue,"topic","user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
消费者2:
package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 18:21
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("topic","topic");
//临时队列
String queue = channel.queueDeclare().getQueue();
//使用通配符绑定routingKey
channel.queueBind(queue,"topic","user.#");
//消费
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
结果: