rabbimq工作模型

RabbitMQ

上图是rabbitmq的图形管理界面:

rabbitmq的基本组件:
  • ConCnections:客户端连接rabbitmq服务器都需要和服务器建立连接(connections)
  • Channels:通道,客户端与服务器发送接收消息都需要通过通道传输。建立连接后就可以创建通道,通道可以绑定交换机或队列来发送生产者消息,可以绑定交换机和队列来消费消息。
  • Eqxchanges:交换机,相比于只用队列来交换信息,交换机可以实现更多种消息消费模式。
  • Queues:消息队列,消息放在队列中,等消费者来消费。
虚拟主机

为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。(就是在建立连接的时候还要添加一个VirtualHost的参数,不同的程序使用不同的虚拟主机就可以相互之间的交换机,队列都互不影响)

可以通过下图步骤添加虚拟主机:

建立连接
public class RabbitMqUtils {
    private static ConnectionFactory connectionFactory;
    static {
        //新建一个连接工程
        connectionFactory=new ConnectionFactory();
        //设置ip
        connectionFactory.setHost("172.18.1.53");
        //设置端口
        connectionFactory.setPort(5672);
        //设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置用户名
        connectionFactory.setUsername("ems");
        //设置密码
        connectionFactory.setPassword("123");
    }
    //定义提供连接对象的方法
    public static Connection getConnection(){
        try {
            //通过newConnection()方法就可以创建一个连接
            return connectionFactory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }
    //关闭连接方法
    public static void closeConnectionAndChanel(Channel channel,Connection conn){
        try {
            if(channel!=null){
                channel.close();
            }
            if (conn!=null){
                conn.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
RabitMQ的消息模型

1,hello模型(直连)

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序。

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

    provider.java

package com.example.demo.helloworld;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author zdl
 * @create 2020/8/6 11:41
 */
public class Provider {

    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();
        //获取链接通道
        Channel channel = connection.createChannel();
        //通道绑定对应的消息队列
        //参数1,队列名称,不存在队列将自动创建队列
        //参数2,用来定义队列是否启动持久化
        //参数3,exclusive 是否独占队列,只能被当前通道绑定
        //参数4,autoDelete,是否在消费完成后并且消费者断开连接将自动删除队列,被消费者消费完,队列没有其他元素就删除队列
        //参数5,额外参数
        channel.queueDeclare("hello",false,false,false,null);
        //发布消息
        //参数1:交换机名称(连队列就为"") 参数2:队列名称(如果连交换级就为"") 参数3:传递消息额外设置 参数4:消息的具体内容
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

consumer.java

package com.example.demo.helloworld;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException
/**
 * @author zdl
 * @create 2020/8/6 13:40
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException{
        Connection connection = RabbitMqUtils.getConnection();
        //获取链接通道
        Channel channel = connection.createChannel();
        //通道绑定对应的消息队列
        //参数1,队列名称,不存在队列将自动创建队列
        //参数2,用来定义队列是否启动持久化(仅队列,不包含消息)
        //参数3,exclusive 是否独占队列
        //参数4,autoDelete,是否在消费完成后自动删除队列
        //参数5,额外参数
        channel.queueDeclare("hello",false,false,false,null);
        //消费消息
        //参数1:消费队列名称
        //参数2:开启消息自动确认机制
        //参数3:消费时的回调接口(当有消息可以消费时就会调用该消费者的handleDelivery方法来进行消费)
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override  //最后一个参数;消息队列中取出的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body)+"=====================");
            }
        });
    }

}

2,Work queues(任务模型)

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

角色:

  • P:生产者:任务的发布者
  • C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者-2:领取任务并完成任务,假设完成速度快

生产者

package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 14:52
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();

        channel.queueDeclare("work",true,false,false,null);
        for (int i=0;i<20;i++){
            channel.basicPublish("","work",null,(i+"hollo work quene").getBytes());
        }
        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消费者1

package com.example.demo.workquene;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 15:07
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();
        //每次只消费一条信息
        channel.basicQos(1);
        channel.queueDeclare("work",true,false,false,null);
        //参数2:是否自动确认,不自动确认的话就处理完再确认然后才能再消费
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println("消费者-1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者2

package com.example.demo.workquene;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 15:31
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException{
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
                    IOException {
                System.out.println("消费者-2:"+new String(body));
                //参数1:确认队列中具体那个消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

在面两个消费者中,更改了1:每次只消费一条消费,2:取消消息自动确认。这么做是为了实现按劳分配。

如果是自动确认的话,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。但是我们的消费者1的消费能力是比较差的,消费者2都消费完10条了,消费者1一条还没消费完,所以这是不符合我们的期望的,我们希望消费能力强的可以多消费点信息。

所以我们需要关闭消息自动确认,并在消费者消费完后调用channel.basicAck(envelope.getDeliveryTag(),false);来手动确认消息,这样才会在消费完一条消息后才会进行下一条消息的消费。

fanout(广播)

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者

package com.example.demo.fanout;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 16:12
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定交换机 参数1:交换机名称 参数2:交换机类型 fanout:广播,一条消息多个消费者同时消费
        channel.exchangeDeclare("logs","fanout");
        //发布消息
        channel.basicPublish("logs","",null,"hello".getBytes());
        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消费者1

package com.example.demo.fanout;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 16:22
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();
        channel.exchangeDeclare("logs","fanout");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定交换级和队列
        channel.queueBind(queue,"logs","");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1"+new String(body));
            }
        });

    }
}

消费者2和消费者3与消费者1类似

结果:

Direct(订阅模型)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

角色

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

生产者:

package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 17:13
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");
        String routingKey="error";
		//通过参数2指定routingKey
        channel.basicPublish("logs_direct",routingKey,null,("这是directm模型发布的基于route key:["+routingKey+"] 发送的消息").getBytes());

        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消费者1:

package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 17:22
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName="logs_direct";
        channel.exchangeDeclare(exchangeName,"direct");
        //获取临时队列
        String queue = channel.queueDeclare().getQueue();
        //该队列接收routingKey为"info","error","warning"其中任何一个的消息
        channel.queueBind(queue,"logs_direct","info");
        channel.queueBind(queue,"logs_direct","error");
        channel.queueBind(queue,"logs_direct","warning");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}

消费者2:

  
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 17:28
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");
        String queue = channel.queueDeclare().getQueue();
		//基于route_key帮定队列和交换机,第三个参数为routingKey,则该队列只接收routingKey为"error"的消息
        channel.queueBind(queue,exchangeName,"error");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });
    }
}

测试生产者发送Route key为error的消息时

测试生产者发送Route key为info的消息时

Topic 模式 通配符订阅

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

# 统配符
		* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
		# (hash) can substitute for zero or more words.  匹配一个或多个词
# 如:
		audit.#    匹配audit.irs.corporate或者 audit.irs 等
   		audit.*   只能匹配 audit.irs

生产者

package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 18:11
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        //指定交换机及路由模式
        channel.exchangeDeclare("topic","topic");
        //动态路由key
        String routekey = "user.save.fjie";
        //发布消息
        channel.basicPublish("topic",routekey,null,("这是路由中的动态订阅模型,route key: ["+routekey+"]").getBytes());

        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消费者1:

package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 18:14
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("topic","topic");
	//临时队列
        String queue = channel.queueDeclare().getQueue();
        //使用通配符绑定routingKey
        channel.queueBind(queue,"topic","user.*");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });

    }
}

消费者2:

package com.example.demo.topic;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 18:21
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
		//绑定交换机
        channel.exchangeDeclare("topic","topic");
		//临时队列
        String queue = channel.queueDeclare().getQueue();
		//使用通配符绑定routingKey
        channel.queueBind(queue,"topic","user.#");
		//消费
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });

    }
}

结果:

原文地址:https://www.cnblogs.com/zdl2234/p/13554960.html