activeMQ

 

 

 

 

 

 

 

 

 

 

 

 

 

 

        

 

 

 

 

   

 

 

 解压到自己想放的目录

打开目录到bin文件夹下,根据电脑32位,还是64位打开文件夹

 方式一:运行activemq.bat,不要关闭黑框,关闭就关闭服务了

    访问localhost:8161,出现activeMQ的页面就成功了,可以进行登录,账号密码都为admin

方式二:以管理员的身份运行InstallService.bat,会在电脑上安装服务

    打开任务管理器,选择服务,找到activeMQ服务启动

    或者按win+R,输入services.msc,找到activeMQ启动服务

linux安装差不多也一样,下载对应安装包,解压

进入解压目录,进入斌文件夹,直接运行 ./activemq start

ps -ef | grep activemq 进行查看,也可以输入netstat -tunpl |grep 8161查看

./activemq stop进行停止服务

 

创建一个生产者: 

创建一个maven工程

导入依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.10</version>
    </dependency>

创建AppProducer类

/**   
* @Title: AppProducer.java 
* @Package com.yrg.queue 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月2日 下午12:48:08 
* @version V1.0   
*/
package com.yrg.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** 
* @ClassName: AppProducer 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月2日 下午12:48:08 
*  
*/
public class AppProducer {
    
    
    private static final String url="tcp://127.0.0.1:61616";
    private static final String queueName="queue-test";
    public static void main(String[] args) throws JMSException {
        
        //1.创建连接工厂ConnectionFactory
        ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory(url);
        
        //2.创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3.启动连接
        connection.start();
        
        //4.创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第一个参数:是否启动事务;第二个参数:应答模式(设置了自动应答)
        
        //5.创建一个目标
        Destination destination = session.createQueue(queueName);
        
        //6.创建一个生厂者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test:"+i);
            
            //8.发送消息
            producer.send(textMessage);
            
            System.out.println("发送消息:"+textMessage.getText());
        }
        
        //9.关闭连接
        connection.close();
    }
}

打开activeMQ服务,登录上去,访问网址

运行程序,进入网址中是否有消息

 可以看到有一天消息队列,可以点入查看详情

 

 创建一个消费者AppCUstom

/**   
* @Title: AppCustom.java 
* @Package com.yrg.queue 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月2日 下午2:41:16 
* @version V1.0   
*/
package com.yrg.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** 
* @ClassName: AppCustom 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月2日 下午2:41:16 
*  
*/
public class AppCustom {
    
    private static final String url="tcp://127.0.0.1:61616";
    private static final String queueName="queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        //1.创建连接工厂ConnectionFactory
        ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory(url);
        
        //2.创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3.启动连接
        connection.start();
        
        //4.创建会话 
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第一个参数:是否启动事务;第二个参数:应答模式(设置了自动应答)
        
        //5.创建一个目标
        Destination destination = session.createQueue(queueName);
        
        //6.创建一个消费者
        MessageConsumer consumer= session.createConsumer(destination);
        
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接受失败");
                    e.printStackTrace();
                }
            }
        });
        
        //8.关闭连接
        //connection.close();//关闭连接会导致监听器退出,接受不到消息
    }
}

运行程序,可以看到100个消息被消费

 再运行一个消费者(在运行一次程序),可以看到没有接受到消息;

运行生产者,发送消息,观察两个消费者

        

 可以看到两个消费者平均消费

 主题模式:

创建Appproduce

/**   
* @Title: AppProducer.java 
* @Package com.yrg.queue 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月2日 下午12:48:08 
* @version V1.0   
*/
package com.yrg.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** 
* @ClassName: AppProducer 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月2日 下午12:48:08 
*  
*/
public class AppProducer {
    
    
    private static final String url="tcp://127.0.0.1:61616";
    private static final String topicName="topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        //1.创建连接工厂ConnectionFactory
        ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory(url);
        
        //2.创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3.启动连接
        connection.start();
        
        //4.创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第一个参数:是否启动事务;第二个参数:应答模式(设置了自动应答)
        
        //5.创建一个目标
        Destination destination = session.createTopic(topicName);
        
        //6.创建一个生厂者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test:"+i);
            
            //8.发送消息
            producer.send(textMessage);
            
            System.out.println("发送消息:"+textMessage.getText());
        }
        
        //9.关闭连接
        connection.close();
    }
}

运行程序,可以在网址的topic中看到消息

 创建订阅者AppCustom

/**   
* @Title: AppCustom.java 
* @Package com.yrg.queue 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月2日 下午2:41:16 
* @version V1.0   
*/
package com.yrg.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** 
* @ClassName: AppCustom 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月2日 下午2:41:16 
*  
*/
public class AppCustom {
    
    private static final String url="tcp://127.0.0.1:61616";
    private static final String topicName="topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        //1.创建连接工厂ConnectionFactory
        ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory(url);
        
        //2.创建Connection
        Connection connection = connectionFactory.createConnection();
        
        //3.启动连接
        connection.start();
        
        //4.创建会话 
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第一个参数:是否启动事务;第二个参数:应答模式(设置了自动应答)
        
        //5.创建一个目标
        Destination destination = session.createTopic(topicName);
        
        //6.创建一个消费者
        MessageConsumer consumer= session.createConsumer(destination);
        
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接受失败");
                    e.printStackTrace();
                }
            }
        });
        
        //8.关闭连接
        //connection.close();//关闭连接会导致监听器退出,接受不到消息
    }
}

运行程序,发现接收不到消息

原因是因为主题模式要先订阅才能获得消息,而我们先是发布消息,才订阅

重新运行发布者,观察订阅者

在运行一个订阅者,重新发布消息,观察两个订阅者

   

 可以看到两个订阅者消息消费一样

Spring jms理论

  

 

 

 

 Spring jms演示

导入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.yrg</groupId>
  <artifactId>Jms-Spring</artifactId>
  <version>0.0.1-SNAPSHOT</version>
 
 <properties>
     <spring.version>4.2.5.RELEASE</spring.version>
 </properties>
 
  <dependencies>


  <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
  </dependency>
  <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
  </dependency>
  
  <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>

      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-test</artifactId>
          <version>${spring.version}</version>
      </dependency>
      
      <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
    <exclusions>
        <exclusion>
            <artifactId>spring-context</artifactId>
            <groupId>org.springframework</groupId>
        </exclusion>
    </exclusions>
</dependency>
      
  </dependencies>
</project>

创建配置文件 common.xml

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

    <context:annotation-config></context:annotation-config>

    <!-- ActiveMQ提供的ConnectionFactory -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"></property>
    </bean>
    
    <!-- JMS提供的连接池 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
    </bean>

    <!-- 一个队列的目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue"></constructor-arg>
    </bean>
</beans>

创建priducer.xml

<!--引入common信息 -->
    <import resource="common.xml"/>
    
    <!-- 配置jmsTemplate用于发送消息-->
    <bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"></property>
    </bean>

创建接口IProducerServer

/**   
* @Title: IProducerServer.java 
* @Package com.yrg.jms.producer 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月3日 上午9:40:52 
* @version V1.0   
*/
package com.yrg.jms.producer;

/** 
* @ClassName: IProducerServer 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月3日 上午9:40:52 
*  
*/
public interface IProducerServer {
    
    void sendMessage(String message);
}

创建实现类ProducerServerImpl

/**   
* @Title: ProducerServerImpl.java 
* @Package com.yrg.jms.producer.ipml 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月3日 上午11:09:06 
* @version V1.0   
*/
package com.yrg.jms.producer.ipml;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.yrg.jms.producer.IProducerServer;

/** 
* @ClassName: ProducerServerImpl 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月3日 上午11:09:06 
*  
*/
public class ProducerServerImpl implements IProducerServer {

    @Autowired
    JmsTemplate jmsTemplate;
    
    @Resource(name="queueDestination")
    Destination destination ;
    
    
    public void sendMessage(final String message) {
        //使用jmsTemplate发送消息
        jmsTemplate.send(destination, new MessageCreator() {
            //创建一个消息
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
        System.out.println("发送消息:"+message);
            
    }

}

在producer.xml中添加

<bean class="com.yrg.jms.producer.ipml.ProducerServerImpl" id="producerServerImpl"></bean>

创建信息生产者AppProducer

/**   
* @Title: AppProducer.java 
* @Package com.yrg.jms.producer 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月3日 上午11:23:13 
* @version V1.0   
*/
package com.yrg.jms.producer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.yrg.jms.producer.ipml.ProducerServerImpl;

/** 
* @ClassName: AppProducer 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月3日 上午11:23:13 
*  
*/
public class AppProducer {
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:producer.xml");
        
        IProducerServer producerServer = context.getBean(ProducerServerImpl.class);
        for (int i = 0; i < 50; i++) {
            producerServer.sendMessage("test"+i);
        }
        context.close();    
    }
}

运行AppProducer,发送消息,可以到网址中查看消息

现在来创建消费者

首先创建消息监听器ConsumerMessageListener

/**   
* @Title: ConsumerMessageListener.java 
* @Package com.yrg.jms.consumer 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月3日 下午2:59:22 
* @version V1.0   
*/
package com.yrg.jms.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/** 
* @ClassName: ConsumerMessageListener 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月3日 下午2:59:22 
*  
*/
public class ConsumerMessageListener implements MessageListener{

    //监听到消息要做的事情
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接受消息:"+textMessage.getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

创建配置文件consumer.xml

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

        <import resource="common.xml"/>
    
    
        <!-- 配置消息监听器 -->
        <bean id="consumerMessageListener" class="com.yrg.jms.consumer.ConsumerMessageListener"></bean>
    
    <!-- 配置消息监听容器-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"></property>
            <property name="destination" ref="queueDestination"></property>
            <property name="messageListener" ref="consumerMessageListener"></property>
        </bean>
    
    
</beans>

创建Appconsumer消费者

/**   
* @Title: AppConsumer.java 
* @Package com.yrg.jms.consumer 
* @Description: TODO(用一句话描述该文件做什么) 
* @author yangrg  
* @date 2019年12月3日 下午2:19:20 
* @version V1.0   
*/
package com.yrg.jms.consumer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/** 
* @ClassName: AppConsumer 
* @Description: TODO(这里用一句话描述这个类的作用) 
* @author yangrg 
* @date 2019年12月3日 下午2:19:20 
*  
*/
public class AppConsumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:consumer.xml");
    }
}

运行两次程序,再发布消息

主题模式:修改三个地方就可以变成主题模式

1.在common.xml中添加

<!-- 一个主体目的地,发布订阅模式 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"></constructor-arg>
    </bean>

2.在ProducerServerImpl中将@Resource(name="queueDestination")改为@Resource(name="topicDestination")

3.在consummer.xml中消息监听容器改为以下

<!-- 配置消息监听容器-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"></property>
            <property name="destination" ref="topicDestination"></property> 
            <property name="messageListener" ref="consumerMessageListener"></property>
        </bean>

主题模式改好了,进行测试,主题要先订阅才能接受消息

 

 

 

 

 

 

 

 

 

 

 

 

 

 

实践:

将activeMQ复制到三个文件夹

cp -rf apache-activemq-5.15.10/ /usr/local/activeMQSum/activeMQ-a

cp -rf apache-activemq-5.15.10/ /usr/local/activeMQSum/activeMQ-b

cp -rf apache-activemq-5.15.10/ /usr/local/activeMQSum/activeMQ-c

 

创建一个共享文件夹

mkdir kahadb

 进入第一个目录

 cd activeMQ-a

 里面有个conf目录,进入

 cd conf/

 其中有个activemq.xml的配置文件,进行编辑 

  

vim activemq.xml

 找到网络连接配置

 将其他协议注释掉

 给A节点添加网络的配置项

 保存退出

还需要配置后端管理地址的jetty.xml服务器的端口

vim jetty.xml

 A节点的端口是8161,不需要修改直接退出

同样的配置B节点

 配置activemq

 还要在这里配置共享文件夹

找到如下地方

 改成共享文件夹的地方

 配置B节点的jetty

B节点配好,保存退出

C节点和B节点配置差不多,将activemq拷贝过去

 修改C节点的activemq

修改jetty

 C节点修改完成

依次启动activemq

./activeMQ-a/bin/activemq start

./activeMQ-b/bin/activemq start

./activeMQ-c/bin/activemq start

 输入 ps -ef | grep activemq 进行查看

 

 可以看到三个进程已经启动

依次输入

netstat -tunpl | grep 61616

netstat -tunpl | grep 61617

netstat -tunpl | grep 61618

查看端口服务

 可以看见C节点无服务

C节点处于Slave状态,所以无服务

进行测试将B节点杀掉

./activeMQ-b/bin/activemq stop

 查看服务

 可以看见C节点处于Master状态了

启动B节点

用之前写的项目Jms-test、来进行测试

将提供者Appproducer的url改为 

private static final String url="failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";

将AppConsumer的url改为

private static final String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";

启动生产者AppProducer,去网址查看消息发送到哪里

通过控制台可以看到消息发送到了61618的服务

将C节点关闭,看看消费者会从那个服务器消费消息

输入./activeMQ-c/bin/activemq stop 关闭

通过网址可以看见C节点的网址已经关闭

而B节点能够访问,且消息也在其中,当然能被消费者消费

运行消费者

可以在A节点网址看见消息被消费了

 并且连接的是B节点,点击Network

 在B节点网址也能看见消息被消费,且消费者是A节点

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

原文地址:https://www.cnblogs.com/godyrg/p/11980288.html