SpringBoot整合ActiveMQ

目录结构

引入 maven依赖

  <parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.4.RELEASE</version>
		<relativePath/> 
	</parent>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

引入 application.yml配置

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
queue: springboot-queue
server:
  port: 8080

创建QueueConfig

@Configuration
public class QueueConfig {
	@Value("${queue}")
	private String queue;

	@Bean
	public Queue logQueue() {
		return new ActiveMQQueue(queue);
	}

	@Bean
	public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
		JmsTemplate jmsTemplate = new JmsTemplate();
		jmsTemplate.setDeliveryMode(2);// 进行持久化配置 1表示非持久化,2表示持久化</span>
		jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
		jmsTemplate.setDefaultDestination(queue); // 此处可不设置默认,在发送消息时也可设置队列
		jmsTemplate.setSessionAcknowledgeMode(4);// 客户端签收模式</span>
		return jmsTemplate;
	}

	// 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
	@Bean(name = "jmsQueueListener")
	public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
			ActiveMQConnectionFactory activeMQConnectionFactory) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(activeMQConnectionFactory);
		// 设置连接数
		factory.setConcurrency("1-10");
		// 重连间隔时间
		factory.setRecoveryInterval(1000L);
		factory.setSessionAcknowledgeMode(4);
		return factory;
	}

}

创建生产者:

@SpringBootApplication
@Component
@EnableScheduling
public class Producer {
	
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;
	
	@Autowired
	private Queue queue;
	
	@Scheduled(fixedDelay=3000)
	public void send() {
		String result = System.currentTimeMillis()+"---测试";
		System.out.println("result"+result);
		jmsMessagingTemplate.convertAndSend(queue,result);
	}
	public static void main(String[] args) {
		SpringApplication.run(Producer.class, args);
	}
}

创建消费者的application.yml

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
queue: springboot-queue
server:
  port: 8081

创建消费者:

@Component
@SpringBootApplication
public class consumer {

	private int count =0;
	
	@JmsListener(destination = "${queue}")
	public void receive(TextMessage textMessage,Session session) throws JMSException {
		String text = textMessage.getText();
		
		System.out.println("消费:"+text+"第几次获取消息count:"+(++count));
		
		System.out.println();
		String jmsMessageID = textMessage.getJMSMessageID();
	}
	
	public static void main(String[] args) {
		SpringApplication.run(consumer.class,args);
	}
}

结果显示:

原文地址:https://www.cnblogs.com/Libbo/p/11547852.html