RabbitMQ 消息模式

消息模式实例

视频教程:https://ke.qq.com/course/304104

编写代码前,最好先添加好用户并设置virtual hosts

一、简单模式

1.导入jar包

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.5.0</version>
    </dependency>

2.创建连接

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String QUEUE = "testhello"; //队列名字

    public static void main(String[] args) throws Exception{
        //获取连接
        Connection connection = ConnectionUtil.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //声明队列,如果队列存在则什么都不做,如果队列不存在才创建
        //参数一: 队列的名字
        //参数二: 是否持久化队列,我们的队列模式是在内存中的,如果rabbit重启会丢失,如果我们设置为true 则会保存到erlng自带的数据库中,重启会重新获取
        //参数三: 是否排外,有两个作用,第一个当我们的链接关闭后是否会自动删除队列,作用二,是否私有当前队列,如果私有了,其他通道不可以访问当前队列,如果为true 一般适合一个队列消费者的时候
        //参数四: 是否自动删除
        //参数五 我们的一些其他的参数
        channel.queueDeclare(QUEUE, false, false, false, null);

        //发送内容
        channel.basicPublish("", QUEUE, null, "hello world".getBytes());

        //关闭连接
        channel.close();
        connection.close();
    }
}

3.消费者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Receiver {
    private final static String QUEUE = "testhello"; //队列名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        //接收消息,参数二 是自动确认
        channel.basicConsume(QUEUE, true, consumer);

       while (true) {
            //获取消息   如果没有消息会等待,有的话就获取执行然后销毁,是一次性的
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("message:"+message);
       }
    }
}

二、工作模式

1.生产者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String QUEUE = "testwork"; //队列名字

    public static void main(String[] args) throws Exception{
        //获取连接
        Connection connection = ConnectionUtil.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //声明队列,如果队列存在则什么都不做,如果队列不存在才创建
        //参数一: 队列的名字
        //参数二: 是否持久化队列,我们的队列模式是在内存中的,如果rabbit重启会丢失,如果我们设置为true 则会保存到erlng自带的数据库中,重启会重新获取
        //参数三: 是否排外,有两个作用,第一个当我们的链接关闭后是否会自动删除队列,作用二,是否私有当前队列,如果私有了,其他通道不可以访问当前队列,如果为true 一般适合一个队列消费者的时候
        //参数四: 是否自动删除
        //参数五 我们的一些其他的参数
        channel.queueDeclare(QUEUE, false, false, false, null);

        for (int i = 0; i < 20; i++){
            //发送内容
            channel.basicPublish("", QUEUE, null, ("hello world "+i).getBytes());
        }

        //关闭连接
        channel.close();
        connection.close();
    }
}

2.消费者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String QUEUE = "testwork"; //队列名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);

        //告诉服务器,在我没有确认当前消息完成之前,不要给我发新消息
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //当我们收到消息的时候调用
                System.out.println("消费者1 收到的消息内容是:" + new String(body));
                //确认 参数2,false为确认收到消息,true 为拒绝接收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //注册消费者,参数2 收到确认,代表我们收到消息后需要手动告诉服务器,我们收到消息了
        channel.basicConsume(QUEUE, false, consumer);
    }
}

3.消费者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String QUEUE = "testwork"; //队列名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);

        //告诉服务器,在我没有确认当前消息完成之前,不要给我发新消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //当我们收到消息的时候调用
                System.out.println("消费者2 收到的消息内容是:" + new String(body));
                //确认 参数2,false为确认收到消息,true 为拒绝接收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //注册消费者,参数2 收到确认,代表我们收到消息后需要手动告诉服务器,我们收到消息了
        channel.basicConsume(QUEUE, false, consumer);
    }
}

三、发布订阅模式

1.生产者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字

    public static void main(String[] args) throws Exception{

       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
       //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//定义一个交换机,类型是fanout

        //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失
        channel.basicPublish(EXCHANGE_NAME, "", null, "发布订阅模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

2.消费者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testpubQueue1", false, false, false, null);

        //绑定队列到交换机
        channel.queueBind("testpubQueue1", EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testpubQueue1", false, consumer);
    }
}

3.消费者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String EXCHANGE_NAME = "testexchange"; //定义交换机名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testpubQueue2", false, false, false, null);

        //绑定队列到交换机
        channel.queueBind("testpubQueue2", EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testpubQueue2", false, consumer);
    }
}

四、路由模式

1.生产者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字

    public static void main(String[] args) throws Exception{

       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
       //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");//定义一个路由格式的交换机

        //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失
        // routingKey 为key1
        channel.basicPublish(EXCHANGE_NAME, "key3", null, "路由模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

2.消费者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testRouteQueue1", false, false, false, null);

        //绑定队列到交换机
        //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
        channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key1");
        //如果需要绑定多个标记 在执行一次即可
        channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key3");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testRouteQueue1", false, consumer);
    }
}

3.消费者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String EXCHANGE_NAME = "testexroute"; //定义交换机名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testRouteQueue2", false, false, false, null);

         //绑定队列到交换机
        //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
        channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key1");
        //如果需要绑定多个标记 在执行一次即可
        channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testRouteQueue2", false, consumer);
    }
}

五、主题模式

1.生产者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字

    public static void main(String[] args) throws Exception{

       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
       //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");//定义一个topic 格式的交换机

        //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息则会丢失
        // routingKey 为key1
        // * 只能匹配一个字符 # 可以匹配多个字符
        channel.basicPublish(EXCHANGE_NAME, "abc.1.3", null, "topic模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

2.消费者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testTopicQueue1", false, false, false, null);

        //绑定队列到交换机
        //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
        channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.*");
        //如果需要绑定多个标记 在执行一次即可
        channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "abc.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testTopicQueue1", false, consumer);
    }
}

3.消费者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String EXCHANGE_NAME = "testexchangetopic"; //定义交换机名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testTopicQueue2", false, false, false, null);

         //绑定队列到交换机
        //参数3标记,绑定到交换机的时候会指定一个标记,只有和它一样的标记的消息才会被当前消费者接收到
        channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "key.#");
        //如果需要绑定多个标记 在执行一次即可
        channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "abc.*");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testTopicQueue2", false, consumer);
    }
}
原文地址:https://www.cnblogs.com/gyli20170901/p/10137341.html