RabbitMq初体验

简介:

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

组成

  • 生产者
  • 消息队列
  • 消费者
  • 交换机:
    • 隔离生产者和消息队列,充当二者中间体。
    • 接受相应的消息并绑定到指定队列

发布模式

根据交换机类型不同,分为3种:

  1. Direct<直接>:1对1-----一个消息只能被一个消费者消费
  2. Topic<主题>:1对多-----一个消息可以被多个消费者消费
  3. Fanout<分列>:广播
注:一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中

整合springboot

pom

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

yml

spring:
  application:
    name: spring-boot-amqp
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: rabbit
    password: 123456

config

API:BindingBuilder.bind(指定队列).to(交换机).with(路由键);
package com.hxtec.polaris.configure;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * API:BindingBuilder.bind(指定队列).to(交换机).with(路由键);
 */
@Configuration
public class MyAMQPConfig {
    final static String tpm = "topic.message";
    final static String tp1m = "topic1.message";
    final static String tpm1 = "topic.message1";
    final static String tpm2 = "topic.message2";

    @Bean
    public Queue tpm() {
        return new Queue(MyAMQPConfig.tpm);
    }

    @Bean
    public Queue tp1m() {
        return new Queue(MyAMQPConfig.tp1m);
    }

    @Bean
    public Queue tpm1() {
        return new Queue(MyAMQPConfig.tpm1);
    }

    @Bean
    public Queue tpm2() {
        return new Queue(MyAMQPConfig.tpm2);
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("direct.exchange");
    }

    /**
     * topic单播,只给topic.message发送
     * @param tpm
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingTopicExchangeMessage(Queue tpm, TopicExchange topicExchange) {
        return BindingBuilder.bind(tpm).to(topicExchange).with("topic.message");
    }

    /**
     * Fanout广播,给绑定fanoutExchange的queues全部发送
     * @param tpm
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingFanoutExchangeMessage(Queue tpm, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(tpm).to(fanoutExchange);
    }

    /**
     * 绑定规则
     * @param tpm
     * @param directExchange
     * @return
     */
    @Bean
    Binding bindingDirectExchangeMessage(Queue tpm,DirectExchange directExchange) {
        return BindingBuilder.bind(tpm).to(directExchange).with("topic.message#");
    }
}

sender

API:amqpTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
package com.hxtec.polaris.ampq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * API:amqpTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
 */
@Component
public class RabbitHelloSender {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void send() {
		String context = "hello " + new Date();
		System.out.println("Sender : " + context);
		this.rabbitTemplate.convertAndSend("topic.exchange","tpm", context);
	}

}

receiver

@RabbitListener(queues = "direct"):监听器监听指定队列
package com.hxtec.polaris.ampq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @RabbitListener(queues = "direct"):监听器监听指定队列
 */
@Component
@RabbitListener(queues = "topic.message")
public class RabbitHelloReceiver {

	@RabbitHandler
	public void process(String hello) {
		System.out.println("Receiver  : " + hello);
	}
}
原文地址:https://www.cnblogs.com/faramita/p/12779502.html