RocketMQ Consumer 深入学习

学习一下火箭消息 - 消费者的原理和使用🚀

消费模式

消息消费有两种模式:

1、并发消费

并发消费是默认的处理方法,一个消费者使用线程池技术,可以并发消费多条消息,提升机器的资源利用率。默认配置是 20 个线程,所以一台机器默认情况下,同一瞬间可以消费 20 个消息。

其中 ConsumeMessageConcurrentlyService 的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}

2、顺序消费

有些业务场景,消息的消费需要顺序性,例如购物时,下订单、库存校验、支付、发送物流,虽然都属于「购物」这个场景的子任务,但他们之间是有顺序性的。如果它们业务处理通过消息解耦,那消息消费也得要有顺序性。

RocketMQ 的做法就是分区有序性,首先需要发送者,将有顺序的消息发往 Topic 下同一个 MessageQueue,然后消费者,顺序地一个一个进行消费,消费失败将会一直重试,前面消息消费完成才能进行下一个,所以需要在业务上确保消息失败机制,避免消息阻塞。


幂等消费

RocketMQ 的设计中,是不保证消息的幂等性,这时候需要业务方自行保证,重复消费消费不会对数据造成影响,从数学意义上来说,f(x) = f(f(x)),多次计算的结果都是一致的。

RocketMQ 保证存储在 Broker 的消息最少投递一次,该特性保证消息一定会被消费,但由于网络抖动或者其它场景,导致一条消息可能被消费多次。

在相同业务类型的消息中,这里需要考虑两个场景

  • 并发消费
  • 消息消费超时后重复投递

第一个场景很好理解,一条相同类型的消息被不同的消费者同时拉取,可能是不同发送者同时发送的,例如喜闻乐见的 A B 转账问题。

第二个场景比较难遇到,默认情况,消息处理超过 15 分钟后,将会重新投递消费,如果原来服务器 A 还在处理中,重新投递的消息被服务器 B 拉取了;另一种就是手动重发消息,通过控制台可以重新发送一模一样的消息,MessageID 和消息体跟之前一样,这两种情况下也会造成消息重复消费。

于是设计上,考虑了使用 Redis 做分布式锁,通过竞争锁来避免同时消息,以及用 Redis 暂存消费状态,设计如下:

注意点:

1、锁资源 key 的组装规则(【消费组】+【:】+【主题 topic】+【:】+【messageId 或者 messageKey】

2、锁对应的状态流转(Processing or Successed)

3、避免处理耗时超过锁 expire 时间,导致其它服务器订阅消息并成功消费。加入一个定时线程池,抢到锁资源后,组装定时任务,进行【续时】

4、任务成功后,修改状态为【Successed】,失效时间订为 1h;失败情况,清理掉所有锁资源和定时任务,返回失败重试策略

5、根据 Redis 中保存的状态,过滤重复的消息

在消息 SDK 代码实现上,通过装饰器模式,将 MessageConsumer 包装起来,在业务逻辑不需改动太大情况下,动态增加了幂等消费的功能。


负载均衡

上图展示了,在一个 pullRequestQueue,可能获取到多个消息 MessageExt,然后每个消息将会进入消费线程池中消费。

Consumer 端使用 RebalanceImpl 来实现负载均衡,所以想要理解拉取消息的流程,需要重点查看它实现。

Consumer 实例启动时,在工厂 MQClientInstance 中能够看到 new RebalanceService(this);,启动了一个后台线程,每隔 20s 进行重平衡操作 mqClientFactory.doRebalance()

同样按照消费者的消费模式,重平衡逻辑处理分成两个 switch 分支,接下来讨论的是『并发消费』逻辑

一、获取 MessageQueue 列表

RebalanceImpl 维护了一份 map 结构的本地缓存 topicSubscribeInfoTable,以 topic 维度保存了对应的 MesssageQueue 列表

1
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

二、获取在线的消费者终端列表

1
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

findConsumerIdList 方法接受两个参数:Topic 主题和 ConsumerGroup 消费组

底层通过发送 RequestCode.GET_CONSUMER_LIST_BY_GROUP 请求码的 RemotingCommandBroker 查询在线消费者列表,拿到结果后反序列化

三、分配 MessageQueue

根据分配策略,确定当前消费者实例要从哪些 MessageQueue 获取消息

1
2
3
4
5
List<MessageQueue> allocateResult = 
strategy.allocate(this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);

默认分配策略是平均分配,取当前下标 index,队列数取余机器数 mod,然后按照区间给当前应用分配。

例如有 8 个队列,2 台在线服务器,那平均消费 4 个队列,4 4 分配;3 台服务器的,按照 3 3 2 分配。

注意:

目前遇到很多业务团队,在开发过程中,使用了相同的分组名,但是订阅信息不一致,例如之前已经部署了两台应用,本期开发时,新增了 Topic 后,反馈有些消息无法消费,查看 Topic 消费情况表现如下:

根本原因就是前面说的 MessageQueue 平均分配后,之前的应用没有订阅新 Topic,于是这些消息状态一直处于 Not Consumed Yet解决方法就是统一订阅信息或者更换 ConsumerGroup 进行测试。

四、刷新本地缓存 & 构建请求列表

接下来,会根据前面分配的消息队列,来构建获取消息的请求 pullRequest 队列

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
....
}

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
}
}
}
this.dispatchPullRequest(pullRequestList);
return changed;
}

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
}
}

public void executePullRequestImmediately(final PullRequest pullRequest) {
this.pullRequestQueue.put(pullRequest);
}

updateProcessQueueTableInRebalance 的作用:更新订阅关系

①、消费节点上下线
②、Topic 的队列分区参数调整

以上两种行为,将会影响到消息订阅的分配,所以需要客户端在消费消息前,先确定自己被分配到哪几个 MessageQueue,在构建 PullRequest 时,参数中带上监听的 queueId

最后,为过滤后的消息队列集合(mqSet)中的每个 MessageQueue 创建一个 ProcessQueue 对象,并存入 RebalanceImplprocessQueueTable 队列中。

接着构建 PullRequest,并调用 dispatchPullRequest 方法,将拉取消息的请求放入到 pullRequestQueue 队列中,等待后面的 PullMessageService 取出来调用。

五、后台线程不停从 Broker 拉取消息

后台线程是:PullMessageService

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
}

pullRequestQueue 请求队列,就是前面重平衡服务,构建好放入该队列中的,然后在 PullMessageService 中的 run 方法,使用 while 死循环,不停的去 Broker 请求新消息

六、消息消费

在获取消息时,会注册一个回调接口,具体入口在 MQConsumerInner,然后在 PullCallback 里调用 messageListener 进行消费,也就是我们写的业务处理逻辑。

在正常消费完成后,将 pullRequest 重新放回拉取消息的任务队列中,等待 PullMessageService 的下一次获取,拉取新消息。

正常消费,业务处理没有异常的话,将会返回 ConsumeReturnType.SUCCESS 表示成功确认,消费位点也能继续前进。

消费失败将会触发补偿机制

  • ConsumeMessageConcurrentlyService

并发模式下,它会将消息投递到 %retry 队列,更新当前位点,让后面的消息继续消费,如果该消息一直失败,默认最多重试 16 次就会丢到死信队列中。

  • ConsumeMessageOrderlyService

顺序模式需要注意下,出现失败它不会投递到重试队列,而是将一直在本地重试,直到消费成功为止,所以有可能出现某个 MessageQueue 消费卡住,并且后面消息都不能消费的场景,注意捕获业务处理异常。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
...
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
// 构建回调接口
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
...
// 这一步消息消费,进入设定的消费逻辑 messageListener
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
// 是否稍后才消费,重新扔回到请求队列中
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
break;
...
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
...
try {
// 从 Broker 端获取消息
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}

获取消息的这个方法中,有两个核心部分

①、构建消费回调函数
②、从 Broker 端获取新消息

回调接口中,设定了对新消息的处理逻辑,包括顺序消息的特殊处理,还有是否需要等待一段时间才消费,真正执行业务方设定的消费逻辑在 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest 中。

然后将回调函数作为参数,放入 this.pullAPIWrapper.pullKernelImpl 方法中,接收消息后,执行回调函数来处理消息。

到这一步为止,从消息获取到消息消费,执行本地业务逻辑的基本流程就基本了解清楚,后面的状态确认以及位点 offset 更新,感兴趣的可以再去跟踪一下。


小结

消费者的深入学习分成以下几部分

  • 消费模式
  • 幂等消费概念
  • 负载均衡

记录了并发模式和广播模式的区别,使用上需要注意的细节。

跟大家分享了一下在原生 RMQ 不支持幂等消费,同时不需要业务方做过多改造的情况下,通过封装 SDK,在里面实现幂等消费的方案。

最后梳理了一下消费者如何重平衡、构建拉取消息的请求最后消费消息的代码过程。

其中还有很多细节点还需要去了解,例如重平衡 doReblance 阶段,出现服务器上下线,还处于消费的 MessageQueue 如何处理(看了一下有加锁 lock 操作,避免两台服务器同时操作同一个队列)的代码如何实现等等

最后,MQ 的学习之旅,从点到面,还需要继续学习,之后再见👋~

PS:分类总结排版可以去看语雀笔记 https://www.yuque.com/books/share/2167ed0a-b9c9-4e1f-b648-1031b36cd144?# 《RocketMQ》