RabbitMQ安装与消息模型

RabbitMQ安装与消息模型

通过docker安装rabbitmq

#拉起镜像-management带控制台
docker pull rabbitmq:management
#运行容器
docker run -d --hostname "主机名" --name mq1 -e RABBITMQ_DEFAULT_USER="账号" -e RABBITMQ_DEFAULT_PASS="密码" -p 15672:15672 -p 5672:5672 rabbitmq:management
#进入容器
docker exec -it mq1 bash
#查看服务状态
rabbitmqctl status
#查看插件列表
rabbitmq-plugins list
#测试访问
curl localhost:15672

消息模型

依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>

WEB操作

新建一个虚拟主机

新建一个用户,并点击用户名

设置用户的权限,允许其访问ems主机

封装工具类

private static ConnectionFactory connectionFactory;

static {
    connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("122.51.70.176");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/ems");
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("123456");
}

public static Connection getConnection() {
    try {
        return connectionFactory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

public static void close(Channel channel, Connection connection) {
    try {
        if (channel != null) {
            channel.close();
        }
        if (connection != null) {
            connection.close();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

直连模型

生产者发送消息到队列, 消费者从中取出消息.

生产者
@Test
public void helloWorld() throws IOException{
    Connection connection = MQUtils.getConnection();
    //建立通道
    Channel channel = connection.createChannel();
    //通道绑定消息队列, 队列名hello, 队列不持久化, 非独占, 不自动删除, 附加参数
    channel.queueDeclare("hello", false, false, false, null);
    //发布消息, 不指定交换机, 队列名, 传递消息的额外设置, 消息内容
    channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());
    //关闭通道
    MQUtils.close(channel, connection);
}
消费者
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    //消费消息, 队列名, 消息自动确认, 消费的回调接口
    channel.basicConsume("hello", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //打印获得的消息
            System.out.println(new String(body));
        }
    });
}

工作队列模型(work queues)

让多个消费者绑定到一个队列, 共同消费队列中的消息, 该模型下默认消费者获取消息的方式是轮询. 消费者消费的信息数量是平均的.

生产者
@Test
public void workQueues() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work", true, false, false, null);
    for (int i = 0; i < 50; i++) {
        channel.basicPublish("", "work", null, ("hello work queues" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
消费者
//消费者1
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work", true, false, false, null);
    channel.basicConsume("work", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
            System.out.println("消费者1" + new String(body));
        }
    });
}
//消费者2
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work", true, false, false, null);
    channel.basicConsume("work", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
            System.out.println("消费者2" + new String(body));
            try {
                //慢处理
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

两个消费者获得的消息数量都是25条, 消费者1很快就完成了任务, 但是消费者2加班了很久. 需要优化改进,让消费者能者多劳.

改进后的消费者
//消费者1
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    final Channel channel = connection.createChannel();
    //设通道上的消息容量为1
    channel.basicQos(1);
    channel.queueDeclare("work", true, false, false, null);
    //把自动确认设置为false
    channel.basicConsume("work", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费者1" + new String(body));
            //手动确认消息, 不开启多条消息确认
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}
//消费者2
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    final Channel channel = connection.createChannel();
    channel.basicQos(1);
    channel.queueDeclare("work", true, false, false, null);
    channel.basicConsume("work", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费者2" + new String(body));
            channel.basicAck(envelope.getDeliveryTag(), false);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

广播模型(发布-订阅模型)

可以有多个消费者, 每个消费者都有自己的消费队列, 队列绑定到交换机上, 交换机从生产者接收消息. 最终可以实现一条消息被多个消费者消费.

生产者
@Test
public void fanout() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //指定交换机, 交换机名为news, 交换机类型为fanout
    channel.exchangeDeclare("news","fanout");
    for (int i = 0; i < 10; i++) {
        //消息发布, 选择交换机, 无路由key, 无其他设置, 消息内容
        channel.basicPublish("news","",null, ("hello fanout" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
n个消费者
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //绑定队列
    channel.queueBind(queue, "news", "");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消费者n" + new String(body));
        }
    });
}

直连模型(静态路由模型)

在路由的直连(Direct)模式下, 队列与交换机之间不再是任意绑定了, 而是要指定一个路由Key, 消息的发送也必须要指定上路由Key, 交换机根据对消息路由Key进行判断, 只有相匹配的情况下消费者才会接收到消息.

生产者
@Test
public void direct() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //指定交换机, 交换机名为mail, 交换机类型为direct
    channel.exchangeDeclare("mail", "direct");
    for (int i = 0; i < 10; i++) {
        //消息发布, 选择交换机, 无路由key, 无其他设置, 消息内容
        if (i == 4 | i == 8) {
            channel.basicPublish("mail", "vip", null, ("vip消息" + i).getBytes());
            continue;
        }
        channel.basicPublish("mail", "user", null, ("用户消息" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
消费者
//消费者1-普通用户
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //绑定队列
    channel.queueBind(queue, "mail", "user");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消费者1" + new String(body));
        }
    });
}
//消费者1-vip用户
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //绑定多个队列
    channel.queueBind(queue, "mail", "user");
    channel.queueBind(queue, "mail", "vip");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消费者2" + new String(body));
        }
    });
}

订阅模型(动态路由模型)

在路由的订阅(Topic)模式下, 也是使用路由Key来分配消息的, 不过在绑定路由Key的时候可以使用通配符, 一般路由Key可以由一到多个单词组成, 单词之间使用"."进行分割.

发布者
@Test
public void Topic() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //指定交换机, 交换机名为notice, 交换机类型为topic
    channel.exchangeDeclare("notice", "topic");
    for (int i = 0; i < 10; i++) {
        //消息发布, 选择交换机, 无路由key, 无其他设置, 消息内容
        if (i == 4) {
            channel.basicPublish("notice", "user.vip.msg", null, ("vip消息" + i).getBytes());
            continue;
        }else if (i == 8) {
            channel.basicPublish("notice", "user.vip.present", null, ("vip礼物" + i).getBytes());
            continue;
        }
        channel.basicPublish("notice", "user.msg", null, ("用户消息" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
消费者
//消费者1
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //绑定队列, *任意一个字词, #任意数量字词
    channel.queueBind(queue, "notice", "user.*");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消费者1" + new String(body));
        }
    });
}
//消费者2
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //绑定队列, *任意一个字词, #任意数量字词
    channel.queueBind(queue, "notice", "user.#");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消费者2" + new String(body));
        }
    });
}
原文地址:https://www.cnblogs.com/pinked/p/13697056.html