rabbitmq 五种消息模型

具体可参考:https://note.youdao.com/ynoteshare1/index.html?id=db637b43f0ab16cf6db9b9b92d562ca8&type=notebook#/7A55B7E7787A49D0B2E2265D437F3C19;这里写的很具体了;

一、基础环境:

1)创建springboot项目, 并导入如下依赖;

        <!--rabbitmq依赖-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
        <!--springboot mq支持-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2)写一个连接工具类;

/**
 * 建立连接的工具类,用来简单测试消息发送接收功能
 * 实际上与springboot使用不需要该类
 */
public class ConnectionUtil {
    /**
     * 建立连接
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        //mq服务器IP
        factory.setHost("192.168.190.141");
        //铜须端口号
        factory.setPort(5672);
        //虚拟主机
        factory.setVirtualHost("xieqi");
        factory.setUsername("xieqi");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

二、消息模型;

1)、基本消息模型(basic queues)

producer---   |队列|   ---consumer

功能:一个生产者P发送消息到队列Q,一个消费者C接收。实现了基本的消息的生产和消费。一对一。

生产者:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        Connection connection = ConnectionUtil.getConnection();
        //2.建立通道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(
                "simple_queue",//队列名称
                false,//设置是否持久化
                false,//设置是否排他(仅申明他的连接可见)
                false,//是否自动删除
                null);//参数设置

        for (int i = 0; i <10 ; i++) {
            String message=" hello rabbit"+i;
            //通过channel发送消息
            channel.basicPublish(
                    "",//exchange 交换机 ""表示使用默认
                    "simple_queue",// routing_key 路由key
                    null,//设置项
                    message.getBytes());//消息
            System.out.println("消息发送成功:"+message);
        }
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

自动确认消费者:

/**
 * 描述:
 *   消费消息,自动确认(ACK)
 * @author bigpeng
 * @create 2019-07-15 13:40
 */
public class ConsumerAutoACK {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

手动确认消费者:

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection();
        //2.创建通道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare("simple_queue",
                false,false,false,null);
        //4 定义队列的消费者
        DefaultConsumer consumer=new DefaultConsumer(channel){
            //处理消息,当监听到队列中有消息时,会触发该方法
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("获取到队列simple_queue的消息:"
                               +message);
                Random random=new Random();
                if(random.nextInt(10)%2==1) {
                    //手动ACK
                    //成功ACK
                   //同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。  
                   channel.basicAck(envelope.getDeliveryTag(), false);
                    System.out.println("消费成功");
                }else {
                    //失败的ACK
                    channel.basicNack(envelope.getDeliveryTag(),
                            false,
                            true);//是否重回队列
                    System.out.println("消费失败");
                }
            }
        };
        //将consumer关联到通道

        //自动ACK
//        channel.basicConsume(
//                "simple_queue",//队列名
//                true,//是否自动消息确认(ACK)
//                consumer);//Consumer对象

           //手动ACK
                channel.basicConsume(
                "simple_queue",//队列名
                false,//是否自动消息确认(ACK)
                consumer);//Consumer对象

    }
}

2)、工作队列(work queues)

功能:一个生产者,多个消费者。写法与基本消息模型类似,只不过原来是一个消费者,现在是多个消费者。多个消费者处理队列中的数据。

特点:

1)可以有多个消费者;

2)一条消息只能被多个消费者中的一个消费

3)、发布/订阅模式 Publish/Subscribe

功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

与工作队列区别:

1)工作队列只有一个队列,而发布订阅有多个队列

2)工作队列一个消息只能被多个消费者中的一个消费,而发布订阅一个消息会被多个订阅的消费者消费。

3)发布订阅比工作队列多出一个交换机概念,用来绑定消息发送到哪些消费者。 其实之前的两种模式也需要交换机,其使用默认交换,我们通过空字符串(“”)来识别。

4)、路由模式(Routing)

功能:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key。只有当两个key相匹配时,消息才会发送到对应的消费者队列。即在广播的基础上有了路由的功能。 type 指定为direct。

5)、主题订阅模式(topic)

功能:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配; 符号#:匹配一个或者多个词 lazy.# 可以匹配 lazy.irs或者lazy.irs.cor; 符号*:只能匹配一个词 lazy.* 可以匹配 lazy.irs或者lazy.cor

原文地址:https://www.cnblogs.com/xie-qi/p/13349495.html