消息中间件 JMS

一、什么是?

  JMS : Java Message Service(Java消息服务),Java平台中关于面向消息中间件的接口

  重点在于接口,接口就意味着与JDBC类似,仅仅有声明,没有实现,具体的实现交给厂商. 接口本身是一种与厂商无关的API.

  

  支持的消息类型

  TextMessage 字符串对象

  MapMessage 名值对

  ObjectMessage 一个序列化的java对象

  BytesMessage 字节的数据流

  StreamMessage 原始数据值的数据流

 

  作用

  在soa 分布式架构系统中 或者企业多个项目中 进行多个系统异步传输消息

  优点 使用消息服务器 当做大一点的队列使用 先进先出 处理高并发写入问题,似的=业务由串行改为并行 处理效率高

  缺点:实时性不高 因为数据发送给消息服务器 消息服务器不会立刻处理 队列是先进先出,如果数据量大需要排队等候

 

二、使用场景

每一种技术都有其产生的原因和解决的问题,消息服务的使用场景如下:

  a:大的队列使用 先进先出 来处理高并发问题

  b:业务系统的串行改为并行 处理效率高

  • 核心应用:
    • 解耦: 订单 --> 物流 (降低工程间的强依赖程度)
    • 异步: 用户注册 --> 发送邮件,初始化信息 (耗时长非必须的业务抽离,放在空闲的时候处理)
    • 流量削峰: 秒杀,日志处理 (针对突发情况的大访问量场景)
  • 跨平台, 多语言
  • 分布式事务, 最终一致性 (引入和分布式和微服务之后,事务的处理和数据的一致是比较难保证的一个方面)
  • RPC调用上下游对接,数据源变动通知下属

三、消息中间件常见概念与编程模型

常见概念

  • JMS提供者: 连接面向消息中间件的,也就是JMS的接口实现, RocketMQ, ActiveMQ等. 简单的来说就是JMS接口的实现厂商提供的实现, 中间件本身.
  • Broker服务器: 接收生产消息,提供给消费者消费的程序
  • JMS生产者(Message Producer): 生产消息的服务
  • JMS消费者(Message Consumer): 消费消息的服务
  • JMS消息: 数据对象, 传递的内容
  • JMS队列: 存储待消费信息的区域(可以简单的理解为数据结构中的队列,可以保证顺序,存储消息的介质)
  • JMS主题(Topic): 一种支持发送消息给多个订阅者的机制(中转,不同的生产者发送消息到主题,经过服务器分发到不同的订阅者)
  • JMS消息通常的两种类型: 点对点(Point-to-Point), 发布/订阅(Publish/Subscribe). 区别在于前者是多个消费者但只有一个消费者可以消费消息,后者是多个消费者都可以同时消费一个消息.

一个消息对应一个主题,一个主题下由多个queue. 要注意的是topic是一个逻辑管理单位,queue是物理管理单位

基础编程模型

  • MQ中需要用到的一些类
    • ConnectionFactory: 连接工厂, JMS使用它来创建连接
    • Connection: JMS客户端到JMS Provider的连接
    • Session: 一个发送或接收消息的线程
    • Destination: 消息的目的地; 消息发送给谁(Broker)
    • MessageConsumer / MessageProducer: 消息消费者 / 消息生产者

 

 

四、同类技术对比

    • ActiveMQ: 
      • 支持多种语言的客户端和协议,基于JMS Provider的实现,容易上手. 但是吞吐量不高,多队列的时候性能会下降,存在消息丢失情况.
    • Kafka: 
      • 由Scala和Java编写,可以处理大规模的网站中的所有动作流数据(网页浏览,搜索等),保证数据尽量不丢失,适用于大数据领域(电商等). 但是不支持批量个广播消息,运维难度大,文档少,需要掌握Scala
    • RabbitMQ: 

    • 开源的AMQP实现,服务端用Erlang语言编写,支持多种客户端,在易用性,扩展性和高可用性等方面表现不错. 但是使用Erlang语言开发,阅读和修改源码难度大.(当然公司如果大部分人熟悉Erlang语言的话例外)安全性高
  • ZeroMQ: (最快)

五、小Demo

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
    </dependency>
</dependencies>

  

点对点模式

接收方

package cn.liuhuan.core;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 接收方
 */
public class QueueConsuer {

    public static void main(String[] args) throws Exception {
        //1  创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2 获得链接
        Connection connection = connectionFactory.createConnection();
        //3 启动连接
        connection.start();
        //4 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5 创建队列
        Queue queue = session.createQueue("test01");
        //6 创建消息
        MessageConsumer consumer = session.createConsumer(queue);
        //7  监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8  等待键盘输入
        System.in.read();
        //9 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

}

发送方

package cn.liuhuan.core;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 发送方
 */
public class QueueProducer {
    public static void main(String[] args) throws Exception {
        //1  创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2 获得链接
        Connection connection = connectionFactory.createConnection();
        //3 启动连接
        connection.start();
        //4 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5 创建队列
        Queue queue = session.createQueue("test01");
        //6 创建消息
        MessageProducer producer = session.createProducer(queue);
        //7  生产消息
        TextMessage textMessage = session.createTextMessage("这是收到的第一个消息");
        //8  发送消息
        producer.send(textMessage);
        //9 关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

  

订阅发布模式

接收方(多个都是一样的)

package cn.liuhuan.core;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicConsuer1 {
    public static void main(String[] args) throws Exception {
        //1  创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2 获得链接
        Connection connection = connectionFactory.createConnection();
        //3 启动连接
        connection.start();
        //4 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5 创建队列
        Topic topic = session.createTopic("testTopic");
        //6 创建消息
        MessageConsumer consumer = session.createConsumer(topic);
        //7  监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8  等待键盘输入
        System.in.read();
        //9 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

  

发送方

package cn.liuhuan.core;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 发送方
 */
public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //1  创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2 获得链接
        Connection connection = connectionFactory.createConnection();
        //3 启动连接
        connection.start();
        //4 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5 创建队列
        Topic topic = session.createTopic("testTopic");
        //6 创建消息
        MessageProducer producer = session.createProducer(topic);
        //7  生产消息
        TextMessage textMessage = session.createTextMessage("这是收到的第一个消息");
        //8  发送消息
        producer.send(textMessage);
        //9 关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

  

原文地址:https://www.cnblogs.com/shiliuhuanya/p/12123597.html