RabbitMQ 部分API解析

/**
 * exchange: 交换器的名称
 * type:交换器的类型,如Direct Topic Headers Fanout
 *     Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
 *     Fanout Exchange – 不处理路由键。消息都会被转发到与该交换机绑定的所有队列上。(Fanout交换机转发消息是最快的)。
 *     Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
 *     因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
 * durable:设置是否持久化,true持久化,以保证服务器重启,不会丢失相关信息
 * autoDelete:是否自动删除。true为自动删除,删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑(并不是当与此交换器连接的客户端都断开时自动删除)
 * internal:是否内置,true表示内置交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
 * argument:其它一些结构化参数
 */  
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, false, null);

/**
 * 指定队列
 * queue: 队列名称
 * durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,
 *     保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
 * exclusive:排他队列,
 *     如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
 *     其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。
 *     其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
 *     其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
 *     这种队列适用于只限于一个客户端发送读取消息的应用场景。
 * autoDelete:自动删除,true为自动删除,删除的前提是至少有一个消费者连接这个队列,之后所有与这个队列连接的消费者都断开时都会自动删除(并不是当连接此队列的所有客户端都断开时自动删除)
 * arguments:x-message-ttl(消息过期时间)、
 *             x-max-length(最大积压消息个数)、
 *             x-dead-letter-exchange(消息过期后投递的exchange)
 *             x-dead-letter-routing-key(消息过期后按照指定的routingkey重新发送)、
 *             x-max-priority(队列优先级,值越大优先级超高,优先级高的消息具备优先被消费的特权)
 *            x-expires(控制队列如果在多长时间未使用则会被删除,毫秒为单位)、
 *            x-max-length-bytes
 */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

 

/**
 * queue:队列名称
 * exchange:交换器的名称
 * routingKey:用来绑定队列和交换器的路由键
 * argument:定义绑定的一些参数
 */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);

/**
 * 和queueBind用法相似。
 * 绑定后,消息从source交换器转发到destination交换器
 */
channel.exchangeBind("destination", "source", "routingKey", null);


/**
 * exchange:交换器名称,如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中。
 * routingKey:指定路由键,交换器根据路由键将消息存储到相应的队列之中
 * mandatory:为true则当exchange找不到相应的queue时,会调用basic.return方法将消息返还给生产者,否则丢弃
 * immediate:为true则当exchange将消息route到所有queue(s)发现没有consumer时,不会将消息插入队列,会调用basic.return方法将消息返还给生产者
 * props:消息为持久化  —— MessageProperties.PERSISTENT_TEXT_PLAIN
 * body:msg字节
 */
channel.basicPublish(ex_log, "", true, true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

String CorrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties.Builder builder1 = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties1 = builder1
        .contentType("text/plain")    //标识消息内容的MIME
        .contentEncoding("utf-8")
        .headers(new HashMap())
        .deliveryMode(2)    //消息持久态(2)或瞬态(任何其他值)
        .priority(1)    //消息优先级
        .correlationId(CorrId)    //用来关联请求(request)和其调用RPC之后的回复(response)  (如rpc客户端根据id进行消息确认)
        .replyTo(replyQueueName)    //指定消息响应队列(rpc)
        .expiration("10000")    //消息延迟
        .messageId("33333333333")
        .timestamp(new Date())
        .type("4444444444444")
        .userId("5555555555555")
        .appId("66666666666")
        .clusterId("77777777777777")
        .build();

/**
 * 生产者获取没有被正确路由到合适队列的消息,通过添加ReturnListener来实现
 */
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
            throws IOException {
        System.out.println("返回的结果是:"+new String(arg5));
    }
});

/**
 * 确认机制
 */
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack,SeqNo:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
    }
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("deliveryTag:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
        System.out.println("5");
        //注意这里需要添加处理消息重发的场景
    }
});


/**
 * 公平转发,设置客户端最多接收未被ack的消息的个数,只有在消费者空闲的时候会发送下一条信息,同一时间每次发给一个消息给一个worker。
 * 一个生产者与多个消费者时,避免RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。
 */
channel.basicQos(prefetchCount);

/**
 * 实现消费者
 */
Consumer consumer = new DefaultConsumer(channel) {
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
        System.out.println("recv msg:"+new String(body));
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /**
         * 发送应答,DeliveryTag可以看作消息的编号,是一个64位长整型值
         */
        channel.basicAck(envelope.getDeliveryTag(), false);
        /**
         * 单个拒绝,DeliveryTag可以看作消息的编号,requeue为true时,交换器将重新发送消息
         */
        channel.basicReject(envelope.getDeliveryTag(), true);
        /**
         * 批量拒绝,
         */
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    }
};

/**
 * 为队列指定消费者
 * queue: 队列的名称
 * autoAck:
 *   true:RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或磁盘)中删除
 *   false:RabbitMQ会等待消费者地回复确认信号后才从内存(或者磁盘)中移去消息(实际上是先打上删除标记,之后再删除)
 *   如果一直没有收到消费者的确认信号,并且消费此消息的消费者已断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,也有可能还是原来那个消费者
 * consumerTag:消费者标签,用来区分多个消费者
 * noLocal:设置为true不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
 * exclusive:是否排它
 * arguments:设置消费者其他参数
 * callback:消费者的回调函数,用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要重写其中的handleDelivery方法
 */  
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback);


/**
 * 发送应答
 */
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

/**
 * 指定队列
 * queue: 队列名称
 * durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,
 *     保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
 * exclusive:排他队列,
 *     如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
 *     其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。
 *     其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
 *     其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
 *     这种队列适用于只限于一个客户端发送读取消息的应用场景。
 * autoDelete:自动删除,true为自动删除,删除的前提是至少有一个消费者连接这个队列,之后所有与这个队列连接的消费者都断开时都会自动删除(并不是当连接此队列的所有客户端都断开时自动删除)
 * arguments:x-message-ttl(消息过期时间)、
 *             x-max-length(最大积压消息个数)、
 *             x-dead-letter-exchange(消息过期后投递的exchange)
 *             x-dead-letter-routing-key(消息过期后按照指定的routingkey重新发送)、
 *             x-max-priority(队列优先级,值越大优先级超高)
 *            x-expires(控制队列被自动删除处于未使用状态的时间)、
 *            x-max-length-bytes
 */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
                    throws IOException {
                System.out.println("返回的结果是:"+new String(arg5));
            }
        });

原文地址:https://www.cnblogs.com/yifanSJ/p/8986581.html