RocketMQ-消费消息
创始人
2024-06-03 17:21:46
0

源码版本号:版本号:4.9.4

消费方式

有两种消息方式

BROADCASTING: 广播模式,每条消息都会被消费者组内的所有消费者进行消费。

CLUSTERING: 集群模式,每条消息只会被消费者组内的一个消费者进行消费。默认是集群模式。

启动消费者

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupNameTest");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 多个tag之间用||分隔,* 代表所有consumer.subscribe("TopicTest001", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {System.out.println("消息数量:" + msgs.size());for (MessageExt msg : msgs) {System.out.println(msg.getTopic() + "|" + msg.getQueueId() + "|" + new String(msg.getBody()));}// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}
}

第一步: 构建主题的订阅关系DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression)
将需要订阅的主题信息存放到 RebalanceImpl 类中的 subscriptionInner 属性

第二步: 注册回调实现类,拉取到消息是会调用回调实现类对消息进行处理,处理完返回消费状态。如果消费失败,会走重试机制。

第三步: 启动消费者。消费者的启动流程跟生产者差不多。

启动消费者最终会调用 DefaultMQPushConsumerImpl#start 方法

  1. 做一些检查工作。 this.checkConfig()
  2. 如果消费模式为CLUSTERING, 则会订阅重试Topic: %RETRY% + consumerGroup
    消息消费失败就会被投递到这个Topic里面(会往这个重试Topic发送延时消息)。 this.copySubscription()
  3. 获取 MQClientInstance 实例。第590行
  4. 设置OffsetStore
    广播模式:LocalFileOffsetStore, 消费进度更新在本地文件。
    集群模式:RemoteBrokerOffsetStore, 消费进度更新到broker。
  5. 将消费者注册到 MQClientInstanceconsumerTable
  6. 启动 MQClientInstance
  7. 从NameServer中获取订阅的所有的Topic的详细信息。
    这样消费者就能知道每个Topic都有哪些队列,然后根据算法计算出自己应该消费哪几个队列。 this.updateTopicSubscribeInfoWhenSubscriptionChanged()
    最终调用的是 MQClientInstance.updateTopicRouteInfoFromNameServer(java.lang.String)方法。
  8. 发送心跳信息给broker,broker就能知道每个Topic都有哪些实例在消费。 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()

拉取消息

MQClientInstance 启动的时候会开启拉取消息的服务。在 MQClientInstance#start 方法中的
第240行会调用 PullMessageService#start 方法

// MQClientInstance 的第240行
this.pullMessageService.start();

PullMessageService继承了ServiceThreadServiceThread实现了Runnable接口,
PullMessageServiceMQClientInstance中的属性并跟随MQClientInstance启动。

查看它的run方法,不断地从任务队列中拿PullRequest出来,通过PullRequest里面的内容去拉取消息。

public class PullMessageService extends ServiceThread {/*** 拉取消息的任务* MQClientInstance启动时调用RebalanceService#start方法, 会往这个队列里面放任务* 后面再分析是如何往这个队列添加任务的*/private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue();/*** 找到90行*/@Overridepublic void run() {log.info(this.getServiceName() + " service started");// 这里是一个死循环, 不断地从队列里面拿拉取消息的任务while (!this.isStopped()) {try {// 如果没有拉取消息的任务, 则会阻塞. 处理完PullRequest后, 会再次放进去PullRequest pullRequest = this.pullRequestQueue.take();// 拉取消息, 见下面分析this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
}

看下PullRequest都有些啥

public class PullRequest {// 消费者组名称private String consumerGroup;// 队列信息[topic、brokerName、queueId], 知道拉取哪个队列的消息private MessageQueue messageQueue;/*** 处理消息的队列, 拉取回来的消息都会存储里面* 内部使用TreeMap保存未处理的消息, key 为 queueOffset*/private ProcessQueue processQueue;// 下一次拉取消息的偏移量private long nextOffset;
}

拿到PullRequest去broker拉取消息

public class PullMessageService extends ServiceThread {/*** 找到79行*/private void pullMessage(final PullRequest pullRequest) {/*** 通过 consumerGroup 找到对应的消费者* 从MQClientInstance的consumerTable属性中获取* 消费者启动时已经将自己注册到consumerTable这个Map中*/final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;// 最终调用消费者自己的方法来拉取消息impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
}

查看DefaultMQPushConsumerImpl#pullMessage方法

public class DefaultMQPushConsumerImpl implements MQConsumerInner {/*** 找到214行*/public void pullMessage(final PullRequest pullRequest) {/*** 从broker拉取回来的消息都是保存在ProcessQueue中*/final ProcessQueue processQueue = pullRequest.getProcessQueue();/*** 如果ProcessQueue已经被移除, 则不处理* 当消费者实例新增会减少时, 消费者消费的队列信息可能会有所变化* 比如有0 1 2 3四个队列, 刚开始只有一个消费者A, 消费者A要消费4个队列* 后来新增了一个消费者B(A和B是同一个消费者组), A分到的队列就变成0和1, B分到的队列就是2和3*/if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}// 设置最后拉取消息的时间戳pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());/*** 省略......做了一些校验工作* 从消息消费数量与消费间隔两个维度进行控制* 1.如果ProcessQueue当前未处理的消息条数超过了1000将触发流控,放弃本次拉取任务,* 将拉取任务延迟50毫秒再加入到任务队列中* 2.如果ProcessQueue当前未处理的消息大小超过100MB,放弃本次拉取任务,* 将拉取任务延迟50毫秒再加入到任务队列中* 3.非顺序消费,ProcessQueue中消息的最大偏移量与最小偏移量相差2000,放弃本次拉取任务,* 将拉取任务延迟50毫秒再加入到任务队列中*/// 309行 这里会设置拉取消息后的回调方法PullCallback pullCallback = new PullCallback() {// 省略, 见下面分析};/*** 省略......*/// 找到433行try {/*** 省略......* 这里会调用拉取消息的方法:PullAPIWrapper#pullKernelImpl* 消息拉取完成后将会调用回调方法*/} catch (Exception e) {log.error("pullKernelImpl exception", e);// 异常后,放弃本次拉取任务,将拉取任务延迟50毫秒再加入到任务队列中this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}
}

从broker拉取到消息就会调用回调方法: PullCallback#onSuccess 方法

public class DefaultMQPushConsumerImpl implements MQConsumerInner {/*** 找到214行*/public void pullMessage(final PullRequest pullRequest) {// 215行 从broker端拉取到消息后会存储到 ProcessQueuefinal ProcessQueue processQueue = pullRequest.getProcessQueue();// ....../*** 300行 拿到topic的订阅信息, 里面包含tag信息* 从broker拉取到消息后还会使用tag进行过滤*/final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());// ......// 309行PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {// 313行, 对拉取到的消息进行处理, 如:对tag进行过滤pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);// 316行switch (pullResult.getPullStatus()) {// 拉取到了消息case FOUND:// 省略......// 319行, 从拉取结果中取到下一次拉取消息的偏移量pullRequest.setNextOffset(pullResult.getNextBeginOffset());// 省略......// 325行, 如果拉取到的消息列表为空, 将拉取任务放入到队列中if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {// 省略......// 333行, 将拉取到消息放入到 ProcessQueueboolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 将拉取到的消息提交到ConsumeMessageService中供消费者消费, 见下面详细分析DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// 省略....../*** 最后将拉取任务放入到任务队列中* 这里会根据DefaultMQPushConsumer的pullInterval属性判断是否需要延时拉取* 如果 pullInterval > 0, 则会延迟 pullInterval 毫秒再放入任务队列*/}// 省略......break;// 省略default:break;}}}@Overridepublic void onException(Throwable e) {// 拉取异常, 延迟3s再将拉取任务放到队列中DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}};}
}

消费消息

从上面的代码可以知道,拉取到的消息会交给ConsumeMessageService#submitConsumeRequest方法进行处理

ConsumeMessageService有两个实现类ConsumeMessageOrderlyServiceConsumeMessageConcurrentlyService,前者是顺序消费,后者是并发消费

消费者在启动的时候,会根据注册的MessageListener类型进行选择,代码如下

public class DefaultMQPushConsumerImpl implements MQConsumerInner {// 575行public synchronized void start() throws MQClientException {// 619行if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}}
}

并发消费

以并发消费方式为例,代码如下所示。

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {// 找到192行@Overridepublic void submitConsumeRequest(final List msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {// 每次消费的消息个数, 默认为1final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {/*** 生成一个消费请求, ConsumeRequest实现了Runnable接口*/ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 提交消费请求this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {// 出现异常, 则会延迟一段时间再重新提交this.submitConsumeRequestLater(consumeRequest);}} else {/*** 省略......* 如果拉取到的消息条数大于每次消费者设置的每次消费的最大条数* 则会生成多个 ConsumeRequest*/}}
}

拉取到的消息列表被分批封装成ConsumeRequest提交到线程池中进行异步处理,没有先后消费的顺序。

看看ConsumeRequest内部是如何处理的

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {// 350行class ConsumeRequest implements Runnable {// 370行@Overridepublic void run() {/*** 这里会先判断当前消费者是否还能继续消费这个队列* 当新的负载均衡使得当前消费者不再消费这个队列, 那就直接不处理了*/if (this.processQueue.isDropped()) {return;}// 拿到注册的回调实现类MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;// 省略......// 395行ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {// 省略......这里会遍历消息列表, 为每个消息设置一个开始消费的时间戳// 402行 调用回调方法消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {// 省略......}// 省略......// 430行if (null == status) {// 如果 null == status, 则说明回调方法返回null或者发生了异常// 设置稍后消费的状态status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 省略......// 447if (!processQueue.isDropped()) {/*** 里面根据消费结果做一些处理:更新消费进度、消费失败会投递到重试主题*/ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}}
}

ConsumeMessageConcurrentlyService#processConsumeResult方法主要做了两件事:

  1. 如果消费失败,对于广播模式只是打印日志;
    对于集群模式,则会发送延时消息投递到重试主题%RETRY%+consumerGroup
    等时间到了,这个消息就会被再次消费了,消费者启动时会自动订阅这个Topic。
    延迟消息总共有 18 个等级,而消息重试使用了原延迟消息的第 3 - 18 等级。
    举个例子,对于首次重新消费的 Message 来说,它的 DelayLevel 会直接设置为 3,然后后续每次都会依次递增,达到了最大的重试次数之后就会被扔进死信队列当中。
  2. 更新消费进度offset。先将消费的消息列表从ProcessQueue.msgTreeMap中移除。
    如果msgTreeMap为空,offset=ProcessQueue.queueOffsetMax+1
    否则offset=ProcessQueue.msgTreeMap.firstKey()
    ProcessQueue.msgTreeMap保存的是拉取回来未被消费的消息,key为消息的offset

集群模式下,消息进度的更新是RemoteBrokerOffsetStore,内部有一个offsetTable记录了队列的消费进度,此时只是在内存中更新。
MQClientInstance启动的时候会调用MQClientInstance#startScheduledTask方法,开启一堆的定时任务,
默认每隔5s会将消费者的消费进度持久化到broker,具体方法在MQClientInstance.persistAllConsumerOffset
最终调用的是RemoteBrokerOffsetStore#persistAll方法。

顺序消费

保证同一个队列里面的消息能够按顺序进行消费。
拉取回来的消息提交给ConsumeMessageOrderlyService,会生成一个ConsumeRequest请求提交到线程池中。

public class ConsumeMessageOrderlyService implements ConsumeMessageService {// 408行 找到内部类 ConsumeRequestclass ConsumeRequest implements Runnable {private final ProcessQueue processQueue;private final MessageQueue messageQueue;public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {this.processQueue = processQueue;this.messageQueue = messageQueue;}@Overridepublic void run() {// 省略....../*** 432行* 拿到当前队列的加锁对象, 具体逻辑见里面的方法*/final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);// 加锁, synchronized (objLock) {// 省略....../*** 466行* 从processQueue中按顺序取出一批消息*/List msgs = this.processQueue.takeMessages(consumeBatchSize);/*** 省略......*/}}}
}

顺序消费方式,消费前都要对队列加锁,保证一个队列同一时间只能有一个消费线程消费。如果消费失败,则延迟一定时候后将
ConsumeRequest提交给ConsumeMessageOrderlyService
如果消费失败,当前队列停止消费,延迟一段时间再构造ConsumeRequest提交,
当消费次数达到DefaultMQPushConsumer.maxReconsumeTimes时才会投递到重试队列,

对消费状态的处理在这个方法里面ConsumeMessageOrderlyService.processConsumeResult

总结

消息消费的流程如下所示

  1. 设置需要订阅的主题信息。
  2. 为消费者设置回调方法,拉取到的消息会调用这个回调方法进行处理。
  3. 启动消费者。如果消费模式为CLUSTERING, 则会订阅重试Topic: %RETRY% + consumerGroup
    消息消费失败就会被投递到这个重试Topic里面(会往这个重试Topic发送延时消息)。
  4. 启动MQClientInstance实例。会调用PullMessageService#start方法开启拉取消息的任务。
  5. PullMessageService不断地从队列pullRequestQueue中获取PullRequest
    每一个PullRequest就是一个拉取消息的任务。
    PullRequest是通过RebalanceService#run方法触发生成的,
    MQClientInstance实例启动的时候会触发RebalanceService#start方法,具体实现后面再分析。
  6. PullMessageService拿到PullRequest后,通过consumerGroup找到对应的DefaultMQPushConsumerImpl并调用它的pullMessage(final PullRequest pullRequest)方法进行处理。
  7. 通过PullRequest中的MessageQueue[topic brokerName queueId]nextOffset去broker拉取消息
  8. 拉取回来的消息会先保存到PullRequest.processQueue中的msgTreeMap
  9. 异步消费拉取到的消息。将已经消费的消息从PullRequest.processQueue中的msgTreeMap移除。
    如果消息消费失败,即回调方法异常或者返回null,则会将该消息投递到重试队列 %RETRY% + consumerGroup
  10. PullRequest再次放到PullMessageService.pullRequestQueue队列中。

相关内容

热门资讯

育碧GDC2018程序化大世界... 1.传统手动绘制森林的问题 采用手动绘制的方法的话,每次迭代地形都要手动再绘制森林。这...
编译原理陈火旺版第三章课后题答... 下面答案仅供参考! 1.编写一个对于 Pascal 源程序的预处理程序。该程序的作用是...
MacBookPro M2芯片... MacBookPro M2芯片下如何搭建React-Native环境目录软件下载环境配置 目录 写在...
Android studio ... 解决 Android studio 出现“The emulator process for AVD ...
pyflink学习笔记(六):... 在pyflink学习笔记(一)中简单介绍了table-sql的窗口函数,下面简单介绍下...
创建deployment 创建deployment服务编排-DeploymentDeployment工作负载均衡器介绍Depl...
gma 1.1.4 (2023... 新增   1、地图工具    a. 增加【GetWorldDEMDataSet】。提供了一套 GEO...
AI专业教您保姆级在暗影精灵8... 目录 一、Stable Diffusion介绍    二、Stable Diffusion环境搭建 ...
vue笔记 第一个Vue应用 Document{{content}}{{...
Unity自带类 --- Ti... 1.在Unity中,自己写的类(脚本)的名字不能与Unit...
托福口语21天——day5 发... 目录 一、连读纠音 二、语料输入+造句输出 三、真题 一、连读纠音 英语中的连读方式有好几种...
五、排序与分页 一、排序 1、语法 ORDER BY 字段 ASC | DESC ASC(ascen...
Linux系统中如何安装软件 文章目录一、rpm包安装方式步骤:二、deb包安装方式步骤:三、tar....
开荒手册4——Related ... 0 写在前面 最早读文献的时候,每每看到related work部分都会选择性的忽略&...
实验01:吃鸡蛋问题 1.实验目的: 通过实验理解算法的概念、算法的表示、算法的时间复杂度和空间复杂度分析&...
8个免费图片/照片压缩工具帮您... 继续查看一些最好的图像压缩工具,以提升用户体验和存储空间以及网站使用支持。 无数图像压...
Spring Cloud Al... 前言 本文小新为大家带来 Sentinel控制台规则配置 相关知识,具体内容包括流控...
多项目同时进行,如何做好进度管... 多项目同时进行,如何做好进度管理? 大多数时候,面对项目进...
ATTCK红队评估实战靶场(二... 前言 第二个靶机来喽,地址:vulunstack 环境配置 大喊一声我...
【MySQL基础】3—多表查询 ⭐⭐⭐⭐⭐⭐ Github主页👉https://github.com/A-BigTr...