1、事务消息
主要分为两个流程:
1.1 正常消息的发送、提交
(1) producer 发送 Half 消息
(2) broker 本地写入 Half 消息(将 Topic 改成 RMQ_SYS_TRANS_HALF_TOPIC,该阶段 Consumer 由于没有订阅关系,无法消费)
(3) producer 根据 broker 写入消息结果,成功的话,执行本地事务;写入消息失败,producer 不执行本地事务
(4) 根据本地事务结果,往 broker 发送 Commit 或者 Rollback(如果是 Commit,将会将 Half 消息转回 Real Topic,生成消息索引,订阅者可以进行消费)
1.2 补偿流程:
(1) 半消息发送成功,但 broker 没收到 Commit 或 Rollback,进行状态回查(上图的第五步)
(2) Producer 收到回查消息,检查本地事务状态
(3) 根据本地事务状态,重新发送 Commit 或者 Rollback
补偿阶段用于解决消息 Commit 或者 Rollback 发送超时或者失败的情况
如果使用了事务消息,业务方需要实现该接口:
org.apache.rocketmq.client.producer.TransactionListener
1 | public interface TransactionListener { |
2、如何选择发送目的地(MessageQueue)
手动指定 MessageQueue 目的:分区有序性,实现顺序消费
例如 Topic 创建策略,默认分配单 Broker 上分配 8 个 MessageQueue。
Producer 在了解到要发往的 Topic 有 8 个消息队列,默认情况将会轮询发送,尽量让每个队列存储到的消息数量一致。
为了实现顺序消费,需要 Producer 在发送的时候,指定发送的目的地 - 特定的 MessageQueue,这时需要指定选择分区策略(MessageSelector)以及特定的分区键入参(arg)
可以查看具体的发送选择分区逻辑:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
1 | private SendResult sendSelectImpl(Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) { |
待补充
分组的作用,例如消费者分组 consumerGroup 是为了消息消费的负载均衡,区分不同的消费者
生产者分组 producerGroup 的作用是为了 事务消息 的回查,根据分组进行二次确认,后续相关内容需要深入研究…