消息发送与接收

一、步骤分析

  1. 消息发送者步骤分析
    1. 创建消息生产者producer,并指定生产者组名
    2. 指定NameServer地址
    3. 启动producer完成初始化
    4. 创建消息对象,指定主题Topic、Tag和消息体
    5. 发送消息
    6. 关闭生产者producer
  2. 消息消费者者步骤分析
    1. 创建消费者Consumer,指定消费者组名
    2. 指定NameServer地址
    3. 订阅主题Topic和Tag
    4. 设置回调函数,处理消息
    5. 启动消费者consumer

二、消息发送

  1. 同步消息 
    这种可靠性同步地发送方式使用比较广泛,比如重要的消息通知,短信通知。
// 创建消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("生产者组名");
// 指定NameServer地址
producer.setNamesrvAddr("localhost:8080");
// 启动Producer实例
producer.start();

// 创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Messge("Topic","Tag","Message body".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);

// 关闭生产者producer
producer.shutdown();
  1. 异步消息 
    通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker响应。
// 发送异步消息
producer.send(msg, new SendCallback(){
    onSuccess(SendResult sendResult){};
    onException(){Throwable e};
});
  1. 单向消息 
    主要用在不特别关心发送结果的场景,比如日志发送。
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);

三、消息接收

// 创建消费者Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组名");
// 指定NameServer地址
consumer.setNamesrvAddr("192.168.22.222:8080;192.168.22.223:8080");
// 订阅主题Topic和Tag
consumer.subscribe("Topic", "Tag");
// 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently(){
    // msgs:接收消息内容 
    ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动消费者consumer
consumer.start();
  1. 负载均衡模式 
    多个消费者共同消费队列消息,每个消费者处理的消息不同。
// 负载均衡模式消费
consumer.setMessageModel(MessageModel.clustering);
  1. 广播模式 
    每个消费者消费的消息都是相同的
// 广播模式消费
consumer.setMessageModel(MessageModel.broadcasting);
原文地址:https://www.cnblogs.com/xhyouyou/p/12465534.html