1.rocketmq概念模型
producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
push consumer:consumer的一种,需要向consumer对象注册监听
pull consumer:consumer的一种,需要主动请求broker拉取消息
producer group:生产者集合,一般用于发送一类消息
consumer group:消费者集合,一般用于接受一类消息进行消费
broker:mq消息服务(中转角色,用于消息存储与生产消费转发)
2.rocketmq源码包编译与结构说明
rocketmq-broker 主要的业务逻辑,消息收发,主从同步,pagecache rocketmq-client 客户端接口,比如生产者和消费者 rocketmq-example 示例,比如生产者和消费者 rocketmq-common 公用数据结构等等 rocketmq-distribution 编译模块,编译输出等 rocketmq-filter 进行broker过滤的不感兴趣的消息传输,减小带宽压力 rocketmq-logappender,rocketmq-logging日志相关 rocketmq-namesrv namesrv服务,用于服务协调 rocketmq-openmessaging 对外提供服务 rocketmq-remoting 远程调用接口,封装netty底层通信 rocketmq-srvutil 提供一些公用的工具方法,比如解析命令行参数
rocketmq-store 消息存储
rocketmq-test,rocketmq-example
rocketmq-tools 管理工具,比如有名的mqadmin
一.单机模式
1.主机名 192.168.11.81 rocketmq-nameserver1 192.168.11.81 rocketmq-master1 2.上传所需包 mkdir -p /usr/local/apache-rocketmq && tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq/ ln -s apache-rocketmq rocketmq 3.创建目录 mkdir -p /usr/local/rocketmq/store/commitlog mkdir -p /usr/local/rocketmq/store/consumequeue mkdir -p /usr/local/rocketmq/store/index 4.配置文件 cd /usr/local/rocketmq/conf/ 下面有三个目录 2m-2s-async 双主双从异步 2m-2s-sync 双主双从同步 2m-noslave 双主无从模式 cd 2m-2s-async vim broker-a.properties #所属集群 brokerClusterName=rocketmq-cluster #broker名字 brokerName=broker-a #0表示master,大于0表示slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876 #在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认是凌晨4点 deleteWhen=04 #文件保留时间,默认48小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/store #commitLog存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消费队列存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #ASYNC_MASTER 异步复制Master #SYNC_MASTER 同步双写Master #SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #ASYNC_FLUSH 异步刷盘 #SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH checkTransactionMessageEnable=false #发消息线程池数量 sendMessageTreadPoolNums=128 #拉消息线程池数量 pullMessageTreadPoolNums=128 5.日志相关 mkdir -p /usr/local/rocketmq/logs cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml 6.启动命令 vim /usr/local/rocketmq/bin/runbroker.sh #测试环境把所需内存改成1G,线上需要内存较大,如果小于1g会起不来 JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g" vim /usr/local/rocketmq/bin/runserver.sh #同样修改-Xms1g -Xmx1g -Xmn1g 7.启动nameserver cd /usr/local/rocketmq/bin/ nohup sh mqnamesrv & 8.启动broker cd /usr/local/rocketmq/bin/ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 & 检查是否启动,如果有nameserver和brokerstartup说明启动成功 jps 启动后可查看broker日志和nameserver日志 9.关闭 cd /usr/local/rocketmq/bin/ 先关闭broker sh mqshutdown broker 再关闭namesrv sh mqshutdown namesrv jps检查是否关闭
二.rocketmq四种集群环境
1.单点模式
2.主从模式
3.双主模式
4.双主双从模式,多主多从模式
主从模式
主从模式环境构建可以保障消息的及时性和可靠性
投递一条消息后,关闭主节点
从节点继续可以提供消费者数据进行消费,但是不能接收消息
主节点上线后进行消费进度offset同步
部署
1.修改host,两台都改 192.168.11.81 rocketmq-nameserver1 192.168.11.81 rocketmq-master1 192.168.11.82 rocketmq-nameserver2 192.168.11.82 rocketmq-master-slave 2.上传所需包 81上操作 mkdir -p /usr/local/apache-rocketmq && tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq/ ln -s apache-rocketmq rocketmq 3.创建目录 mkdir -p /usr/local/rocketmq/store/commitlog mkdir -p /usr/local/rocketmq/store/consumequeue mkdir -p /usr/local/rocketmq/store/index 4.日志相关 mkdir -p /usr/local/rocketmq/logs cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml 5.修改内存 vim /usr/local/rocketmq/bin/runbroker.sh #测试环境把所需内存改成1G,线上需要内存较大,如果小于1g会起不来 JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g" vim /usr/local/rocketmq/bin/runserver.sh #同样修改-Xms1g -Xmx1g -Xmn1g 6.主从配置 cd /usr/local/rocketmq/conf/2m-2s-async vim broker-a.properties #修改 namesrvAddr=rocketmq-nameserver1:9876;namesrvAddr=rocketmq-nameserver2:9876 #主节点 brokerRole=ASYNC_MASTER vim broker-a-s.properties #修改 namesrvAddr=rocketmq-nameserver1:9876;namesrvAddr=rocketmq-nameserver2:9876 #从节点 brokerRole=SLAVE #从节点需要大于0即可 brokerId=2 7.82上操作 把81上/usr/local/apache-rocketmq拷贝到82 其中82上只用到broker-a-s.properties 8.启动nameserver,两台都操作 cd /usr/local/rocketmq/bin/ nohup sh mqnamesrv & 9.启动broker 81上操作 cd /usr/local/rocketmq/bin/ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 & 82上操作 cd /usr/local/rocketmq/bin/ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
即可