JMS学习四(ActiveMQ消息过滤)

一、消息的选择器

不管是在消息发送端设置消息过期时间还是在接收端设置等待时间,都是对不满足的消息有过滤的作用,那消息选择器就是为过滤消息而生的下面来看看消息选择器:

ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。 消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为MessageConsumer 创建的一部分。

 消息选择器的用法
      MessageConsumer是一个Session创建的对象,用来从Destination接收消息


      关于消息选择器
      MessageConsumer createConsumer( Destination destination, String messageSelector )
      MessageConsumer createConsumer( Destination destination, String messageSelector, boolean noLocal )

      其中,messageSelector为消息选择器; 
      noLocal标志默认为false,当设置为true时,限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列。

      public final String SELECTOR="JMS_TYPE='MY_TAG1'" ; 
      选择器检查传入消息的JMS_TYPE的属性,并确定这个属性的值是否等于MY_TAG1;
      如果相等,消息报消费;如果不相等,那么消息就会被忽略;

1、消息生产者:

package mqtest3;  
  
import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.DeliveryMode;  
import javax.jms.Destination;  
import javax.jms.JMSException;  
import javax.jms.MapMessage;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Producer {  
    // 单例模式  
    // 1、连接工厂  
    private ConnectionFactory connectionFactory;  
    // 2、连接对象  
    private Connection connection;  
    // 3、Session对象  
    private Session session;  
    // 4、生产者  
    private MessageProducer messageProducer;  
  
    public Producer() {  
        try {  
            this.connectionFactory = new ActiveMQConnectionFactory("admin",  
                    "admin", "tcp://127.0.0.1:61616");  
            this.connection = connectionFactory.createConnection();  
            this.connection.start();  
            // 设置自动签收模式  
            this.session = this.connection.createSession(false,  
                    Session.AUTO_ACKNOWLEDGE);  
            this.messageProducer = this.session.createProducer(null);  
        } catch (JMSException e) {  
            throw new RuntimeException(e);  
        }  
  
    }  
  
    public Session getSession() {  
        return this.session;  
    }  
  
    public void send1(/* String QueueName, Message message */) {  
        try {  
  
            Destination destination = this.session.createQueue("first");  
            MapMessage msg1 = this.session.createMapMessage();  
            msg1.setString("name", "张三");  
            msg1.setInt("age", 20);  
            // 设置用于消息过滤器的条件  
            msg1.setStringProperty("name", "张三");  
            msg1.setIntProperty("age", 20);  
            msg1.setStringProperty("color", "bule");  
  
            MapMessage msg2 = this.session.createMapMessage();  
            msg2.setString("name", "李四");  
            msg2.setInt("age", 25);  
            // 设置用于消息过滤器的条件  
            msg2.setStringProperty("name", "李四");  
            msg2.setIntProperty("age", 25);  
            msg2.setStringProperty("color", "white");  
  
            MapMessage msg3 = this.session.createMapMessage();  
            msg3.setString("name", "赵六");  
            msg3.setInt("age", 30);  
            // 设置用于消息过滤器的条件  
            msg3.setStringProperty("name", "赵六");  
            msg3.setIntProperty("age", 30);  
            msg3.setStringProperty("color", "black");  
            // 发送消息  
            this.messageProducer.send(destination, msg1,  
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
            this.messageProducer.send(destination, msg2,  
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
            this.messageProducer.send(destination, msg3,  
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
        } catch (JMSException e) {  
            throw new RuntimeException(e);  
        }  
    }  
  
    public void send2() {  
        try {  
            Destination destination = this.session.createQueue("first");  
            TextMessage message = this.session.createTextMessage("我是一个字符串");  
            message.setIntProperty("age", 25);  
            // 发送消息  
            this.messageProducer.send(destination, message,  
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
        } catch (JMSException e) {  
            throw new RuntimeException(e);  
        }  
  
    }  
  
    public static void main(String[] args) {  
        Producer producer = new Producer();  
        producer.send1();  
        // producer.send2();  
  
    }  
}  

2、消息消费者:

package mqtest3;  
  
import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.JMSException;  
import javax.jms.MapMessage;  
import javax.jms.Message;  
import javax.jms.MessageConsumer;  
import javax.jms.MessageListener;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Conmuser {  
    // 单例模式  
    // 1、连接工厂  
    private ConnectionFactory connectionFactory;  
    // 2、连接对象  
    private Connection connection;  
    // 3、Session对象  
    private Session session;  
    // 4、生产者  
    private MessageConsumer messageConsumer;  
    // 5、目的地址  
    private Destination destination;  
    // 消息选择器  
    public final String SELECTOR_1 = "age > 25";  
    public final String SELECTOR_2 = " age > 20 and color='black'";  
  
    public Conmuser() {  
        try {  
            this.connectionFactory = new ActiveMQConnectionFactory("admin",  
                    "admin", "tcp://127.0.0.1:61616");  
            this.connection = connectionFactory.createConnection();  
            this.connection.start();  
            // 设置自动签收模式  
            this.session = this.connection.createSession(false,  
                    Session.AUTO_ACKNOWLEDGE);  
            this.destination = this.session.createQueue("first");  
            // 在构造消费者的时候,指定了 消息选择器  
            // 有选择性的消费消息  
            this.messageConsumer = this.session.createConsumer(destination,  
                    SELECTOR_1);  
        } catch (JMSException e) {  
            throw new RuntimeException(e);  
        }  
    }  
  
    public Session getSession() {  
        return this.session;  
    }  
  
    // 用于监听消息队列的消息  
    class MyLister implements MessageListener {  
  
        @Override  
        public void onMessage(Message message) {  
            try {  
                if (message instanceof TextMessage) {  
                    TextMessage ret = (TextMessage) message;  
                    System.out.println("results;" + ret.getText());  
                }  
                if (message instanceof MapMessage) {  
                    MapMessage ret = (MapMessage) message;  
                    System.out.println(ret.toString());  
                    System.out.println(ret.getString("name"));  
                    System.out.println(ret.getInt("age"));  
                }  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
        }  
  
    }  
  
    // 用于异步监听消息  
    public void receiver() {  
        try {  
            this.messageConsumer.setMessageListener(new MyLister());  
        } catch (JMSException e) {  
            throw new RuntimeException(e);  
        }  
    }  
  
    public static void main(String[] args) {  
        Conmuser conmuser = new Conmuser();  
        conmuser.receiver();  
  
    }  
}  

上面的demo是对MapMessage和TextMessage两种消息的过滤条件的设置和消费,过滤条件的设置使在消息的属性中设置,而消费消息的时候直接是在session创建MessageConsumer时传入的参数即过滤条件(过滤条件的写法和SQL的写法是很像的)

在写过滤条件的时候要注意设置的是什么类型的条件即: int 、string 如果是int 则加引号而如果是String则要加哦!!!

需要注意的地方

     注意消息过滤器的过滤条件的设置

// 设置用于消息过滤器的条件  
msg3.setStringProperty("name", "赵六");  
msg3.setIntProperty("age", 30);  
msg3.setStringProperty("color", "black");  

消息过滤器的写法(类似于SQL语句的写法)

// 消息选择器  
public final String SELECTOR_1 = "age > 20";  
public final String SELECTOR_2 = " age > 20 and color='bule'";  
原文地址:https://www.cnblogs.com/alter888/p/8975356.html