activeMq-1 快速入门

Activemq 是一款开源的消息中间件,适合中小型应用使用,遵循JMS规范。

具体介绍这里就不再阐述了,这里简单说下消息中间件的好处

 1请求结果异步处理

  客户端发送请求以后,服务器可以把相关数据放到消息中间件上,不一定马上处理

 2解耦

 client进程和server服务进程都不一定同时可用。

 3不仅支持11通信,也支持1对多通信

另外我想说的是,JMS只是定义了接口,并没有给出实现。以下是jms相关的术语介绍

ProviderMessageProvider):生产者 生产消息

ConsumerMessageConsumer):消费者 发送消息

PTPPoint To Point,点对点通信消息模型

Pub/SubPublish/Subscribe,发布订阅消息模型

Queue:消息队列,和PTP结合

Topic:消息主题,和Pub/Sub结合

ConnectionFactory:连接工厂,JMS用它创建Connnection

ConnnectionJMS ClientJMS Provider的连接,用于创建session

Destination:消息目的地,由Session创建

Session:会话,由Connection创建,发送、接受消息的一个线程

快速入门

  下载 http://activemq.apache.org/ 启动程序   

   windows系统 (根据你系统 bit不同 选择不同的启动程序

 

 启动完成以后,就可以访问控制台了,web 控制台可以通过

 http://localhost:8161  需要输入用户名以及密码 (默认是admin  admin

 当然这个是可以修改的,默认这个是运行在jetty里面的

修改conf目录下的jetty-realm.properties

  

接下来,我们写第一个快速入门程序。具体来说,会有一个生产者用于发送消息,一个消费者用于接收消息。

引入jar包,我这里为了方便,直接引入的是activemq-all-5.9.0.jar

 

实际开发中 应该根据需求引入lib目录下的相关包

=============================================================================================================

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
	public static void main(String[] args) throws Exception {
	
		ConnectionFactory factory = new ActiveMQConnectionFactory("cc","cz", ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
		Connection connection = factory.createConnection();
		// 默认是关闭的 需要开启
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination dest = session.createQueue("test");
		MessageProducer producer = session.createProducer(dest);
		// 设置非持久化  服务器关闭消息就不再保存
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		TextMessage textMessage = session.createTextMessage();
		textMessage.setText("I AM MESSAGE");
		producer.send(textMessage);
		try {
		} finally {
			if(connection!=null)
			connection.close();
		}
	}
}

消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
	public static void main(String[] args) throws Exception {

		ConnectionFactory factory = new ActiveMQConnectionFactory("cc", "cz",
				ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
		Connection connection = factory.createConnection();
		// Connection默认是关闭的 需要开启
		connection.start();
		// 设置是否支持事务 和 签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination dest = session.createQueue("test");
		MessageConsumer consumer = session.createConsumer(dest);
		while (true) {
			//因为这里我们知道发送的是TextMessage 所以没有做类型判断 类型判断可以用 instanceof
			TextMessage mes = (TextMessage) consumer.receive();
			// mes.acknowledge(); 签收消息
			System.out.println(mes.getText());
		}

	
	}

}

  

运行生产者 ,在运行消费者(先后顺序是没有关系的,我这里并没有让消费者停止运行)

  

详细api介绍

 1安全认证,我们不能让谁都去链接我们的mq服务器,这里我们可以在activemq.xml配置用户名密码 认证

 

 2conection一定要关闭,不然mq不会释放资源。

 3我们通过connection创建session对象的时候,有两个参数

分别是指 是否支持事务以及消息的签收模式

 签收模式有三种

 Session.AUTO_ACKNOWLEDGE 自动签收 当客户端从recive方法或者onMessage成功返回时 就签收

 Session.CLIENT_ACKNOWLEDGE 需要调用Message调用acknowledge消息才会签收【实际开发中,我们一般使用这种方式】

 Session.DUPS_OK_ACKNOWLEDGE 不必确认对消息的签收,可能会有消息的重复,但是提高了性能。

4 Producersend方法

上面的代码中,我们先后指定了目的地,持久化方式,实际上这些都可以不必指定的,而是到send的时候指定。而且在实际业务开发中,往往根据各种判断,来决定将这条消息发往哪个Queue,因此往往不会在MessageProducer创建的时候指定Destination。

TTL,消息的存活时间,一句话:生产者生产了消息,如果消费者不来消费,那么这条消息保持多久的有效期

priority,消息优先级,0-9。0-4是普通消息,5-9是加急消息,消息默认级别是4。注意,消息优先级只是一个理论上的概念,也就是说ActiveMQ并不能保证消费的顺序性!

 

deliveryMode,如果不指定,默认是持久化的消息。如果可以容忍消息的丢失,那么采用非持久化的方式,将会改善性能、减少存储的开销

当然我们也可以修改消息持久化方式,默认是kahadb

 

可以修改为mysql ,不过一般不推荐mysql 。

可以使用leveldb,毕竟这些数据库的性能要较MySQL更高些,事实上我们并不关心消息的“可视化”,更加关心的是消息在持久化的同时更加高效!

 

原文地址:https://www.cnblogs.com/javabigdata/p/7464393.html