SpringBoot整合rabbitmq

使用springboot整合rabbitmq

   逻辑梳理:

        1.创建连接:ConnectionFactory  (host、port、username、password)

        2.创建队列并绑定 (queue、exchange、routingkey)

        3.创建RabbitAdmin类用来管理

        4.  生产者:设置消息发送类 RabbitTemplate 重发类 RetryTemplate    

           消费者:设置消息接收监听类,方法

        第一步:创建连接

        

		String host = StringUtils.hasText(env().getProperty(ENV_CONNECTION_HOST)) ? 
				env().getProperty(ENV_CONNECTION_HOST) : DEFAULT_CONNECTION_HOST;
		int port = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) >= 1024 &&
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) < 65535 ?
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) : DEFAULT_CONNECTION_PORT;
	    String username = StringUtils.hasText(env().getProperty(ENV_CONNECTION_NAME)) ? 
				env().getProperty(ENV_CONNECTION_NAME) : DEFAULT_CONNECTION_NAME;
		String password = StringUtils.hasText(env().getProperty(ENV_CONNECTION_PWD)) ? 
				env().getProperty(ENV_CONNECTION_PWD) : DEFAULT_CONNECTION_PWD;
	    int cachesize = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) > 5 &&
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) < 25 ?
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) : DEFAULT_CONNECTION_CACHESIZE;
		
		CachingConnectionFactory factory = new CachingConnectionFactory(host,port);
		factory.setUsername(username);
		factory.setPassword(password);
		
		//打印连接消息
		logger.info("******rabbit connection*******");
		logger.info("[主机地址:]"+host);
		logger.info("[端口号:]"+port);
		logger.info("[登录用户:]"+username);
		logger.info("[系统默认连接数:]"+cachesize);
		
		return factory;
	}

        

        第二步:创建队列并绑定

//创建队列,并将队列与交换机和路由键绑定
	@Bean
	public Queue queue(){
		return new Queue(StringUtils.hasText(env().getProperty(ENV_QUEUE_NAME)) ? 
				env().getProperty(ENV_QUEUE_NAME) : DEFAULT_QUEUE_NAME); 
	}
	//创建交换机,根据创建的类来指定交换机类型:TopicExchange、D
	@Bean
	public TopicExchange topicExchange(){
		return new TopicExchange(StringUtils.hasText(env().getProperty(ENV_EXCHANGE_NAME)) ? 
				env().getProperty(ENV_EXCHANGE_NAME) : DEFAULT_EXCHANGE_NAME);
	}
	//将创建好的队列和交换机通过指定routingkey绑定
	@Bean
	public Binding queueBinding(){
		return BindingBuilder.bind(queue()).
								to(topicExchange()).
								with(StringUtils.hasText(env().getProperty(ENV_ROUTINGKEY))?
										env().getProperty(ENV_ROUTINGKEY) : DEFAULT_ROUTINGKEY);
	}

      第三步:创建RabbitAdmin类用来管理

 

//管理类 
	@Bean
	public AmqpAdmin rabbitAdmin() {
		return new RabbitAdmin(connectionFactory());
	}

      第四步:1.设置生产者消息发送类和断连重连类

//设置断连重发类
	@Bean
	public RetryTemplate retryTemplate(){
		long interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) >=100?
					NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) :
					DEFAULT_BACKOFF_INTERVAL;
		double multiplier =  NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER),0) >0?
			NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER)) :
			DEFAULT_BACKOFF_MULTIPLIER;
		long max_interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL),0) >1000?
				NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL)) :
		    DEFAULT_BACKOFF_MAXINTERVAL;
		
		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
		
		//initial-interval(>100)
		backOffPolicy.setInitialInterval(interval);
		
		backOffPolicy.setMultiplier(multiplier);
		//max-interval(>1000)
		backOffPolicy.setMaxInterval(max_interval);
		RetryTemplate retryTemplate = new RetryTemplate();
		retryTemplate.setBackOffPolicy(backOffPolicy);
		
		logger.info("*******rabbit retry-template******");		
		logger.info("[interval]{}",interval);
		logger.info("[multiplier]{}",multiplier);
		logger.info("[max_interval]{}",max_interval);
		return retryTemplate;
	}
	//将生产者发送类和断连重发类绑定在一起的方法
	private void setRetryTemplate(RabbitTemplate _template){
		if(StringUtils.hasText(env().getProperty(ENV_BACKOFF4ASYN_OPEN)) && 
		   0 == env().getProperty(ENV_BACKOFF4ASYN_OPEN).compareToIgnoreCase("true")){
		   _template.setRetryTemplate(retryTemplate());		
		   logger.info("[retry]{}",true);
		}else{
			logger.info("[retry]{}",false);
		}
	}
	//生产者操作类
	@Bean
	public RabbitTemplate rabbitTemplate(){
		RabbitTemplate template = new RabbitTemplate(connectionFactory());
		//向操作类中设置已经和队列绑定好了的交换机
		template.setExchange(topicExchange().getName());
		//设置发出去的信息转化的方式,特别注意,发送者的消息处理方式必须和接收方一致
		template.setMessageConverter(new FastJsonMessageConvert());
		//向template中设置如果断连的重发类
		setRetryTemplate(template);
		
		logger.info("*******rabbit template*****");
		logger.info("[exchange]:",topicExchange().getName());
		logger.info("[message.convert]",FastJsonMessageConvert.class.getSimpleName());
		return template;
	}
	

        2.设置消费者接收消息处理类

//设置消费者[SimpleMessageListenerContainer<--MessageListenerAdapter<--FastJsonMessageConvert]
	@Bean
	public SimpleMessageListenerContainer messageListenerContainer(){
		//设置消费者连接信息
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
		container.setQueueNames(queue().getName());
		//设置监听模板
		container.setMessageListener(messageListenerAdapter());
		//设置应答方式
		container.setAcknowledgeMode(AcknowledgeMode.NONE);
		return container;
	}
	@Bean
	 public MessageListenerAdapter messageListenerAdapter() {
		DataPackageHandler handler = new DataPackageHandler();
		//设置消费者处理信息的方式和转化信息的方式
		MessageListenerAdapter adapter = new MessageListenerAdapter(handler,new FastJsonMessageConvert());
		//指定是消息处理类中的哪一个方法
		adapter.setDefaultListenerMethod("messageHandler");
		return adapter;
	}

  

以下是完整代码:MyRabbitConfiguration类

package com.sunland.demo.configuration;

import java.util.Properties;

import org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.util.StringUtils;

import com.sunland.common.cfg.AtomRoot;
import com.sunland.common.cfg.XmlConfiguration;
/**
 * 三个工具类:
 * BindingBuilder:用来通过绑定指定的队列和交换机
 * StringUtils:用来操作字符串
 * NumberUtils:用来操作int
 * @author qiu
 *
 */
@Configuration
@EnableScheduling
public class MyRabbitConfiguration {
	private static final Logger logger = LoggerFactory.getLogger(MyRabbitConfiguration.class.getName());
	private Properties prop = new Properties();
	private String xmlConfigure = "conf/myRabbit.xml";
	private Properties env(){
		if(prop.size() <= 0){
			AtomRoot root = XmlConfiguration.getConfiguration(xmlConfigure);
			if(root != null){
				prop.putAll(XmlConfiguration.getAllProperties(root));
			}
		}
		return prop;
	}
	
	//[rabbit设置]
	//[connection设置]
	private static String ENV_CONNECTION_HOST = "mq.host";
	private static String ENV_CONNECTION_PORT = "mq.port";
	private static String ENV_CONNECTION_NAME = "mq.name";
	private static String ENV_CONNECTION_PWD = "mq.password";
	private static String ENV_CONNECTION_CACHESIZE = "mq.cachesize";
	//[默认连接设置]
	private static String DEFAULT_CONNECTION_HOST = "localhost";
	private static Integer DEFAULT_CONNECTION_PORT = 5672;
	private static String DEFAULT_CONNECTION_NAME = "qiuhangxiang";
	private static String DEFAULT_CONNECTION_PWD = "qiu1994825";
	private static Integer DEFAULT_CONNECTION_CACHESIZE = 5;
	//	
	private Integer MAX_CACHESIZE = 25;
	
	//[生产者发送消息交换机、队列和路由键设置]
	private static String ENV_EXCHANGE_NAME = "mq.exchange";
	private static String ENV_QUEUE_NAME = "mq.queue";
	private static String ENV_ROUTINGKEY = "mq.routingkey";
	//[生产者发送消息默认交换机、队列和路由键设置]
	private static String DEFAULT_EXCHANGE_NAME = "i.am.default.exchange";
	private static String DEFAULT_QUEUE_NAME = "i.am.default.queue";
	private static String DEFAULT_ROUTINGKEY = "i.am.default.routingkey";
	//[消费者默认交换机和队列]
	private static String DEFAULT_CUSTOMER_QUEUE_NAME = "i.am.default.customer.queue";
	private static String DEFAULT_CUSTOMER_ROUTINGKEY = "i.am.default.customer.routingkey";
	
	//异步报文发送重试模板
	public  static String ENV_BACKOFF4ASYN_OPEN = "mq.retry";
	public static String ENV_BACKOFF4ASYN_INTERVAL = "mq.retry.interval";
	public static String ENV_BACKOFF4ASYN_MULTIPLIER = "mq.retry.multiplier";
	public static String ENV_BACKOFF4ASYN_MAXINTERVAL = "mq.retry.maxinterval";
	public static long DEFAULT_BACKOFF_INTERVAL = 500;
	public static double DEFAULT_BACKOFF_MULTIPLIER = 10.0;
	public static long DEFAULT_BACKOFF_MAXINTERVAL = 10000;
	
	@Bean
	public ConnectionFactory connectionFactory(){
		
		String host = StringUtils.hasText(env().getProperty(ENV_CONNECTION_HOST)) ? 
				env().getProperty(ENV_CONNECTION_HOST) : DEFAULT_CONNECTION_HOST;
		int port = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) >= 1024 &&
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) < 65535 ?
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) : DEFAULT_CONNECTION_PORT;
	    String username = StringUtils.hasText(env().getProperty(ENV_CONNECTION_NAME)) ? 
				env().getProperty(ENV_CONNECTION_NAME) : DEFAULT_CONNECTION_NAME;
		String password = StringUtils.hasText(env().getProperty(ENV_CONNECTION_PWD)) ? 
				env().getProperty(ENV_CONNECTION_PWD) : DEFAULT_CONNECTION_PWD;
	    int cachesize = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) > 5 &&
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) < 25 ?
				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) : DEFAULT_CONNECTION_CACHESIZE;
		
		CachingConnectionFactory factory = new CachingConnectionFactory(host,port);
		factory.setUsername(username);
		factory.setPassword(password);
		
		//打印连接消息
		logger.info("******rabbit connection*******");
		logger.info("[主机地址:]"+host);
		logger.info("[端口号:]"+port);
		logger.info("[登录用户:]"+username);
		logger.info("[系统默认连接数:]"+cachesize);
		
		return factory;
	}
	//创建队列,并将队列与交换机和路由键绑定
	@Bean
	public Queue queue(){
		return new Queue(StringUtils.hasText(env().getProperty(ENV_QUEUE_NAME)) ? 
				env().getProperty(ENV_QUEUE_NAME) : DEFAULT_QUEUE_NAME); 
	}
	//创建交换机,根据创建的类来指定交换机类型:TopicExchange、D
	@Bean
	public TopicExchange topicExchange(){
		return new TopicExchange(StringUtils.hasText(env().getProperty(ENV_EXCHANGE_NAME)) ? 
				env().getProperty(ENV_EXCHANGE_NAME) : DEFAULT_EXCHANGE_NAME);
	}
	//将创建好的队列和交换机通过指定routingkey绑定
	@Bean
	public Binding queueBinding(){
		return BindingBuilder.bind(queue()).
								to(topicExchange()).
								with(StringUtils.hasText(env().getProperty(ENV_ROUTINGKEY))?
										env().getProperty(ENV_ROUTINGKEY) : DEFAULT_ROUTINGKEY);
	}
	//设置断连重发类
	@Bean
	public RetryTemplate retryTemplate(){
		long interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) >=100?
					NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) :
					DEFAULT_BACKOFF_INTERVAL;
		double multiplier =  NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER),0) >0?
			NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER)) :
			DEFAULT_BACKOFF_MULTIPLIER;
		long max_interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL),0) >1000?
				NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL)) :
		    DEFAULT_BACKOFF_MAXINTERVAL;
		
		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
		
		//initial-interval(>100)
		backOffPolicy.setInitialInterval(interval);
		
		backOffPolicy.setMultiplier(multiplier);
		//max-interval(>1000)
		backOffPolicy.setMaxInterval(max_interval);
		RetryTemplate retryTemplate = new RetryTemplate();
		retryTemplate.setBackOffPolicy(backOffPolicy);
		
		logger.info("*******rabbit retry-template******");		
		logger.info("[interval]{}",interval);
		logger.info("[multiplier]{}",multiplier);
		logger.info("[max_interval]{}",max_interval);
		return retryTemplate;
	}
	//将生产者发送类和断连重发类绑定在一起的方法
	private void setRetryTemplate(RabbitTemplate _template){
		if(StringUtils.hasText(env().getProperty(ENV_BACKOFF4ASYN_OPEN)) && 
		   0 == env().getProperty(ENV_BACKOFF4ASYN_OPEN).compareToIgnoreCase("true")){
		   _template.setRetryTemplate(retryTemplate());		
		   logger.info("[retry]{}",true);
		}else{
			logger.info("[retry]{}",false);
		}
	}
	//生产者操作类
	@Bean
	public RabbitTemplate rabbitTemplate(){
		RabbitTemplate template = new RabbitTemplate(connectionFactory());
		//向操作类中设置已经和队列绑定好了的交换机
		template.setExchange(topicExchange().getName());
		//设置发出去的信息转化的方式,特别注意,发送者的消息处理方式必须和接收方一致
		template.setMessageConverter(new FastJsonMessageConvert());
		//向template中设置如果断连的重发类
		setRetryTemplate(template);
		
		logger.info("*******rabbit template*****");
		logger.info("[exchange]:",topicExchange().getName());
		logger.info("[message.convert]",FastJsonMessageConvert.class.getSimpleName());
		return template;
	}
	
	//管理类 
	@Bean
	public AmqpAdmin rabbitAdmin() {
		return new RabbitAdmin(connectionFactory());
	}
	
	//设置消费者[SimpleMessageListenerContainer<--MessageListenerAdapter<--FastJsonMessageConvert]
	@Bean
	public SimpleMessageListenerContainer messageListenerContainer(){
		//设置消费者连接信息
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
		container.setQueueNames(queue().getName());
		//设置监听模板
		container.setMessageListener(messageListenerAdapter());
		//设置应答方式
		container.setAcknowledgeMode(AcknowledgeMode.NONE);
		return container;
	}
	@Bean
	 public MessageListenerAdapter messageListenerAdapter() {
		DataPackageHandler handler = new DataPackageHandler();
		//设置消费者处理信息的方式和转化信息的方式
		MessageListenerAdapter adapter = new MessageListenerAdapter(handler,new FastJsonMessageConvert());
		//指定是消息处理类中的哪一个方法
		adapter.setDefaultListenerMethod("messageHandler");
		return adapter;
	}
}

  DataPackageHandler处理消息类

package com.sunland.demo.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.localhost.demo.PojoTest;

public class DataPackageHandler {
	private static Logger logger = LoggerFactory.getLogger(DataPackageHandler.class.getName());
	
	public void messageHandler(PojoTest obj){
		try{
			System.out.println("DataPackageHandler"+obj.getAge()+":"+obj.getName());
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
}

  

生产者发送消息类

package com.sunland.demo.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitGatewaySupport;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;


public class AsyncGateway extends RabbitGatewaySupport {
	private Logger log = LoggerFactory.getLogger(AsyncGateway.class);
	

	public void send(final Object obj){
		try {
			String routingKey;
//			if(obj instanceof IllegalVehicle){
//				routingKey = "queue.itaxi.illegal";
//			}
//			//[dispatch]
//			else 
				routingKey = "mq_routingkey";
//				rabbitTemplate.convertAndSend(routingKey, obj);
				this.getRabbitTemplate().convertAndSend(routingKey, obj,
				new MessagePostProcessor() {
					public Message postProcessMessage(Message message)
							throws AmqpException {
						//
						// ---set correlation data so that the replies can
						// be correlated to the request
						message.getMessageProperties().setType(obj.getClass().getName());
						return message;
					}
				});
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

  一个基本的spring整合rabbitmq的例子完成

       

      

      

原文地址:https://www.cnblogs.com/qiuhx/p/6425066.html