ActiveMQ

1、启动:

进入bin目录下,使用命令 ./activemq start 

 测试是否启动成功:http://192.168.121.128:8161/admin   登录名为admin,密码admin

 2、端口说明:

816161616这两个端口说明:因为activemq是通过内置的jetty服务器来搭建网站,开放的网站访问端口就是8161,但是进入activemq这个软件后接收消息的端口是61616,所以得让Linux开放这两个端口,如果需要修改消息接收端口号,可以到activemq的解压文件夹下的config下的activemq.xml文件中修改transportConnectors节点下的name属性为openwire子节点端口

 3、ActiveMQ内部使用的是观察者模式,发送方发送消息以后,接收方一直密切关注,里面还用到了定时器任务,点对点的不需要自己注册观察者,框架自己完成了,而发布订阅模式,必须自己注册观察者。

点对点发送接收是按照队列名来匹配获取,所以发送和接收的队列名必须一致,不然接收不到,代码如下:

	/**
	 * 发送点对点消息
	 */
	@Test
	public void testQueueProducer() throws Exception {
		//1、创建一个连接工厂对象,需要指定服务的ip及端口。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.121.128:61616");
		//2、使用工厂对象创建一个Connection对象。
		Connection connection = connectionFactory.createConnection();
		//3、开启连接,调用Connection对象的start方法。
		connection.start();
		//4、创建一个Session对象。
		//第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
		//第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
		Queue queue = session.createQueue("test-queue830");
		//6、使用Session对象创建一个Producer对象。
		MessageProducer producer = session.createProducer(queue);
		//7、创建一个Message对象,可以使用TextMessage。
		/*TextMessage textMessage = new ActiveMQTextMessage();
		textMessage.setText("hello Activemq");*/
		TextMessage textMessage = session.createTextMessage("hello activemq");
		//8、发送消息
		producer.send(textMessage);
		//9、关闭资源
		producer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * 接收点对点消息
	 */
	@Test
	public void testQueueConsumer() throws Exception {
		//创建一个ConnectionFactory对象连接MQ服务器
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.121.128:61616");
		//创建一个连接对象
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//使用Connection对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//创建一个Destination对象。queue对象
		Queue queue = session.createQueue("test-queue830");
		//使用Session对象创建一个消费者对象。
		MessageConsumer consumer = session.createConsumer(queue);
		//接收消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				//打印结果
				TextMessage textMessage = (TextMessage) message;
				String text;
				try {
					text = textMessage.getText();
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
				
			}
		});
		//等待接收消息
		System.in.read();
		//关闭资源
		consumer.close();
		session.close();
		connection.close();
	}

  

原文地址:https://www.cnblogs.com/javabg/p/7451337.html