rabbitMQ 学习

这两天看了《RabbitMQ实战》这本书,也在网上看了其他人的一些博客描述,所以想写些自己所学会的东西

RabbitMQ是erlang语言写的,遵守AMQP协议,我们通过其来降低代码之间的耦合程度,当一个producer完成后,可以同时发送给多个consumer,这样可以不用通过代码

来进行调用,而是通过订阅来实现类多线程操作。

顺序是:producer(生产者) --》 exchange(交换机) --》 channel(信道) --》 queue(队列) --》 consumer(消费者)

exchange和queue之间使用binding按照路由规则绑定(路由键)

exchange、channel和queue都是存在于 消息代理服务器(我是用VM中的CentOS搭建的,安装过程有时间会去写下)

一、Python

hello_world_producer.py

//pika是RabbitMQ团队编写的官方Python AMQP库 import pika,sys //指定账户,这里guest是默认账户 credentials = pika.PlainCredentials("guest","guest") //建立到代理服务器的连接 conn_params = pika.ConnectionParameters("localhost",credentials = credentials) conn_broker = pika.BlockingConnection(conn_params) //获得信道 channel = conn_broker.channel() //声明交换器 //这里刚开始总是提示有错误,将exchange_declare()里面参数不带变量名才可以运行 //[root@localhost Desktop]# python ./hello_world_consumer.py // File "./hello_world_consumer.py", line 10 // channel.exchange_declare(exchange="hello-exchange","direct",False // SyntaxError: non-keyword arg after keyword arg channel.exchange_declare("hello-exchange", 'direct', False, True, False) //创建纯文本消息 msg = sys.argv[1] msg_props = pika.BasicProperties() msg_props.content_type = "text/plain" //发布消息 channel.basic_publish(body=msg, exchange="hello-exchange", properties=msg_props, routing_key="hola")
hello_world_consumer.py

import pika

//建立到代理服务器的连接
credentials = pika.PlainCredentials("guest","guest")
conn_params = pika.ConnectionParameters("localhost",credentials = credentials)
conn_broker = pika.BlockingConnection(conn_params)
//获取信道
channel = conn_broker.channel()
//创建交换器
channel.exchange_declare("hello-exchange","direct",False
                         ,True,False)
//声明队列
channel.queue_declare(queue="hello-queue")
//通过键“hola”将队列和交换器绑定起来
channel.queue_bind(queue="hello-queue",
            exchange="hello-exchange",
            routing_key="hola")
//用于处理传入消息的函数
def msg_consumer(channel,method,header,body):
    //消息确认
    channel.basic_ack(delivery_tag=method.delivery_tag)
    //停止消费并退出,但是不知道为什么退出不了,很怪
    if body == "quit":
        channel.basic_cancel(consumer_tag="hello-consumer")
        channel.stop_consuming()
    else:
        print body
//订阅消费者
channel.basic_consume(msg_consumer,
            queue="hello-queue",
            consumer_tag='hello-consumer')
//开始消费
channel.start_consuming()

可以完成通信Python(没学过,所有有些地方看的不是很明白)

二、JAVA(不与spring整合的,但只利用了队列,没有使用交换器)

1.导入相关jar包(我只导入两个):amqp-client和commons-lang3

2.写一个获取connection的工具类

public class ConnectionUtil {
    
    public static Connection getConnection() throws Exception{
        //定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置服务地址
        connectionFactory.setHost("localhost");
        //端口  
        //Web端口是15672,这里是5672 否则会报java.util.concurrent.TimeoutException
        connectionFactory.setPort(5672);
        //设置账号信息,用户名,密码,vhost
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //通过工程获取链接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }

}

3.producer

public class Producer {

    private final static String QUEUE_NAME = "test_queue";//队列名称
    
    public static void main(String[] args) throws Exception {
        //获取到链接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * @param queue the name of the queue  【队列名称】 
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)  【持久化,这里特别说一下,如果你想消息是持久化的,必须消息是持久化的,交换器也是持久化的,队列更是持久化的,其中一个不是也无法恢复消息】
         * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)  【私有的,独有的。 这个队列之后这个应用可以消费,上面的英文注释是 说restricted to this connection  就是限制在这个连接可以消费,就是说不限制channel信道咯,具体没有试过,但是应该是这样,除非备注骗我,我读得书少,你唔好呃我!!!】 
         * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【没有人使用自动删除】 注意:如果exclusive为true 最好 autodelete都为true 至于为什么 这么简单自己想~
         * @param arguments other properties (construction arguments) for the queue 【其他参数没有玩过】
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义发送的消息
        String message = "hello world!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("send "+message+".");
        //关闭信道和链接
        channel.close();
        connection.close();
    }
    
}

4.consumer

public class Consumer {
    
    private final static String QUEUE_NAME = "test_queue";
    
    public static void main(String[] args) throws Exception {
        //获取连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);//true自动模式  
        //获取消息
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("received "+message+".");
        }
        
    }

}

三、JAVA(与spring整合)

1.先导入整合jar包:spring-rabbit

2.整合最主要的就是application-rabbitmq.xml的配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/context 
       http://www.springframework.org/schema/context/spring-context-4.0.xsd
       http://www.springframework.org/schema/aop
       http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!-- 连接工厂 -->
    <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
    <rabbit:connection-factory id="rabbitConnectionFactory"
                                 host="${amqp.host}"
                                 port="${amqp.port}"
                                 virtual-host="${amqp.vhost}"
                                 username="${amqp.username}"
                                 password="${amqp.password}"/>
                                 
      <!-- 
    RabbitAdmin会自动在RabbitMQ上创建交换器、队列、绑定
    --> <!-- org.springframework.amqp.rabbit.core.RabbitAdmin --> <rabbit:admin connection-factory="rabbitConnectionFactory"/> <!-- 消息队列,消息的栖所 --> <!-- org.springframework.amqp.core.Queue --> <rabbit:queue id="converterQueue" name="converterQueue" durable="true" auto-delete="false" exclusive="false" auto-declare="true"/> <!-- 创建交换器,并绑定对应队列 org.springframework.amqp.core.DirectExchange org.springframework.amqp.core.Binding --> <rabbit:direct-exchange id="converterExchange" name="converterExchange" durable="true"> <!-- 绑定 --> <rabbit:bindings> <!-- converterQueueKey,将队列绑定到交换器的路由KEY --> <rabbit:binding queue="converterQueue" key="converterQueueKey" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 创建RabbitTemplate对象,用于发送消息 --> <!-- org.springframework.amqp.rabbit.core.RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" encoding="utf-8" exchange="converterExchange" routing-key="converterQueueKey" message-converter="jackson2JsonMessageConverter"/> <bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> </beans>

3.接下来使用spring的IOC技术

  //xml文件中使用<rabit:template>标签来对这里进行注入
  @Autowired
private AmqpTemplate amqpTemplate; public void send(Object message) { amqpTemplate.convertAndSend(message); }

这样的就可以使用了

原文地址:https://www.cnblogs.com/kongkongFabian/p/7527421.html