activemq-5.13 在windows下部署应用

一、下载windows压缩包

    解压后双击 F:\server\activemq-5.13.2\bin\win64\activemq.bat 启动,32位的系统为 F:\server\activemq-5.13.2\bin\win32\activemq.bat;

二、配置访问权限

    编辑 F:\server\activemq-5.13.2\conf\activemq.xml 文件,

    添加plugins节点:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

     <!--
          Enables simple, file-based authentication and declares three users
          (system, user, guest) with plain-text passwords. These are sample
          credentials only; replace them before exposing the broker.
          NOTE(review): the groups attribute presumably only takes effect once
          an authorizationPlugin is configured as well; confirm against the
          ActiveMQ security documentation.
     -->
     <plugins>  
     <simpleAuthenticationPlugin>         
      <users>             
        <authenticationUser username="system" password="123" groups="users,admins"/>             
        <authenticationUser username="user" password="321" groups="users"/>
        <authenticationUser username="guest" password="111" groups="guests"/>
      </users>
     </simpleAuthenticationPlugin>
    </plugins>

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

 三、访问http://localhost:8161/控制台;

 四、示例代码:

package activemq;

import java.io.Serializable;

/**
 * Serializable bean carrying a numeric id, a name and a password.
 *
 * <p>All three properties are mutable and exposed through conventional
 * getter/setter pairs.
 */
public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private int id;
    private String name;
    private String pass;

    /**
     * Creates a fully populated user.
     *
     * @param id   numeric identifier
     * @param name display name
     * @param pass password
     */
    public User(int id, String name, String pass) {
        this.id = id;
        this.name = name;
        this.pass = pass;
    }

    // ---- id ----

    /** @return the numeric identifier */
    public int getId() {
        return this.id;
    }

    /** @param id the new numeric identifier */
    public void setId(int id) {
        this.id = id;
    }

    // ---- name ----

    /** @return the display name */
    public String getName() {
        return this.name;
    }

    /** @param name the new display name */
    public void setName(String name) {
        this.name = name;
    }

    // ---- pass ----

    /** @return the password */
    public String getPass() {
        return this.pass;
    }

    /** @param pass the new password */
    public void setPass(String pass) {
        this.pass = pass;
    }
}
package activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
 
/**
 * Minimal JMS producer example.
 *
 * <p>Connects to a local ActiveMQ broker, sends {@value #MESSAGE_COUNT}
 * {@link User} objects as persistent {@link ObjectMessage}s to the
 * {@value #QUEUE_NAME} queue inside a single transaction, and commits.
 */
public class JmsSender {

    private static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String USERNAME = "user";
    private static final String PASSWORD = "321";
    private static final String QUEUE_NAME = "Test4.foo";
    private static final int MESSAGE_COUNT = 100;

    /**
     * Sends the sample messages in one transaction.
     *
     * <p>A {@link JMSException} is reported on standard error and not
     * propagated. The connection is always closed; per the JMS contract,
     * closing a connection rolls back any transaction still in progress, so a
     * failure part-way through leaves no partial batch on the queue.
     */
    public void send() {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

        Connection connection = null;
        try {
            connection = connectionFactory.createConnection(USERNAME, PASSWORD);
            connection.start();

            // The acknowledge mode is ignored for a transacted session;
            // SESSION_TRANSACTED states the intent explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME);

            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            for (int id = 1; id <= MESSAGE_COUNT; id++) {
                ObjectMessage message = session.createObjectMessage();
                message.setObject(new User(id, "李四" + id, "123456"));

                // Other message types can be built the same way, for example:
                //
                //   // text message
                //   TextMessage textMessage = session.createTextMessage("text message");
                //
                //   // key/value message
                //   MapMessage mapMessage = session.createMapMessage();
                //   mapMessage.setLong("age", 32L);
                //   mapMessage.setDouble("sarray", 5867.15);
                //   mapMessage.setString("username", "key/value message");
                //
                //   // stream message
                //   StreamMessage streamMessage = session.createStreamMessage();
                //   streamMessage.writeString("streamMessage stream message");
                //   streamMessage.writeLong(55);
                //
                //   // bytes message
                //   String s = "BytesMessage bytes message";
                //   BytesMessage bytesMessage = session.createBytesMessage();
                //   bytesMessage.writeBytes(s.getBytes());

                producer.send(message);
            }

            // Nothing becomes visible to consumers until the transaction commits.
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Closing the connection also closes its sessions and producers.
            closeQuietly(connection);
        }
    }

    /** Closes the connection if it was opened, reporting (not throwing) any failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the example once.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new JmsSender().send();
    }
}
package activemq;

import java.util.concurrent.TimeUnit;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver {

    public static void main(String[] args) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://127.0.0.1:61616");
        connectionFactory.setTrustAllPackages(true);

        try {
            Connection connection = connectionFactory.createConnection("guest",
                    "111");
            connection.start();

            final Session session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("Test4.foo");

            MessageConsumer consumer = session.createConsumer(destination);
            // listener 方式
            consumer.setMessageListener(new MessageListener() {

                public void onMessage(Message msg) {
                    ObjectMessage message = (ObjectMessage) msg;
                    // TODO something....
                    try {
                        User user = (User) message.getObject();
                        System.out.println("收到消息:" + user.getName());
                    } catch (JMSException e1) {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            });
            TimeUnit.MINUTES.sleep(1);

            session.close();
            connection.close();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void onMessage(Message m) {
        try {
            if (m instanceof TextMessage) { // 接收文本消息
                TextMessage message = (TextMessage) m;
                System.out.println(message.getText());
            } else if (m instanceof MapMessage) { // 接收键值对消息
                MapMessage message = (MapMessage) m;
                System.out.println(message.getLong("age"));
                System.out.println(message.getDouble("sarray"));
                System.out.println(message.getString("username"));
            } else if (m instanceof StreamMessage) { // 接收流消息
                StreamMessage message = (StreamMessage) m;
                System.out.println(message.readString());
                System.out.println(message.readLong());
            } else if (m instanceof BytesMessage) { // 接收字节消息
                byte[] b = new byte[1024];
                int len = -1;
                BytesMessage message = (BytesMessage) m;
                while ((len = message.readBytes(b)) != -1) {
                    System.out.println(new String(b, 0, len));
                }
            } else if (m instanceof ObjectMessage) { // 接收对象消息
                ObjectMessage message = (ObjectMessage) m;
                User user = (User) message.getObject();
            } else {
                System.out.println(m);
            }
 
        } catch (JMSException e) {
 
            e.printStackTrace();
        }
    }

}

    

原文地址:https://www.cnblogs.com/101key/p/5282939.html