rocketmq-分析:
注:当前下载版本为:rocketmq-all-4.3.0-bin-release (注意下载是二进制包)
核心目录分析:
bin: 启动、停止等脚本文件
log: 日志信息、
conf: 主要是broker 的核心配置文件
核心运行脚本:
runserver.sh / runserver.cmd (启动 namesrv 服务) 启动默认端口 9876 内存调整 -Xms256m -Xmx256m -Xmn512m
runbroker.sh / runbroker.cmd (启动 broker 服务) 启动默认端口 10911 内存调整 -Xms256m -Xmx256m -Xmn128m
windos: 下启动:
进入bin 目录下 执行 start mqnamesrv.cmd ---> 启动 runserver.sh
执行 start mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true ---> 启动 runbroker.sh 并允许broker 注册时自动创建topic 并注册
-n 指向 namesrv 服务
-c 执行配置文件
验证RocketMQ功能:
测试发消息 set NAMESRV_ADDR=127.0.0.1:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer 启动前需要绑定 namesrv
测试接消息 set NAMESRV_ADDR=127.0.0.1:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer 启动前需要绑定 namesrv
停止 mqshutdown.cmd namesrv mqshutdown.cmd broker
linux:
注: 由于默认端口 9876 但是目前的测试环境存在8976 ----------------> 所以此时需要修改namesrv 和 broker 的默认端口 9876->8876 10911->8911
下载地址: https://rocketmq.apache.org/docs/quick-start/
当前版本: rocketmq-all-4.9.3-bin-release
1.解压: unzip rocketmq-all-4.9.3-bin-release.zip 后删除zip 包
2.调整启动内存:
vim runserver.sh 内存调整 -Xms256m -Xmx256m -Xmn512m
vim runbroker.sh 内存调整 -Xms256m -Xmx256m -Xmn128m
3.在rocket的conf目录下添加namesrv.properties文件,文件中添加端口配置
listenPort=8876
./mqnamesrv -c /data/myt/rocketmq-test/rocketmq-4.9.3/conf/namesrv.properties 官网推荐 nohup sh bin/mqnamesrv -c /data/myt/rocketmq-test/rocketmq-4.9.3/conf/namesrv.properties &
./mqbroker -n 192.168.202.55:8876 官网推荐 nohup sh bin/mqbroker -n 192.168.202.55:8876 &
namesrv 和 broker 启动后 jps 查询即可
测试:
export NAMESRV_ADDR=192.168.202.55:8876
./tools.sh org.apache.rocketmq.example.quickstart.Producer
export NAMESRV_ADDR=192.168.202.55:8876
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
日志信息: 默认在当前用户的logs 目录下
tail -f /root/logs/rocketmqlogs/namesrv.log
tail -f /root/logs/rocketmqlogs/broker.log
停止服务 ./mqshutdown namesrv ./mqshutdown broker
Rocketmq 详解分析:
1. Rocketmq-服务相关配置搭建操作:
Rocketmq 高可用集群搭建:
注: 该组件本身对集群就有非常好的支持 namesrv 可以搭建集群(多个namesrv 之间互不通信但是储存了所有broker 集群的注册信息) 如果某个namesrv 宕机其他的可以起到高可用的作用
1.namesrv 提供注册中心的作用进行服务注册与发现 broker 集群服务注册topic 、地址等信息 当客户端收、发信息时候需要从 namesrv 获取路由信息
2.对broker 集群进行管理 对所有注册服务的broker 节点进行心跳检测,当长时间接收不到心跳时,就判断某个broker 节点已经宕机 需要进行故障转移
broker 是rocketmq 最核心组件 传递消息 注册消息的重要作用 高可用模式如下场景:
(1) 单master: broker-master
producer ---> broker-master ---> consumer 不可靠,该机器重启或者宕机,将要导致整个服务不可用
(2) 多master:
producer ---> broker-master-01 ---> consumer
broker-master-02 虽然某个服务宕机不会导致整个服务不可用,但是单台机器重启恢复的过程中未被消费的消息在机器恢复前不可订阅
(3)多master多slave 集群采用异步复制方式,主备有短暂消息延迟,毫秒级
write 异步
producer ---> broker-master01-----------broker-slave01 ---> consumer
broker-master02-----------broker-slave02
节消息先写入master, 然后被消费端读取 异步发送到slave点, 实时性高 Master宕机或磁盘损坏时会有少量消息丢失
主节点宕机后从节点可以起到让消费端正常订阅的作用(存在还未同步到slave 端的数据)
(4)多Master多Slave 集群采用同步双写方式,主备都写成功,向应用返回成功
write----------->------------->------
| write | write
producer ---> broker-master01--------------broker-slave01 ---> consumer
broker-master02--------------broker-slave02
服务可用性与数据可用性非常高,性能比异步集群略低,当前版本主宕备不能自动切换为主
2master-2slave 异步集群配置:
集群模式:
server1: server2:
namesrv-01 192.168.1.101:9876 namesrv-01 192.168.1.102:9876
broker-a 192.168.1.101:10911(注册192.168.1.101:9876、192.168.1.102:9876) broker-a-s 192.168.1.102:10912(注册192.168.1.101:9876、192.168.1.102:9876)
broker-b-s 192.168.1.101:10913(注册192.168.1.101:9876、192.168.1.102:9876) broker-b 192.168.1.102:10916(注册192.168.1.101:9876、192.168.1.102:9876)
4.3.2为例:
注:autoCreateTopicEnable=true,建议线下开启测试,线上关闭
修改内存大小
同一个机器上启动多个broker时,需使用不同的broker配置文件来启动实例,并分开端口
当一个节点启动多个broker实例时,存储路径如果显示的设置,则需要指定不同的storePath路径
配置文件分析:
brokerClusterName=rocketmq-cluster 集群名,一个集群的成员该名字相同
brokerName=broker-a broker 节点名 建议一主一从保持一致
brokerId=0 节点id 0 标识主节点 大于0为从节点
namesrvAddr=192.168.1.101:9876、192.168.1.102:9876 指向namesrv 服务地址 用于注册和心跳(指向所有的namesrv)
defaultTopicQueueNums=4 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
autoCreateTopicEnable=true 允许 Broker 自动创建Topic
autoCreateSubscriptionGroup=true 允许 Broker 自动创建订阅组
listenPort=10911 对外端口
deleteWhen=04 删除文件时间点,默认凌晨 4点
fileReservedTime=120 文件保留时间,默认 48 小时
storePathRootDir=/usr/local/rocketmq/store/broker-a 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog commitLog 存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue 消费队列存储路径存储路径
storePathIndex=/usr/local/rocketmq/store/broker-a/index 消息索引存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint checkpoint 文件存储路径
abortFile=/usr/local/rocketmq/store/abort abort 文件存储路径
brokerRole=ASYNC_MASTER 主节点可选择 同步主和异步主 从节点都是SLAVE
flushDiskType=ASYNC_FLUSH 刷盘方式 异步刷 生产者发送的数据暂存异步存到磁盘
1.定期刷盘
2.存到一定量再进行刷盘 容易丢失数据
同步刷 生产者每发送一条数据存盘成功后再通知
brokerIP1=192.168.1.101 #新增公网IP
启动 namesrv01 namesrv02 broker-a broker-a-s broker-b broker-b-s
可以指定各个节点的日志文件
nohup sh /jackxu/rocketmq/bin/mqbroker -c /jackxu/rocketmq/conf/2m-2s-async/broker-a.properties > /jackxu/rocketmq/logs/broker-a.log 2>&1 &
查询节点情况: ./mqadmin clusterlist -n 192.168.1.101:9876 展示当前namesrv 下注册的所有的broker 的信息
代码层面分析: 依赖的版本要与服务器安装的版本一致
rocketmq-client
producer ------> 10条消息 -------> broker-a 6 ---------------------------------> consumer 6
异步发送
===========> broker-a-s 6
broker-b 4 ---------------------------------> consumer 4
异步发送
===========> broker-b-s 4
注:默认在master 正常情况下生产端发送消息都是到达master,消费者获取消息也是默认从master,不会消费slave (broker 主从间异步传递)
此时将停掉broker-a,很可能存在broker-a 中未完全同步到 broker-a-s 的消息, broker-a-s 不会接收消息, 但是可以被消费 !!!
生产者再次发送一批数据发现都到了 broker-b 中 (broker-b ---> broker-b-s)
消息重试机制:
1.发送失败
设置发送的超时时间和重试次数
2.消费失败有两种情况
1.超时 ----------> 当发生超时时,消费端会一直重试,直至成功
2.mq 异常 ----------> 当发生mq异常时可以控制返回状态为CONSUME_SUCCESS 或 RECONSUME_LATER
当返回RECONSUME_LATER,broker会保留这条消息,而消费端会根据设置的次数进行重复消费,一般重试个3次就差不多了
有个重试策略,重试间隔时间越来越长: 10s、30s、1mins、2mins等,当超过次数后,需要应该手动管理这条消息了
两种状态:
CONSUME_SUCCESS
RECONSUME_LATER
消费模式:
1.广播消费: 多个消费者消费情况一致,每条消息每次可以被多个消费同时消费
consumer.setMessageModel(MessageModel.BROADCASTING);
2.集群模式: 每条消息每次只能被一个消费端消费到
consumer.setMessageModel(MessageModel.CLUSTERING);



