之前使用的消息中间件是 ActiveMQ
,由于缺少维护和资料文档,出现问题不好排查,于是在此契机下,经过调研选择使用 RocketMQ
,下面来分享一下对于它的基础学习。
RocketMQ
一个纯 Java
、分布式、队列模型的开源消息中间件,前身是 MetaQ
,是阿里研发的一个队列模型的消息中间件,后开源给 apache
基金会成为了 apache
的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
借鉴于 Kafka
,对比两者,RocketMQ
偏向于稳定以及业务型操作。
一、整体架构
核心组件有四个 Nameserver、Broker、Producer、Consumer
上图各个模块都会互相建立长链接来进行联系,这种架构模式跟 Dubbo
有点像,都有一个注册中心来维护相关信息,不同点在于 Broker
这模块,在消息系统中起着重要的作用。
二、优点特性
稳定性、高性能和丰富的消息类型
网上已经有很多相关的优缺点介绍,这里推荐两篇:
三、简述各个名词概念
Topic
消息主题,是一种消息的逻辑分类,例如这类消息属于哪类操作。例如库存相关、订单相关、活动相关等。理解成一种抽象的分类规范,大家的操作按照 Topic 进行分类,不同的 Producer 将消息发送到指定的 Topic,不同的 Consumer 订阅指定的 Topic,从上面拉取消息消费,屏蔽了底层的消息存储。
Tag
对 Topic 进一步细化,在阿里云官方文档有这行注释 “Message Tag,可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 版的服务器过滤”
更多具体可以看这篇:Topic 与 Tag 最佳实践
Message
它是消息队列中 消息传递的载体。
发送消息是指发送到某个主题 Topic 下,其中每条消息包括以下几部分:
- Message ID
消息的全局唯一标识,由 RocketMQ
发送过程中自动生成,唯一标识某条信息
- Message key
消息的业务标识,由消息生产者 Producre
设置,唯一标识某个业务逻辑。
- Message Body
消息携带的内容体,一般在这里自定义传递的内容,记得要将消息内容序列化。
具体消息 Message 核心结构可以参考控制台里展示的 Message Detail
MessageQueue
前面提到的 Topic
是抽象概念,实际发送消息和消费消息的地方是 Message Queue
,每个 Topic
下可能有多个消息队列。引入队列的原因是为了提高可用性和灵活性,按照队列的性质 FIFO
,先发送的消息先消费。
例如默认情况下,一个 Topic
会分配四个 Message Queue
(参数配置:defaultTopicQueueNums),如果有两个 Broker
,那将会平分两个,如果是本地搭建一个 Broker
,那么应该跟我一样看到的是:一个 Broker 下有四个 Message Queue
消息发送的目的地和消费的获取源就是 Message Queue
Group
分组信息,一个组可以订阅多个 Topic
。
具体可以分为 Producer Group
和 Consumer Group
,一个应用里都可以建立多个发送者组和消费者组,不过推荐的用法是一个应用指定一个 Producer Group
,统一消息发送者的信息。
一般一个应用只需设定一个消费者组,单独订阅主题进行消费。如果一个 Topic
在一个应用中想设定两个处理逻辑,那么可以配置不同的消费者分组,可以实现对同一个主题消息设定不同的处理者 Handler
。
Offset
位移量,用来保存消息消费的进度。
从上面了解到,一个 Broker
下会有多个 Message Queue
,我们需要用一个下标来记录消息消费的位置。通过 Offset
可以定位到目前消费完成的消息位置,指示 Consumer
下一条要从 Offest
后面位置消费消息。
在代码中,Offset
是 long
基础类型,根据它来访问 Message Queue
指定位置的消息。
Order 消息有序性
一种按照顺序进行发布和消费的消息类型,分为 全局顺序消息和分区顺序消息。
全局顺序:这个比较好理解,对于同一个 Topic
的消息,无论消费者有多少个,消息出队只能一个一个按照顺序来,下一个 Message
的消费依赖于前一个消费完成。适用于性能要求不高的场景,不过基本挺少选择该模式。
分区顺序:通过某个 Sharding Key
来进行区块分区。同一个分区内的消息按照严格的 FIFO
顺序进行发布和消费。
这里是文档中举的例子:
电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
可以通过 Sharding Key
来保证同一类型、用户的消息顺序发送和消费,既保证了高并发处理消息,也保障了业务上的连贯性。
上图展示的是 分区顺序,从代码实现来说,需要设定自定义 Selector
,然后对入参 arg
进行解析和根据策略选择 mq
,例如常见的哈希取模,选择策略可以参考这个类 :SelectMessageQueueByHash
消息消费模式
在 RocketMQ
中,实现的消费模式有两种
- 拉(PULL)模式:
Consumer
主动从消息服务器Broker
获取消息。 - 推(PUSH)模式: 消息服务器
Broker
主动推送消息到Consumer
。
引用【藤原豆腐店-】的描述:
MQPullConsumer: 取消息的过程需要用户自己写,首先通过打算消费的
Topic
拿到MessageQueue
的集合,遍历MessageQueue
集合,然后针对每个MessageQueue
批量取消息,一次取完后,记录该队列下一次要取的开始offset
,直到取完了,再换另一个MessageQueue
。MQPushConsumer:
consumer
把轮询过程封装了,并注册MessageListener
监听器,取到消息后,唤醒MessageListener
的consumeMessage()
来消费,对用户而言,感觉消息是被推送(push
)过来的。
查看 SpringBoot
里面的 @RocketMQMessageListener
注解实现方式和团队使用二次封装的 RocketMQ
,发现使用的都是 MQPushConsumer
,封装好了 pull
轮询过程,所以可以认为,RocketMQ
使用的是 Pull
拉取模式的消费模式。
消息重复
消息的语义有三种:
- 最多一次(At most once)
- 最少一次(At least once)
- 精确一次(Exactly once)
由于网络波动原因,无法避免消息在网络传输时,发送端认为第一次发送失败后,进行发送重试,于是乎我们要解决的问题可以理解成:两条一样的消息,如何保证程序处理正确?
目前常见的做法有两种:
1、保证消息的 幂等性
2、消息系统 过滤重复的消息,或者 消费端 过滤重复的消息
幂等性: 利用数学上的概念加深理解,例如有个函数 f(x)
,x 是消息,那么无论对消息重复消费多少次,f(f(x))
的结果都是一样的。不会因为重复消费而产生副作用,保证数据的正确性。
消息去重: 这个比较好理解,每条消息带有全局唯一的 Message ID
,可以在消息系统 Broker
处进行过滤,也可以在消费者 Consumer
处进行过滤。
目前在介绍中,没看到 RocketMQ
在 Broker
处进行过滤去重,所以需要在消费者端进行过滤。可以考虑新增一张数据库表,记录处理过的 Message ID
,如果遇到重复的消息就不再进行处理,在处理中的消息可以先放入 Redis
,避免同时消费一样的消息~
四、详说核心模块
从前面的架构图中可以看到有四大核心模块,从生产者发送消息到消费者消费经历过以下的流程:
先以 Producer
角度来说,与其中一个 ns
建立长链接,然后定期发送心跳维持状态,获取 Topic
主题路由信息,然后与 Broker Master
建立长链接,定期发送心跳判断是否可用。根据发送消息的类型,判断是否需要 Broker
的返回值。
从 Consumer
角度来说,与 Producer
不同点在于,它可以从 Broker Master
订阅消息,也能够从 Broker Salve
订阅消息,这里不再重复画图。
由于 RocketMQ
是纯 Java
语言编写的,所以可以在 Github
中下载源码,查看每个模块的详细设计。
Nameserver
Nameserver
来管理消息订阅,消息发送和消费信息,集群中的各个服务需要通过 Nameserver
来了解各自的状态。
有点像 Dubbo
中的注册中心 Zookeeper
,NameServer
中维护着 Producer
集群、Broker
集群、 Consumer
集群的服务状态。通过定时(默认是 30s)发送心跳数据包进行维护更新各个服务的状态。
①、接收 Broker
的请求,注册 Broker
的路由信息
②、接口 Client
的请求,根据某个 Topic
获取其到 Broker
的路由信息
NameServer
没有状态,可以横向扩展。每个 Broker
在启动的时候会到NameServer
注册;Producer
在发送消息前会根据 Topic
到 NameServer
获取路由(到 Broker
)信息;Consumer
也会定时获取 Topic
路由信息。
关于 Namesrv
,了解上述概念后,可以在代码中查看具体的启动流程:
1 | org.apache.rocketmq.namesrv.NamesrvStartup#main |
Broker
Broker
的定位是消息代理存储服务器,职责是负责持久化消息还有管理消息消费的进度。
介绍一下它的特性:
① 与所有 Namesrv
节点保持长链接和心跳,定时(默认 30s)将 Topic
信息注册到 Namesrv
。
② 负责存储消息,以 Topic
为维度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
③ 具有上亿级消息堆积能力,同时可以严格保证消息的有序性。
具体启动代码入口在:
1 | org.apache.rocketmq.broker.BrokerStartup#main |
初始化过程中做的事情有点多,消息存储、远端服务、过滤器等等,粗看一眼有点晕,没有继续往下跟,感兴趣的可以去源码中瞅瞅=-=
Producer
消息发送者,往消息队列发送消息的主体角色。
下图是 RocketMQTemplate
调用 convertAndSend
方法调用的时序图:
核心步骤在 DefaultMQProducerImpl
的发送实现方法,底层选择要发往的 MessageQueue
,执行前置钩子、通过 NettyClinet
来发送请求,发送完成后执行后置钩子,最后返回 SendResult
。
具体可以在 SpringBoot
中引入 RocketMQ-Starter
依赖,然后发送消息,查看整体调用链路。
Consumer
- 识别依据
一个消费者分组中,可以设定很多个 Listener
,来分别消费不同 Topic
下的消息。
在消息消费时,需要通过 ConsumerGroup
+ Topic
+ Tag
来唯一确定 Listener
。
所以同一个消费组、同一个主题下,不可以出现相同 Tag
的 Listener
,应用在启动时会报错。
- 代码实现
实现代码与 Producer
在同一个模块:client
在讲消息消费之前,先来看下应用启动时,它将扫描打上 RocketMQMessageListener
注解的 beans
,然后进行注册容器
按照我的理解,在 for
循环中注册扫描到的 bean
,接着在 createRocketMQListenerContainer
组装 DefaultRocketMQListenerContainer
,然后在 Spring
容器里注册,等待之后的消费。
同时有一个守护线程在不断从 Broker
上拉取消息,监听到符合条件的消息后进行消费:
上面大致介绍的是应用启动时注册 Listener
和循环获取消息的过程,具体 Consumer
启动时的代码入口在这里:
1 | org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#start |
消费者启动注册的流程比较长,需要慢慢看~
五、具体使用(代码和监控 UI)
下载代码(external 和 二进制启动包)
external:扩展内容,为了获得监控台
Console
地址如下:1
https://github.com/apache/rocketmq-externals.git
install:应用启动包,用来部署
Namesrv
和Broker
1
https://rocketmq.apache.org/dowloading/releases/
程序实现代码:已经开源,贡献给
Github
,可以下载自行打包使用和参考实现思路,学习学习~1
https://github.com/apache/rocketmq
启动 nameserver 和 broker
在启动之前,请确保本地全局变量中包含 JAVA_HOME
变量,例如:
1 | echo ${JAVA_HOME} |
进入下载的 releases
二进制安装包目录
1 | cd rocketmq-all-4.7.0-bin-release/bin |
点击查看 play.sh
脚本,可以看到它将会启动 Namesrv
和 Broker
两个服务~启动好之后,就能够往 Broker
发送和消费消息
Springboot 集成 rocker-starter
这里使用的是 Springboot
集成 starter
模块,具体可以参考这篇文章:
https://www.baeldung.com/apache-rocketmq-spring-boot
个人整合后,放入了 Demo
的 Rocket
目录下:
https://github.com/Vip-Augus/springboot-note
这里要说下 Listener
消息消费者处理流程,应用不断往 broker
获取监听的 Topic
消息,然后找到对应的 Consumer
进行消费:
可以跟踪上图左侧的调用链路,了解消费者消费消息的整体链路。
启动监控 UI 使用
进入刚才下载的 external
路径,里面有个 rocketmq-console
目录,防止占用 8080 端口,需要修改一下,具体要修改的地方有两处:
1 | # UI 监控系统的访问端口 |
rocket-console
是一个 Springboot
项目,修改相关配置后,需要经过打包然后部署
1 | mvn clean install -DskipTests |
接着访问前面设定的端口,就能看到监控平台:
顶部的导航栏标识了它拥有的功能,例如想看某个 Topic
发往哪个 Broker
,然后被订阅的消费组有哪些,都可以通过监控来进行查看:
六、后续学习计划
日常使用中,基本可以不去修改 Nameserver
和 Broker
这两个模块的服务,更多关注的是 Producer
和 Consumer
的使用,同时可以对他们进行二次封装,替换其中的 DefaultMQProducer
和 MQConsumer
实现类,打造适合自身业务的发送者和消费者。
本次分享大介绍了 RocketMQ
的设计架构、核心模块的设计、源码位置和消息发送以及消费的大致流程,介绍了 RocketMQTemplate
基础使用还有监控 Console UI
的查看和使用,但关于位移量、消息存储格式、同步、异步刷盘方式和消息重复等等详细设计都还没有去了解,有机会的话之后再去了解和分享~