RocketMQ视频


1.在linux安装需要java环境
2.jps可以查看当前的java进程
3.启动broker会失败,因为默认的配置需要的内存会很大,需要修改runbroker.sh 和 runserver.sh的配置
4. 启动两个server(mqnamesrv和)的方式:nohup sh mqnamesrv &,nohup sh mqbroker -n localhost:9876 &
关闭两个server的方式:sh mqshutdown namesrv
查看日志使用:tail -f

集群方式:
1.服务器环境:两台或者是四台,角色都有nameserver,brokerserver,架构模式为主从第一个为master1,slave2
2.一般使用pot连接服务器
3.host添加信息:/ect/hosts
#nameserver
192.168.25.135 rocketmq-nameserver1
192.168.25.138 rocketmq-nameserver2
#brokerserver
192.168.25.135 rocketmq-master1
192.168.25.135 rocketmq-slave2
192.168.25.138 rocketmq-master2
192.168.25.138 rocketmq-slave1
4.重启网关
systemctl restart network
5.关闭防火墙:开放默认端口,安全性更高

(关闭防火墙的命令:systemmctl stop firewalld.service)

(查看防火墙的状态:systemmctl -cmd --state)

(禁止防火墙开机启动:systemmctl disable firewalld.service)

#开放默认nameserver默认端口
firewall-cmd --remove--port=9876/tcp --permanent
#开放默认master默认端口
firewall-cmd --remove--port=10911/tcp --permanent
#开放默认slave默认端口
firewall-cmd --remove--port=11011/tcp --permanent
#重启防火墙
firewall--cmd --reload
6.配置环境变量:不必进入目录去调用bin之类的
vim /ect/profile

#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all--.....
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH

然后wq!退出

之后使用source /etc/profile使配置立即生效


7.创建消息存储路径
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index

在rocketmq安装的文件中conf有可参考的配置文件
135上面的配置,138的配置和上述的一致


broker的配置文件:(NPPFTP)
brokerClusterName = rocketmq-cluster
brokerName=broker-a
brokerId=0
listenPort=10911
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
storePathIndex=/usr/local/rocketmq/store/index
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abortFile=/usr/local/rocketmq/store/abort
maxMessageSize=65536

#节点的角色
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH

slave的配置文件:
brokerClusterName = rocketmq-cluster
brokerName=broker-b
brokerId=1
listenPort=11011
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
storePathIndex=/usr/local/rocketmq/store/index
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abortFile=/usr/local/rocketmq/store/abort
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

然后在配置138上面的文件的时候,brokerName,brokerId和listenPort的不同


修改启动脚本文件:runbroker.sh和runserver.sh这两个文件
vim /usr/local/rocketmq/bin/runbroker.sh(或者是vi)
JAVA_OPT="${java_OPT} -server -Xms253m -Xmx256m -Xmn128m"

vim /usr/local/rocketmq/bin/runbroker.sh
JAVA_OPT="${java_OPT} -server -Xms253m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

启动集群
NameServer集群:
cd /usr/local/rocketmq/bin
nohup sh mqnamesrv &
Broker集群:
master1
cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties &
slave2
cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b-s.properties &
最后可以用jsp查看结果
tail -500f ~/logs/rocketmqlogs/namesrv.log
tail -500f ~/logs/rocketmqlogs/broker.log

mqadmin集群管理工具:使用繁琐
pwd查看当前的目录
./mqadmin {command} {args}

集群监控平台搭建:rocketmq-console
1.下载并打包:
1.git clone https://github.com/apache/rocketmq-externals
2.cd rocketmq-console
3.mvn clean package -Dmaven.test.skip=true
4.在打包前需要修改集群地址:rocketmq-config-namesrvAddr=...(在src/main/resources下面)
5.之后使用FileZilla或者xshell上传jar(在target中)
6.启动console:
java -jar ....jar
7.然后localhost:8080

消息发送样例(基本样式、顺序消息、延时消息、批量消息、过滤消息、事务消息):之前导入maven的依赖

发送者的步骤:

  1. 创建生产者并指定组名
  2. 指定nameserver地址
  3. 启动producer
  4. 创建消息对象,指定主题topic、tag和消息体
  5. 发送消息
  6. 关闭生产者

消费者的步骤:

  1. 创建消费者并指定组名
  2. 指定nameserver地址
  3. 订阅主题topic和tag
  4. 设置回调函数,处理消息
  5. 启动消费者


1.基本样例
1>消息发送:
同步消息: 是否有sendresult接收send的结果
异步消息:不同的不知道接受send后的结果
producer.send(msg, new SendCallback(){
onSuccess、onException
})

异步消息回调函数注意在shutdown之后,callback方法会出现异常
单向消息:(不关心发送结果)sendOneway方法
2>消费信息:

原文地址:https://www.cnblogs.com/cambra/p/13716266.html