参数说明
参数 | 说明 |
---|---|
brokerClusterName=rocketmq-cluster-1 | 所属集群名字 |
brokerName=broker-a | broker名字,注意此处不同的配置文件填写的不一样 |
brokerId=0 | 0 表示Master, > 0 表示slave |
namesrvAddr=127.0.0.1:9876;127.0.0.2:9876 | nameServer 地址,分号分割 |
defaultTopicQueueNums=4 | 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数 |
autoCreateTopicEnable=false | 是否允许Broker 自动创建Topic,建议线下开启,线上关闭 |
autoCreateSubscriptionGroup=false | 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 |
useEpollNativeSelector=true | |
listenPort=10923 | Broker 对外服务的监听端口 |
haListenPort=10924 | |
deleteWhen=04 | 删除时间,默认是凌晨四点 |
fileReservedTime=120 | 文件保留时间,默认48小时,单位是 hour |
mapedFileSizeCommitLog=1073741824 | commitLog每个文件的大小默认1G |
mapedFileSizeConsumeQueue=300000 | ConsumeQueue每个文件默认存30W条,根据业务情况调整 |
#redeleteHangedFileInterval=120000 | destroyMapedFileIntervalForcibly=120000 |
diskMaxUsedSpaceRatio=88 | 检测物理文件磁盘空间 |
storePathRootDir=/data/store | 存储路径 |
storePathCommitLog=/data/store/commitlog | commitLog存储路径 |
storePathConsumeQueue=/data/store/consumequeue | 消费队列存储路径 |
storePathIndex=/data/store/index | 消息索引存储路径 |
storeCheckpoint=/data/store/checkpoint | checkpoint 文件存储路径 |
abortFile=/data/store/abort | abort 文件存储路径 |
maxMessageSize=5242880 | 限制的消息大小,默认 1M |
# flushCommitLogLeastPages=4 | |
# flushConsumeQueueLeastPages=2 | |
# flushCommitLogThoroughInterval=10000 | |
# flushConsumeQueueThoroughInterval=60000 | |
brokerRole=SYNC_MASTER | Broker 的角色 - ASYNC_MASTER 异步复制Master - SYNC_MASTER 同步双写Master - SLAVE |
flushDiskType=ASYNC_MASTER | 刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 |
#checkTransactionMessageEnable=false | |
sendMessageThreadPoolNums=128 | 发消息线程池数量 |
#pullMessageTreadPoolNums=128 | 拉消息线程池数量 |
useReentrantLockWhenPutMessage=false | |
waitTimeMillsInSendQueue=2500 | 刷盘等待时间,超时将会返回发送失败码给发送者 |
transferMsgByHeap=false | |
slaveReadEnable=true | 是否允许 slave 读 |
上面罗列的是常见的参数,例如 Broker
集群的名称,主从角色,消息存储路径、文件大小、清理时间等等
之前也遇到默认参数不适合使用的常见,例如遇到业务方瞬间发送大量消息, Broker
同步刷盘时间超过默认的 2.5s,导致其它业务方遇到发送消息失败的场景,于是在 Broker
能力完全充足的情景下,调整了 waitTimeMillsInSendQueue
到 5s,避免影响其它业务方使用
文件存储机制
重要说明:
想要深入了解 RocketMQ 消息存储的内幕,需要了解这两方面
- 文件存储的数据结构
- 灵活利用 Linux 的文件机制 mmap
这次学习记录,参考了 STAR 皆空 大神,这里记录的是『消息存储的数据结构』,关于 mmap 的内容,可以点击参考链接深入学习。
RocketMQ 有很多亮点,其中一个是选择直接使用操作系统来提升存储效率,写入二进制格式的文件,消息持久化过程最大化的转成顺序写,避免随机写的额外开销。
这里记录一下跟 Broker 消息存储相关的内容
- CommitLog(消息内容)
- ConsumeQueue(位点数据)
- Index(检索索引)
- Broker 接收到【发送消息】操作
- Broker 接收到【获取消息】操作
Broker 端整体架构
Broker
作为消息中转器,提供了消息发送、存储、查询,还有高可用的功能。
其中有几个重要模块:
- Remoing Module: 请求的入口,使用了 Netty 作为远程通讯工具,处理发送过来的请求。
- Client Manager: 管理客户端(发送者、消费者)并维护消费者的主题订阅。
- Store Service: 提供 API 来存储或查询物理磁盘上的消息。
- HA Service: 高可用服务,为主从节点之间提供数据同步功能。
- Index Service: 索引服务,可以根据特定 Key,建立消息索引,并提供快速消息查询功能。
消息物理存储结构
从前面表格中的参数 storePathXXXX
可以知道,文件存储相关位置在 /data/store
,使用 tree
命令查看这些消息文件的存储结构
1 | $tree commitlog/ consumequeue/ index/ |
CommitLog 消息数据文件
借鉴于 Kafka,RocketMQ 也是以 Topic 作为文件存储的基本单元,每个 Topic 都有其对应的数据文件和索引文件。
RocketMQ 与 Kafka 不同点在于,Kafka 将消息数据文件按 Topic 分开存储,如果存在大量 Topic 情况下,消息持久化会逐渐变成随机磁盘读写,消息中间件的高性能被磁盘IO 所限制;而 RocketMQ 对其进行改进,将全部 Topic 的数据文件写入同一个文件(commitLog)中,实现消息的顺序写。
单个 CommitLog 的大小为 1GB,每条消息及其元信息被顺序追加至文件,文件的尾部可能存在空闲区域。
除了记录消息本身的属性(消息长度、消息体、Topic 长度、Topic、消息属性长度、消息属性),CommitLog 同时记录了消息所在 ConsumeQueue 消费队列的信息(消费队列 ID 和偏移量)。
由于存储条目具备不定长的特性,当 CommitLog 剩余空间无法满足下一条消息的存储,会在当前 CommitLog 的尾部追加一个 MAGIC CODE
等于 BLANK_MAGIC_CODE
的存储条目作为结束标记,并开始下一个 CommitLog 文件的操作。
ConsumeQueue 消息队列文件
Topic 是个抽象概念,消息实际发往的是 consumeQueue 这个逻辑队列中,在 consumeQueue 中,记录了消息在 CommitLog 中的位置信息
单个 ConsumeQueue 文件大小为 6000000 Byte,存储 30W 条记录,每条记录固定 20B
与 CommitLog 不同,ConsumeQueue 的存储条目采用定长存储结构。
为了实现定长存储,ConsumeQueue 存储到了消息 Tag 的 HashCode,在 Broker 端进行消息过滤时,通过比较 Consumer 订阅 Tag 的 HashCode 和存储条目中的 Tag 的 HashCode 是否一致来决定是否消费消息。
Index 索引文件
在已有的 CommitLog 和 ConsumeQueue 基础上,已经满足一个消息中间件的消息发送和消费功能,RMQ 提供 Index 目的是为了检索消息更快,方便排查问题。
这个目录下的文件,提供了跟数据库索引一样的作用,「给定 Topic 和消息 Key,通过索引文件能快速找到消息」,提供了消息检索的作用,监控端的【消息查询】界面使用了这个功能。
单个 Index 文件大小等于 420000040 B,包含索引头(IndexHeader)、哈希槽(HashSlot)和消息索引(MessageIndex)
Index 存储条目的结构有点像 HashMap,使用链式地址法解决哈希冲突:
「每个 Hash Slot 关联一个 Message Index 链表,多个 Message Index 通过 preIndexOffset 连接。」
Broker 代码实现初探
Broker 启动阶段:
Broker Startup
org.apache.rocketmq.broker.BrokerStartup#main
1 | #org.apache.rocketmq.broker.BrokerStartup#main |
Broker 将消息处理器注册到核心控制器 BrokerController
,Broker
定义了很多种消息处理器,查看 AsyncNettyRequestProcessor
继承图:
- AdminBrokerProcessor
- ClientManageProcessor
- ConsumerManageProcessor
- EndTransactionProcessor
- ForwardRequestProcessor
- PullMessageProcessor
- QueryMessageProcessor
- ReplyMessageProcessor
- SendMessageProcessor
其中 SendMessageProcessor 负责处理【Producer 发送消息】的请求,PullMessageProcessor 负责处理【Consumer 消费消息】的请求。
SendMessageProcessor
处理器实现了 NettyRequestProcessor
接口,处理请求的 processRequest
方法,然后在 BrokerController
启动时,往 RemotingServer
中按照 RequestCode
来注册处理器。
Broker 接收到发送消息的请求
如果请求中的标志是 RequestCode.SEND_MESSAGE
,那么就会交给 SendMessageProcessor
进行处理,同时底层大量使用线程池技术。
例如 Debug Broker
端代码:
可以看到,处理的线程名字前缀是 SendMessageThread_
,异步回调处理请求。
Broker 接收到消费消息的请求
如果请求中的标志是 RequestCode.PULL_MESSAGE
,那么就会交给 PullMessageProcessor
来进行处理,代码细节可以后续跟进看看。
测试发送消息的接口为:org.springframework.messaging.core.AbstractMessageSendingTemplate#convertAndSend(D, java.lang.Object)
核心方法:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand, org.apache.rocketmq.remoting.netty.RemotingResponseCallback)
- 反序列化请求头 requestHeader
- 从消息上下文恢复 mqTrace 链路
- 判断是否批量发送
- 进入单条发送逻辑
- 预发送 preSend 校验
- 完善消息详情,用户参数等
- 消息存储 messageStore
进入消息存储:org.apache.rocketmq.store.MessageStore#asyncPutMessage
- 检查存储状态 checkStoreStatus,主要是确认服务状态,主从节点角色,是否可写入,还有页面缓存是否繁忙 pageCacheBusy
- 消息常规性校验,topic 长度,参数合法性
- 写入 commitLog 的方法
this.commitLog.asyncPutMessage(msg)
简单小结:
- 客户端在调用 API 发送消息时,构造
RemotingCommand
,头部信息设置RequestCode.SEND_MESSAGE_V2
,还有相关属性以及消息体。 - 消息中转器
Broker
,启动着NettyRemotingServer
,接收到请求 - 识别请求头的
RequestCode.SEND_MESSAGE_V2
状态码,将请求交给对应的处理器SendMessageProcessor
- 从
Request
中恢复消息,校验消息合法性 - 消息存储,将消息详情写入到 commitLog 中
- 返回处理结果
putMessageResult
网络模型
Broker
的良好性能,有一半得归功于 Netty
这个优秀的通讯框架,扒了一下 Broker
上代码实现还有网上资料,记录一下使用到的 Netty
网络模型。
Netty 网络模型
各模块作用:
- eventLoopGroupBoss
作为 acceptor 负责接收客户端的连接请求
- eventLoopGroupSelector
负责 NIO 的读写操作
- NettyServerHandler
读取 IO 数据,并对消息头进行解析
- disatch
过程根据注册的消息 code 和 processsor 把不同的事件分发给不同的线程。
由 processTable 维护(类型为 HashMap)
线程池 & 请求码
Broker
中大量使用线程池技术,通过状态码对请求进行分类,将请求分发到不同的线程池,以此达到资源隔离的目的,每个线程池接收到请求,经过解码 decode
请求体和组装上下文 ctx
,接着交给相应的处理器 xxxProcessor
差异化处理网络请求。
RequestCode
位置在:
org.apache.rocketmq.common.protocol.RequestCode
broker
启动时,注册处理线程池的位置在:
org.apache.rocketmq.broker.BrokerController#registerProcessor
对照表
作用 | 线程池 | 处理器 | 请求码 |
---|---|---|---|
发送消息 | sendMessageExecutor | SendMessageProcessor | RequestCode.SEND_MESSAGE RequestCode.SEND_MESSAGE_V2 RequestCode.SEND_BATCH_MESSAGE RequestCode.CONSUMER_SEND_MSG_BACK |
拉取消息 | pullMessageExecutor | PullMessageProcessor | RequestCode.PULL_MESSAGE |
消息重试 | replyMessageExecutor | replyMessageProcessor | RequestCode.SEND_REPLY_MESSAGE RequestCode.SEND_REPLY_MESSAGE_V2 |
查询消息 | queryMessageExecutor | NettyRequestProcessor | RequestCode.QUERY_MESSAGE RequestCode.VIEW_MESSAGE_BY_ID |
客户端注册、心跳检测 | heartbeatExecutor、clientManageExecutor | ClientManageProcessor | RequestCode.HEART_BEAT RequestCode.UNREGISTER_CLIENT, RequestCode.CHECK_CLIENT_CONFIG |
消费端处理(例如位点更新) | consumerManageExecutor | ConsumerManageProcessor | RequestCode.GET_CONSUMER_LIST_BY_GROUPRequestCode.UPDATE_CONSUMER_OFFSET RequestCode.QUERY_CONSUMER_OFFSET** |
事务消息处理 | endTransactionExecutor | EndTransactionProcessor | RequestCode.END_TRANSACTION |
处理集群信息 | adminBrokerExecutor | AdminBrokerProcessor | Default,不属于前面的请求码都由它进行处理 |
小结
本次学习主要从以下三个方面去了解:
- Broker 启动参数说明
- 消息文件的存储机制
- 使用的 Netty 网络模型
从简单的使用到底层原理学习,逐渐剖开 MQ
框架的深层,慢慢将文件操作使用到的技术也文件存储机制了解,接着再去学习 Netty 网络模型,将网络通讯的基础也开始补起来,从一个点发散,将知识体系补全,还有很多需要去学习和了解的,后面继续补充吧~