RocketMQ使用记录

---恢复内容开始---

he following softwares are assumed installed:

  1. 64bit OS, Linux/Unix/Mac is recommended;
  2. 64bit JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 4g+ free disk for Broker server

Download & Build from Release

Click here to download the 4.4.0 source release. Also you could download a binary release from here.

Now execute the following commands to unpack 4.4.0 source release and build the binary artifact.

  > unzip rocketmq-all-4.4.0-source-release.zip
  > cd rocketmq-all-4.4.0/
  > mvn -Prelease-all -DskipTests clean install -U
  > cd distribution/target/apache-rocketmq

Start Name Server

  > nohup sh bin/mqnamesrv &
  > tail -f ~/logs/rocketmqlogs/namesrv.log
  The Name Server boot success...

Start Broker

  > nohup sh bin/mqbroker -n localhost:9876 &
  > tail -f ~/logs/rocketmqlogs/broker.log 
  The broker[%s, 172.30.30.233:10911] boot success...

Send & Receive Messages

Before sending/receiving messages, we need to tell clients the location of name servers. RocketMQ provides multiple ways to achieve this. For simplicity, we use environment variable NAMESRV_ADDR

 > export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

Shutdown Servers

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

Updated:December 30, 2016

 


 

 

好久没有更新博客了,这次讲讲RocketMQ的部署和使用。
http://rocketmq.apache.org/
首先去下载源码,网站上有说明RocketMQ的部署环境:
1、64bit OS, Linux/Unix/Mac is recommended;
2、64bit JDK 1.8+;
3、Maven 3.2.x
4、Git
这里的Git可有可无(用于Git工具从GitHub下载RocketMQ源码),如果没有也可以直接去上面的地址下载RocketMQ源码。
下载完成以后,根据 http://rocketmq.apache.org/docs/quick-start/ 的步骤完成解压、编译、运行。
待到Name Server和Broker都跑起来,就可以运行Java代码测试功能了。

MQProducer

package MQProducer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args){
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        try {
            producer.start();

            Message msg = new Message("PushTopic",
                    "push",
                    "1",
                    "Just for test.".getBytes());

            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PushTopic",
                    "push",
                    "2",
                    "Just for test.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PullTopic",
                    "pull",
                    "1",
                    "Just for test.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            producer.shutdown();
        }
    }
}


MQConsumer

package MQConsumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args){
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic", "push");
            //程序第一次启动从消息队列头取数据
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> list,
                                ConsumeConcurrentlyContext Context) {
                            Message msg = list.get(0);
                            System.out.println(msg.toString());
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
RocketMQ订阅是根据Topic的信息来的,所以这里只能收到PushTopic的消息,而收不到PullTopic的。

注意:下列jar包一个都不能少。

    fastjson-1.2.44.jar
    netty-all-4.1.19.Final.jar
    rocketmq-client-4.2.0.jar
    rocketmq-common-4.2.0.jar
    rocketmq-remoting-4.2.0.jar
    slf4j-api-1.7.25.jar
    slf4j-nop-1.7.25.jar

slf4j是RocketMQ记录日志的。fastjson也是必要的,如果没有这个jar包,可能会出现找不到路由的错误。

java -version

java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)

    1
    2
    3

mvn -v

Apache Maven 3.5.2

    1

RocketMQ version

4.2.0

    1

最后,开启RocketMQ,跑代码  
---------------------  
作者:Michael_Zhan_Tcys  
来源:CSDN  
原文:https://blog.csdn.net/u012138272/article/details/79050907  
版权声明:本文为博主原创文章,转载请附上博文链接!

 

 

 

 

---恢复内容结束---



使用rocketmq参考了下面这两篇博客

https://www.cnblogs.com/lizhangyong/p/8978855.html

https://blog.csdn.net/u013278314/article/details/82497132

感谢博主

原文地址:https://www.cnblogs.com/shiwanming/p/10413697.html