activeMQ性能优化--对象池管理connection

activeMQ的某个应用场景,消费者和服务器只需建立一个长连接,而生产者的情况集中在服务器,需要对服务器端的生产者连接进行优化。

首先maven引入jar包依赖

[java] view plain copy
 
 
 在CODE上查看代码片派生到我的代码片
  1. <dependency>  
  2.        <groupId>org.activemq</groupId>  
  3.        <artifactId>activemq-all</artifactId>  
  4.        <version>5.9.0</version>  
  5.    </dependency>  
  6.    <dependency>  
  7.        <groupId>org.apache.activemq</groupId>  
  8.        <artifactId>activemq-pool</artifactId>  
  9.        <version>5.9.0</version>  
  10.        <exclusions>  
  11.            <exclusion>  
  12.                <groupId>org.apache.geronimo.specs</groupId>  
  13.                <artifactId>geronimo-jms_1.1_spec</artifactId>  
  14.            </exclusion>  
  15.        </exclusions>  
  16.    </dependency>  



下面是实现代码

[java] view plain copy
 
 
 在CODE上查看代码片派生到我的代码片
    1. import org.apache.activemq.ActiveMQConnectionFactory;  
    2. import org.apache.activemq.pool.PooledConnection;  
    3. import org.apache.activemq.pool.PooledConnectionFactory;  
    4. import org.slf4j.Logger;  
    5. import org.slf4j.LoggerFactory;  
    6.   
    7. import javax.jms.*;  
    8.   
    9. public class MQProductHelper {  
    10.   public static final Logger LOG = LoggerFactory.getLogger(MQProductHelper.class);  
    11.   private static PooledConnectionFactory poolFactory;  
    12.   
    13.   /** 
    14.    * 获取单例的PooledConnectionFactory 
    15.    *  @return 
    16.    */  
    17.   private static synchronized PooledConnectionFactory getPooledConnectionFactory() {  
    18.     LOG.info("getPooledConnectionFactory");  
    19.     if (poolFactory != null) return poolFactory;  
    20.     LOG.info("getPooledConnectionFactory create new");  
    21.     IConfigService configService = ServiceManager.getService(IConfigService.class);  
    22.     String userName = configService.getConfig("MQ_USER_NAME", ShopConstant.BC_SHOP_ID);  
    23.     String password = configService.getConfig("MQ_USER_PASS", ShopConstant.BC_SHOP_ID);  
    24.     String url = configService.getConfig("MQ_BROKER_URL", ShopConstant.BC_SHOP_ID);  
    25.     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);  
    26.     poolFactory = new PooledConnectionFactory(factory);        
    27.     // 池中借出的对象的最大数目  
    28.     poolFactory.setMaxConnections(100);  
    29.     poolFactory.setMaximumActiveSessionPerConnection(50);        
    30.     //后台对象清理时,休眠时间超过了3000毫秒的对象为过期  
    31.     poolFactory.setTimeBetweenExpirationCheckMillis(3000);  
    32.     LOG.info("getPooledConnectionFactory create success");  
    33.     return poolFactory;  
    34.   }  
    35.   
    36.   /** 
    37.    * 1.对象池管理connection和session,包括创建和关闭等 
    38.    * 2.PooledConnectionFactory缺省设置MaxIdle为1, 
    39.    *  官方解释Set max idle (not max active) since our connections always idle in the pool.   * 
    40.    *  @return   * @throws JMSException 
    41.    */  
    42.   public static Session createSession() throws JMSException {  
    43.     PooledConnectionFactory poolFactory = getPooledConnectionFactory();  
    44.     PooledConnection pooledConnection = (PooledConnection) poolFactory.createConnection();  
    45.     //false 参数表示 为非事务型消息,后面的参数表示消息的确认类型(见4.消息发出去后的确认模式)  
    46.     return pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    47.   }  
    48.   
    49.   public static void produce(String subject, String msg) {  
    50.     LOG.info("producer send msg: {} ", msg);  
    51.     if (StringUtil.isEmpty(msg)) {  
    52.       LOG.warn("发送消息不能为空。");  
    53.       return;  
    54.     }  
    55.     try {  
    56.       Session session = createSession();  
    57.       LOG.info("create session");  
    58.       TextMessage textMessage = session.createTextMessage(msg);  
    59.       Destination destination = session.createQueue(subject);  
    60.       MessageProducer producer = session.createProducer(destination);  
    61.       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
    62.       producer.send(textMessage);  
    63.       LOG.info("create session success");  
    64.     } catch (JMSException e) {  
    65.       LOG.error(e.getMessage(), e);  
    66.     }  
    67.   }  
    68.   
    69.   public static void main(String[] args) {  
    70.     MQProductHelper.produce("test.subject", "hello");  
    71.   }  
原文地址:https://www.cnblogs.com/telwanggs/p/13130453.html