ActiveMQ消息队列

1.四种消息队列:
1.1(Storm-)Kafka
1.2RabbitMQ安全性最高
1.3RocketMQ==>阿里
1.4ActiveMQ

2.为什么要使用消息队列?

主要解决系统之间的通信问题
2.1. 尽量消除系统耦合性

2. 2.异步消息传递(异步通信)

2.3. 流量削峰

3.JMS(Java Message Service)Java 消息服务 

3.1. JMS是Java EE的规范之一,定义了访问消息中间件的接口;

3.2. JMS规范指出消息传递应该是异步的、非阻塞的;

4.JMS的核心API
4.1. ConnectionFactory:连接工厂,用于创建Connection

4.2. Connection:客户端和MQ服务器的一次连接
4.3. Session:一次会话

4.4. Destination:生产者生产消息的目的地,消费者消费消息的来源
   Queue:只能消费一次
   Topic:可以消费多次

4.5. MessageProducer:消息生产者,用于将消息发送消息队列
4.6. MessageConsumer:消费者

4.7. Message:消息
   TextMessage
   MapMessage
   ObjectMessage
   BytesMessage
   StreamMessage
4.8. MessageListener

5.JMS消息类型 

5.1. 点对点消息(P2P): 一条消息只能被一个消费者消费,生产者和消费者没有时间上的依赖性
5.2. 发布订阅 :  一条消息可以被多个消费者消费
生产者和消费者有时间上的依赖性(生产者在生产消息的时候,至少应该有一个消费者处于在线状态)
可以创建一个持久化的消费者订阅队列

6.什么是ActiveMQ
1. ActiveMQ是最受欢迎的、功能强大的开源消息和集成服务器

2. ActiveMQ 速度快,支持跨语言的客户端和协议,很容易进行企业集成,支持许多高 级特性,完全支持JMS1.1和J2EE1.4

7.ActiveMQ的特点 

7.1. 支持多语言客户端和协议,如Java、C、C++、Ruby、Perl、Python、PHP

7.2. 支持许多高级特性,如消息分组、虚拟目的地、通配符、组合目的地;

7.3. 完全支持JMS1.1和J2EE1.4;

7.4. 可以很容易集成到Spring应用程序中;

7.5. 通过大部分J2EE服务器的测试,如TomEE、JBoss、WebLogic等;

7.6. 支持高效的JDBC持久化方式;

7.7. 集群的支持;

7.8. ......

8.上代码

8.1生产者和消费者(原生状态)

/**
 * 生产者
 */
public class HelloProducer {


    public static void main(String[] args) throws JMSException {
        // 1. 创建ConnectionFactory(用户名、密码、连接地址)
        // 集群的情况:“failover:(tcp://192.168.1.100:61616,tcp://192.168.1.101:61616,tcp://192.168.1.102:61616)?Randomize=false”
        ConnectionFactory factory = new ActiveMQConnectionFactory(null,
                null,
                "tcp://localhost:61616");
        //2.
        Connection connection = factory.createConnection();
        connection.start();

        //3.创建session
        //参数(是否开启事务,客户端自动签收消息)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建Destination(目的地)
        //参数:队列名称,如果不存在则创建一个新的队列
        Queue queue = session.createQueue("hello");

        //5.创建生产者
        MessageProducer producer = session.createProducer(queue);

        //6.创建消息,发送消息
        for(int i= 0;i<10;i++){
            TextMessage message = session.createTextMessage("这是第" + i + "条消息");
            producer.send(message);
        }
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送完成");
    }
}
=============================================================================================
/**
* 消费者
*/
public class HelloConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");

Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue("hello");

MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage msg = (TextMessage)consumer.receive();
if(msg!=null){
System.out.println(msg.getText().toString());
Thread.sleep(1000);
}
}
}
}

8.2生产者和消费者(手动签收消息)

/**
 * 生产者
 */
public class SelectorProducer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(null,
                null,"tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        //客户端手动签收消息
        Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("selector");

        MessageProducer producer = session.createProducer(queue);
        TextMessage msg = session.createTextMessage("地址2");
        msg.setIntProperty("age",12);
        msg.setStringProperty("name","et");

        TextMessage msg2 = session.createTextMessage("地址2");
        msg2.setIntProperty("age",2);
        msg2.setStringProperty("name","et");

        producer.send(msg);
        producer.send(msg2);
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送成功");
    }
}
========================================================================================================
/**
* 消费者
*/
public class SelectConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");

Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue("selector");

MessageConsumer consumer = session.createConsumer(queue, "name = 'et' and age = 2");
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
//签收消息,通知队列删除消息
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}

8.3发布和订阅状态(及持久化订阅)

/**
 * 发布
 */
public class HelloPublisher {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(null,
                null,
                "tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("topic");

        MessageProducer producer = session.createProducer(topic);

        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString("name","et2006");
        mapMessage.setInt("id",111111);
        producer.send(mapMessage);
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送完成");
    }
}
========================================================================================
/**
* 订阅
*/
public class HelloSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic("topic");

MessageConsumer consumer = session.createConsumer(topic);

consumer.setMessageListener(new HelloListener());
}
}
class HelloListener implements MessageListener{

@Override
public void onMessage(Message message) {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
==========================================================================================
/**
* 持久化订阅者
*/
public class DurableSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");

Connection connection = factory.createConnection();
connection.setClientID("zs");
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
MessageConsumer consumer = session.createDurableSubscriber(topic,"zs");
consumer.setMessageListener(message -> {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}

 9.activemq整合spring

D:五月五月课件workActiveMQactivemq-springmvc

原文地址:https://www.cnblogs.com/liuqingzhong/p/14151446.html