rabbitmq学习记录

rabbitmq学习记录

作用

消息队列,实现解耦。实现业务系统间的通信。

搭建过程

在之前搭好的服务器上按照ubuntu18.04安装rabbitmq的方式安装即可。安装成功后再ip:端口可以直接访问图形界面。java这边在maven里把依赖加上就ok了。

学习过程

maven依赖

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies> 

简单程序实现信息传递

生产者

public class Producer {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂ip
        factory.setHost("202.200.231.14");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 1.队列名称
         * 2.消息是否持久化
         * 3.是否能有多个消费者
         * 4.是否自动删除
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "hello world";

        channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息发送成功");
    }
} 

消费者

public class Consumer {
    //队列的名称
    public static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws  Exception{
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("202.200.231.14");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println(new String(message.getBody()));
        };

        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println("消息消费被中断");
        };
        /**
         * 1.消费哪个队列
         * 2.消费成功后是否需要自动应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费的的回调
         */

        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

工作队列模式

轮询,多个消费者线程,处理信息。直接多开消费者,可以轮流接收消息。

抽取工具类 rabbitimqutils

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂ip
        factory.setHost("202.200.231.14");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");

        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        return channel;
    }
}

消息应答手动版

/**
 * 消息再手动应答时不丢失,重新放回到队列
 */
public class Worker3 {
    public static final String task_queue_name ="ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1很快");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            //沉睡1s
            SleepUtils.sleep(1);
            System.out.println("接收到的消息" + new String(message.getBody()));
            //手动应答
            /**
             * 1. 消息的标记tag
             * 2. 是否批量应答消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(""+"拔剑四顾心茫然");
        };

        channel.basicConsume(task_queue_name,false,deliverCallback,cancelCallback);

    }
}
原文地址:https://www.cnblogs.com/wenwenjiejie/p/15110323.html