ActiveMQ_4SpringBoot整合

SpringBoot实现

引入jar包

<dependency>

       <groupId>org.springframework.boot</groupId>

       <artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

配置application.properties

spring.activemq.broker-url=tcp://192.168.114.129:61616

spring.activemq.in-memory=true

spring.activemq.enabled=false

spring.jms.pub-sub-domain=true

创建activemq配置文件类

@EnableJms

@Configuration

public class ActiveMQConfig {

    @Bean

    public Queue queue(){

       return new ActiveMQQueue("queue1");

    }

   

    @Bean

    public RedeliveryPolicy redeliveryPolicy(){

       RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

       //是否在每次尝试重新发送失败后,增长这个等待时间

       redeliveryPolicy.setUseExponentialBackOff(true);

       //重发次数,默认为6次   这里设置为10次

       redeliveryPolicy.setMaximumRedeliveries(6);

       //重发时间间隔,默认为1秒

       redeliveryPolicy.setInitialRedeliveryDelay(1);

       //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value

       redeliveryPolicy.setBackOffMultiplier(1);

       //是否避免消息碰撞

       redeliveryPolicy.setUseCollisionAvoidance(false);

       //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效

       redeliveryPolicy.setMaximumRedeliveryDelay(1000);

       return redeliveryPolicy;

    }

   

    @Bean

    ActiveMQConnectionFactory activeMQConnectFactory(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy redeliveryPolicy){

       ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",url);

       activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

       return activeMQConnectionFactory;

    }

   

    @Bean

    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){

       JmsTemplate jmsTemplate = new JmsTemplate();

       jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化

       jmsTemplate.setConnectionFactory(activeMQConnectionFactory);

       jmsTemplate.setDefaultDestination(queue);//此处可不设置默认,在发送消息时也可设置队列

       jmsTemplate.setSessionAcknowledgeMode(1);//客户端签收模式

       return jmsTemplate;

    }

   

    @Bean

    public JmsTransactionManager jmsTransactionManager(ActiveMQConnectionFactory activeMQConnectionFactory){

       JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();

       jmsTransactionManager.setConnectionFactory(activeMQConnectionFactory);

       return jmsTransactionManager;

    }

   

    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂

    @Bean(name = "jmsQueueListener")

    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory, JmsTransactionManager jmsTransactionManager){

       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

       factory.setConnectionFactory(activeMQConnectionFactory);

       //设置连接数

       factory.setConcurrency("1-10");

       //重连间隔时间

       //factory.setRecoveryInterval(1000L);

       //factory.setSessionAcknowledgeMode(1);

       factory.setTransactionManager(jmsTransactionManager);

      

       return factory;

      

    }

}

创建生产者类

@Service("producer")

public class Producer {

    @Autowired

    private JmsMessagingTemplate jMessagingTemplate;

   

    public void sendMessage(Destination destination, final String message){

       jMessagingTemplate.convertAndSend(destination, message);

    }

}

创建消费者类

@Component

public class Consumer {

   

    @JmsListener(destination = "mytest.queue", containerFactory="jmsQueueListener")

    public void receiveQueue(TextMessage textMessage) throws JMSException{

       System.out.println("Consumer收到的报文为:"+textMessage.getText());

    }

}

测试

@Autowired

private Producer producer;

@Test

public void test01(){

    Destination destination = new ActiveMQQueue("mytest.queue");

    for(int i=0; i<10; i++){

       producer.sendMessage(destination, "my name laowang");

    }

}

原文地址:https://www.cnblogs.com/zhiboluo/p/10114763.html