五.使用场景案例

RockerMQ—console的使用

⦁    首页

image

⦁  Producer消息生产的topic查询:stat统计、route通道

之前我机器使用的是主机名:apple,没用ip

image

统计

image

通道

image

可通过RockerMQ—console管理界面查看broker配置信息

image

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
        consumer. setNamesrvAddr ("127.0.0.1:9876");
        consumer. setInstanceName("QuickStartConsumer");
        consumer. subscribe ("TopicModel", "*");

image

1、通过new DefaultMQPushConsumer("QuickStartConsumer");的实例查看消费记录
image

image

2、实例

消费者如何接受特定的消息:以下案例是描述MQ消费者,如果根据Tag在MQ完成消息过滤。Tag消息的标签,消息类型,用来区分某个MQ的topic消息分类,MQ允许消费者按照Tag进行消息过滤,确保用户消费者最终只消费他关心、特定的消息类型。

image


案例一、接收订单下所有的信息

    /**
     * 消费方式-1 消费者如需订阅某Topic下所有类型的消息,Tag用 * 符号表示:
     * @param args
     * @throws InterruptedException
     * @throws MQClientException
     */
    public static void main1(String [] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
        consumer. setNamesrvAddr ("127.0.0.1:9876");
        consumer. setInstanceName("QuickStartConsumer");
        consumer. subscribe ("TopicModel", "*");
        consumer. registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
                MessageExt msg = msgs.get (0);
        /** Consumer对象在使用之前必须要调用start初始化,初始化一次即可。*/
        consumer. start ();
        System.out.println("Consumer Started");
    }

案例二、只关注订单信息

    /**
     * 消费者如需订阅某Topic下特定类型的消息!
     * @param args
     * @throws InterruptedException
     * @throws MQClientException
     */
    public static void main3(String [] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
        consumer. setNamesrvAddr ("127.0.0.1:9876");
        consumer. setInstanceName("QuickStartConsumer");
        // 请明确标明Tag:只关注自己需要的!
        consumer. subscribe ("TopicModel", "Pay");
        consumer. registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
                MessageExt msg = msgs.get (0);
                /*** 对topic tag验证:只关注特定Pay*/
                if (msg. getTopic (). equals("TopicModel") && msg. getTags (). equals("Pay")) {
                    System.out.print("特定类型:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer. start ();
        System.out.println("Consumer Started");
    }

案例三、只接收订单、购物信息

     /**
     * 消费者如需订阅某Topic下多种"标签"类型的消息!
     * @param args
     * @throws InterruptedException
     * @throws MQClientException
     */
    public static void main (String [] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
        consumer. setNamesrvAddr ("127.0.0.1:9876");
        consumer. setInstanceName("QuickStartConsumer");
        // 请明确标明Tag:多个Tag之间用 || 分隔
        consumer. subscribe ("TopicModel", "Shoppong||Pay");
        consumer. registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
                MessageExt msg = msgs.get (0);
                /** 对topic tag验证,只关注自己的标签*/
                System.out.println(new String (msg. getBody ()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer. start ();
        System.out.println("Consumer Started");
    }

案例四、(错误示例)同一个消费者多次订阅某Topic下的不同Tag,后者会覆盖前者

    /**
     * 消费者如需订阅某Topic下多种"标签"类型的消息!
     * @param args
     * @throws InterruptedException
     * @throws MQClientException
     */
    public static void main (String [] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
        consumer. setNamesrvAddr ("127.0.0.1:9876");
        consumer. setInstanceName("QuickStartConsumer");
        // 请明确标明Tag:多个Tag之间用 || 分隔
说明描述:consumer. subscribe ("TopicModel", "Pay"
);
        错误:consumer. subscribe ("TopicModel", "Shoppong");
说明描述:consumer只能接收TopicModel到Shoppong购物信息,不能接收到Pay订单信息,后者会覆盖前者。
        consumer. registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
                MessageExt msg = msgs.get (0);
                System.out.println(new String (msg. getBody ()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer. start ();
        System.out.println("Consumer Started");
    }

⦁    一个生产者向消息队列中发送一条消息,有多个消息者,如何保证此消息只能被消费一次:消费者A提取了信息,消费者B则不能获取信息。

集群:在客户端设置consumer. setMessageModel (MessageModel.CLUSTERING)
测试使用“推送”消息DefaultMQPushConsumer,消息是被平均消费的,每个消费在启动时已经分配好读取的队列了,所以不会产生重复消费的,不存在a消费了,b还能消费,有多少个用户订阅某个topci的信息,消息生产者就是推送多少条!每个用户一条,平均分配到不同的队列,一旦被消费就没有了。
测试1:模拟单个:消息生产者发送一条消息,代码则修改消息生产为发送一条

消费者A

image

image

描述:B直接消费不了,获取不到队列

测试2:模拟多个:消息生产者发送4条消息

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

/**
 * 消息产生者Producer
 * 
 * @author admin
 *
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ProducerGroupName需要由应用来保证唯一<br>
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
        /**
         * 1、负责查找NameServer,多个NameServer地址用分号隔开
         */
        producer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * 2、客户端实例名称(这个实例包含网络连接、线程资源等)
         */
        producer.setInstanceName("QuickStartProducert");
        /**
         * 3、Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();
        /**
         * 这里模拟了 4 次消息发送,一个Producer对象可以发送多个topic,多个tag的消息。
         * 发生消息时必须声明区分Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
         */
        for (int I = 1; I <=4; I++) {
            try {
                    Message msg = new Message("TopicModel",
                            "TagB", 
                            "OrderID002",
                            ("Message..." + I).getBytes()
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        producer.shutdown();
    }
}

image

输出结果msg.getMsgId ()不一样,每个用户只消费一条,不可被重复消费。
广播:consumer. setMessageModel (MessageModel.BROADCASTING)
暂不推荐…

测试3: 获取对象的信息,并推送json格式的消息

public static void main (String [] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer. setNamesrvAddr ("127.0.0.1:9876");
        producer. setInstanceName("Producer");
        stuinfo o = new stuinfo("小明","15074937147");
        //1、获取对象集合
        List<stuinfo> list = new ArrayList<stuinfo>();
        list.add(o);
        //2、使用序列化
        JSONArray obj = (JSONArray) JSONArray.toJSON(list);
        JSONObject jsonObject = new JSONObject ();
        //3、转换json
        jsonObject.put ("key", obj);
        producer. start ();
        try {
            Message msg = new Message("JSONtopic”, (jsonObject.getString("key")).getBytes());
            SendResult sendResult = producer. send(msg);
            System.out.println(sendResult);
        } catch (RemotingException | MQBrokerException e) {
            e. printStackTrace ();
        }
        TimeUnit.MILLISECONDS. sleep (1000);
        /**
         * 调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         */
        producer. shutdown ();
        System.out.println("
producer========>运行...
");
    }
原文地址:https://www.cnblogs.com/xxt19970908/p/6717194.html