ActiveMQ与RabbitMQ安装以及实现

ActiveMQ

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

解压缩apache-activemq-5.11.1-bin.zip,然后双击apache-activemq-5.11.1inactivemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

3.创建Eclipse项目并运行

创建project:MQ,并导入apache-activemq-5.11.1lib目录下需要用到的jar文件,项目结构如下图所示:

3.1.Sender.java

 1 package activeMQ;
 2 import javax.jms.Connection;
 3 import javax.jms.ConnectionFactory;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.Destination;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 
10 import org.apache.activemq.ActiveMQConnection;
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 public class Sender {
13     private static final int SEND_NUMBER = 10;
14 
15     public static void main(String[] args) {
16         // ConnectionFactory :连接工厂,JMS 用它创建连接
17         ConnectionFactory connectionFactory;
18         // Connection :JMS 客户端到JMS Provider 的连接
19         Connection connection = null;
20         // Session: 一个发送或接收消息的线程
21         Session session;
22         // Destination :消息的目的地;消息发送给谁.
23         Destination destination;
24         // MessageProducer:消息发送者
25         MessageProducer producer;
26         // TextMessage message;
27         // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
28         connectionFactory = new ActiveMQConnectionFactory(
29                 ActiveMQConnection.DEFAULT_USER,
30                 ActiveMQConnection.DEFAULT_PASSWORD,
31                 "tcp://localhost:61616");
32         try {
33             // 构造从工厂得到连接对象
34             connection = connectionFactory.createConnection();
35             // 启动
36             connection.start();
37             // 获取操作连接
38             session = connection.createSession(Boolean.TRUE,
39                     Session.AUTO_ACKNOWLEDGE);
40             // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
41             destination = session.createQueue("FirstQueue");
42             // 得到消息生成者【发送者】
43             producer = session.createProducer(destination);
44             // 设置不持久化,此处学习,实际根据项目决定
45             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
46             // 构造消息,此处写死,项目就是参数,或者方法获取
47             sendMessage(session, producer);
48             session.commit();
49         } catch (Exception e) {
50             e.printStackTrace();
51         } finally {
52             try {
53                 if (null != connection)
54                     connection.close();
55             } catch (Throwable ignore) {
56             }
57         }
58     }
59 
60     public static void sendMessage(Session session, MessageProducer producer)
61             throws Exception {
62         for (int i = 1; i <= SEND_NUMBER; i++) {
63             TextMessage message = session
64                     .createTextMessage("ActiveMq 发送的消息" + i);
65             // 发送消息到目的地方
66             System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
67             producer.send(message);
68         }
69     }
70 }
View Code

3.2.Receiver.java

 1 package activeMQ;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.MessageConsumer;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 
10 import org.apache.activemq.ActiveMQConnection;
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class Receiver {
14     public static void main(String[] args) {
15         // ConnectionFactory :连接工厂,JMS 用它创建连接
16         ConnectionFactory connectionFactory;
17         // Connection :JMS 客户端到JMS Provider 的连接
18         Connection connection = null;
19         // Session: 一个发送或接收消息的线程
20         Session session;
21         // Destination :消息的目的地;消息发送给谁.
22         Destination destination;
23         // 消费者,消息接收者
24         MessageConsumer consumer;
25         connectionFactory = new ActiveMQConnectionFactory(
26                 ActiveMQConnection.DEFAULT_USER,
27                 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
28         try {
29             // 构造从工厂得到连接对象
30             connection = connectionFactory.createConnection();
31             // 启动
32             connection.start();
33             // 获取操作连接
34             session = connection.createSession(Boolean.FALSE,
35                     Session.AUTO_ACKNOWLEDGE);
36             // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
37             destination = session.createQueue("FirstQueue");
38             consumer = session.createConsumer(destination);
39             while (true) {
40                 // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s
41                 TextMessage message = (TextMessage) consumer.receive(500000);
42                 if (null != message) {
43                     System.out.println("收到消息" + message.getText());
44                 } else {
45                     break;
46                 }
47             }
48         } catch (Exception e) {
49             e.printStackTrace();
50         } finally {
51             try {
52                 if (null != connection)
53                     connection.close();
54             } catch (Throwable ignore) {
55             }
56         }
57     }
58 }
View Code

4.注意事项

  1. 最后接收者跟发送者在不同的机器上测试
  2. 项目所引用的jar最后在ActiveMQ下的lib中找,这样不会出现版本冲突。

5.测试过程

发送消息:ActiveMq 发送的消息1
发送消息:ActiveMq 发送的消息2
发送消息:ActiveMq 发送的消息3
发送消息:ActiveMq 发送的消息4
发送消息:ActiveMq 发送的消息5

收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5

RabbitMQ

1.RabbitMQ环境安装

 

1)下载erlang支持包:http://www.erlang.org/download.html 如图所示:

 


 

2)下载rabbitmq server:http://www.rabbitmq.com/releases/rabbitmq-server/

 


 

下载RabbitMQ client:http://www.rabbitmq.com/releases/rabbitmq-java-client/或者在http://www.rabbitmq.com/java-client.html

 


 

注:由于RabbitMQ 支持erlang、Java和.NET你可以找到不同语言的client支持版本。

 

 

2.配置环境变量

1)、新建变量:ERLANG_HOME=D:Program Fileserl6.4,然后再path中添加%ERLANG_HOME%in;

 

2)、新建变量:RABBITMQ_SERVER=E: abbitmq_server-3.5.3,然后再path中添加%RABBITMQ_SERVER%sbin;

 

3)、运行sbin/rabbitmq-server.bat,启动RabbitMQ服务器

 


 

 RabbitMQ服务端是用AMPQ协议的, 而客户端支持多种语言(Java, .NET,Erlang......Maybe more........)。下面我们准备用java来写一个‘hello world’,测试RabbitMQ安装。 

 

3.RabbitMQ客户端测试

   3.1.Sender.java

 1 package rabbitMQ;
 2 
 3 import java.io.IOException;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 
 8 /**
 9  * 
10  * @author lvkun
11  * 
12  * @having-line---------------------------------------------------------
13  * @filename Send.java
14  * @function TODO
15  * @download <a href=
16  *           'http://www.rabbitmq.com/download.html'>http://www.rabbitmq.com/download.html</a
17  *           >
18  * @start-at 2015年5月24日,下午23:50:40
19  * @having-line---------------------------------------------------------
20  */
21 public class Send {
22     private final static String QUEUE_NAME = "hello";
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory factory = new ConnectionFactory();
26         factory.setHost("localhost");
27         Connection connection = factory.newConnection();
28         Channel channel = connection.createChannel();
29 
30         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
31         String message = "Hello World!";
32         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
33         System.out.println(" [x] Sent '" + message + "'");
34 
35         channel.close();
36         connection.close();
37     }
38 }
View Code

    3.2.Receiver.java

 

 1 package rabbitMQ;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 /**
 9  * 
10  * @author lvkun
11  * 
12  * @having-line---------------------------------------------------------
13  * @filename Receive.java
14  * @function TODO
15  * @download <a href=
16  *           'http://www.rabbitmq.com/download.html'>http://www.rabbitmq.com/download.html</a
17  *           >
18  * @start-at 2015年5月24日,下午23:50:40
19  * @having-line---------------------------------------------------------
20  */
21 public class Receive {
22     private final static String QUEUE_NAME = "hello";
23 
24     public static void main(String[] argv) throws Exception {
25 
26         ConnectionFactory factory = new ConnectionFactory();
27         factory.setHost("localhost");
28         Connection connection = factory.newConnection();
29         Channel channel = connection.createChannel();
30 
31         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
32         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
33 
34         QueueingConsumer consumer = new QueueingConsumer(channel);
35         channel.basicConsume(QUEUE_NAME, true, consumer);
36 
37         while (true) {
38             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
39             String message = new String(delivery.getBody());
40             System.out.println(" [x] Received '" + message + "'");
41         }
42     }
43 }
View Code

启动客户端程序测试效果图:

原文地址:https://www.cnblogs.com/lvk618/p/4526331.html