源码版本号:版本号:4.9.4
有两种消息方式
BROADCASTING: 广播模式,每条消息都会被消费者组内的所有消费者进行消费。
CLUSTERING: 集群模式,每条消息只会被消费者组内的一个消费者进行消费。默认是集群模式。
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupNameTest");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 多个tag之间用||分隔,* 代表所有consumer.subscribe("TopicTest001", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {System.out.println("消息数量:" + msgs.size());for (MessageExt msg : msgs) {System.out.println(msg.getTopic() + "|" + msg.getQueueId() + "|" + new String(msg.getBody()));}// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}
}
第一步: 构建主题的订阅关系DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression)
,
将需要订阅的主题信息存放到 RebalanceImpl
类中的 subscriptionInner
属性
第二步: 注册回调实现类,拉取到消息是会调用回调实现类对消息进行处理,处理完返回消费状态。如果消费失败,会走重试机制。
第三步: 启动消费者。消费者的启动流程跟生产者差不多。
启动消费者最终会调用 DefaultMQPushConsumerImpl#start
方法
this.checkConfig()
this.copySubscription()
MQClientInstance
实例。第590行OffsetStore
。LocalFileOffsetStore
, 消费进度更新在本地文件。RemoteBrokerOffsetStore
, 消费进度更新到broker。MQClientInstance
的 consumerTable
中MQClientInstance
this.updateTopicSubscribeInfoWhenSubscriptionChanged()
,MQClientInstance.updateTopicRouteInfoFromNameServer(java.lang.String)
方法。this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()
MQClientInstance
启动的时候会开启拉取消息的服务。在 MQClientInstance#start
方法中的
第240行会调用 PullMessageService#start
方法
// MQClientInstance 的第240行
this.pullMessageService.start();
PullMessageService
继承了ServiceThread
,ServiceThread
实现了Runnable
接口,
PullMessageService
是MQClientInstance
中的属性并跟随MQClientInstance
启动。
查看它的run方法,不断地从任务队列中拿PullRequest
出来,通过PullRequest
里面的内容去拉取消息。
public class PullMessageService extends ServiceThread {/*** 拉取消息的任务* MQClientInstance启动时调用RebalanceService#start方法, 会往这个队列里面放任务* 后面再分析是如何往这个队列添加任务的*/private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue();/*** 找到90行*/@Overridepublic void run() {log.info(this.getServiceName() + " service started");// 这里是一个死循环, 不断地从队列里面拿拉取消息的任务while (!this.isStopped()) {try {// 如果没有拉取消息的任务, 则会阻塞. 处理完PullRequest后, 会再次放进去PullRequest pullRequest = this.pullRequestQueue.take();// 拉取消息, 见下面分析this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
}
看下PullRequest
都有些啥
public class PullRequest {// 消费者组名称private String consumerGroup;// 队列信息[topic、brokerName、queueId], 知道拉取哪个队列的消息private MessageQueue messageQueue;/*** 处理消息的队列, 拉取回来的消息都会存储里面* 内部使用TreeMap保存未处理的消息, key 为 queueOffset*/private ProcessQueue processQueue;// 下一次拉取消息的偏移量private long nextOffset;
}
拿到PullRequest
去broker拉取消息
public class PullMessageService extends ServiceThread {/*** 找到79行*/private void pullMessage(final PullRequest pullRequest) {/*** 通过 consumerGroup 找到对应的消费者* 从MQClientInstance的consumerTable属性中获取* 消费者启动时已经将自己注册到consumerTable这个Map中*/final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;// 最终调用消费者自己的方法来拉取消息impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
}
查看DefaultMQPushConsumerImpl#pullMessage
方法
public class DefaultMQPushConsumerImpl implements MQConsumerInner {/*** 找到214行*/public void pullMessage(final PullRequest pullRequest) {/*** 从broker拉取回来的消息都是保存在ProcessQueue中*/final ProcessQueue processQueue = pullRequest.getProcessQueue();/*** 如果ProcessQueue已经被移除, 则不处理* 当消费者实例新增会减少时, 消费者消费的队列信息可能会有所变化* 比如有0 1 2 3四个队列, 刚开始只有一个消费者A, 消费者A要消费4个队列* 后来新增了一个消费者B(A和B是同一个消费者组), A分到的队列就变成0和1, B分到的队列就是2和3*/if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}// 设置最后拉取消息的时间戳pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());/*** 省略......做了一些校验工作* 从消息消费数量与消费间隔两个维度进行控制* 1.如果ProcessQueue当前未处理的消息条数超过了1000将触发流控,放弃本次拉取任务,* 将拉取任务延迟50毫秒再加入到任务队列中* 2.如果ProcessQueue当前未处理的消息大小超过100MB,放弃本次拉取任务,* 将拉取任务延迟50毫秒再加入到任务队列中* 3.非顺序消费,ProcessQueue中消息的最大偏移量与最小偏移量相差2000,放弃本次拉取任务,* 将拉取任务延迟50毫秒再加入到任务队列中*/// 309行 这里会设置拉取消息后的回调方法PullCallback pullCallback = new PullCallback() {// 省略, 见下面分析};/*** 省略......*/// 找到433行try {/*** 省略......* 这里会调用拉取消息的方法:PullAPIWrapper#pullKernelImpl* 消息拉取完成后将会调用回调方法*/} catch (Exception e) {log.error("pullKernelImpl exception", e);// 异常后,放弃本次拉取任务,将拉取任务延迟50毫秒再加入到任务队列中this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}
}
从broker拉取到消息就会调用回调方法: PullCallback#onSuccess
方法
public class DefaultMQPushConsumerImpl implements MQConsumerInner {/*** 找到214行*/public void pullMessage(final PullRequest pullRequest) {// 215行 从broker端拉取到消息后会存储到 ProcessQueuefinal ProcessQueue processQueue = pullRequest.getProcessQueue();// ....../*** 300行 拿到topic的订阅信息, 里面包含tag信息* 从broker拉取到消息后还会使用tag进行过滤*/final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());// ......// 309行PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {// 313行, 对拉取到的消息进行处理, 如:对tag进行过滤pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);// 316行switch (pullResult.getPullStatus()) {// 拉取到了消息case FOUND:// 省略......// 319行, 从拉取结果中取到下一次拉取消息的偏移量pullRequest.setNextOffset(pullResult.getNextBeginOffset());// 省略......// 325行, 如果拉取到的消息列表为空, 将拉取任务放入到队列中if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {// 省略......// 333行, 将拉取到消息放入到 ProcessQueueboolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 将拉取到的消息提交到ConsumeMessageService中供消费者消费, 见下面详细分析DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// 省略....../*** 最后将拉取任务放入到任务队列中* 这里会根据DefaultMQPushConsumer的pullInterval属性判断是否需要延时拉取* 如果 pullInterval > 0, 则会延迟 pullInterval 毫秒再放入任务队列*/}// 省略......break;// 省略default:break;}}}@Overridepublic void onException(Throwable e) {// 拉取异常, 延迟3s再将拉取任务放到队列中DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}};}
}
从上面的代码可以知道,拉取到的消息会交给ConsumeMessageService#submitConsumeRequest
方法进行处理
ConsumeMessageService
有两个实现类ConsumeMessageOrderlyService
和ConsumeMessageConcurrentlyService
,前者是顺序消费,后者是并发消费
消费者在启动的时候,会根据注册的MessageListener
类型进行选择,代码如下
public class DefaultMQPushConsumerImpl implements MQConsumerInner {// 575行public synchronized void start() throws MQClientException {// 619行if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}}
}
以并发消费方式为例,代码如下所示。
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {// 找到192行@Overridepublic void submitConsumeRequest(final List msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {// 每次消费的消息个数, 默认为1final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {/*** 生成一个消费请求, ConsumeRequest实现了Runnable接口*/ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 提交消费请求this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {// 出现异常, 则会延迟一段时间再重新提交this.submitConsumeRequestLater(consumeRequest);}} else {/*** 省略......* 如果拉取到的消息条数大于每次消费者设置的每次消费的最大条数* 则会生成多个 ConsumeRequest*/}}
}
拉取到的消息列表被分批封装成ConsumeRequest
提交到线程池中进行异步处理,没有先后消费的顺序。
看看ConsumeRequest
内部是如何处理的
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {// 350行class ConsumeRequest implements Runnable {// 370行@Overridepublic void run() {/*** 这里会先判断当前消费者是否还能继续消费这个队列* 当新的负载均衡使得当前消费者不再消费这个队列, 那就直接不处理了*/if (this.processQueue.isDropped()) {return;}// 拿到注册的回调实现类MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;// 省略......// 395行ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {// 省略......这里会遍历消息列表, 为每个消息设置一个开始消费的时间戳// 402行 调用回调方法消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {// 省略......}// 省略......// 430行if (null == status) {// 如果 null == status, 则说明回调方法返回null或者发生了异常// 设置稍后消费的状态status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 省略......// 447if (!processQueue.isDropped()) {/*** 里面根据消费结果做一些处理:更新消费进度、消费失败会投递到重试主题*/ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}}
}
ConsumeMessageConcurrentlyService#processConsumeResult
方法主要做了两件事:
%RETRY%+consumerGroup
,offset
。先将消费的消息列表从ProcessQueue.msgTreeMap
中移除。msgTreeMap
为空,offset=ProcessQueue.queueOffsetMax+1
,offset=ProcessQueue.msgTreeMap.firstKey()
。ProcessQueue.msgTreeMap
保存的是拉取回来未被消费的消息,key为消息的offset
。集群模式下,消息进度的更新是RemoteBrokerOffsetStore
,内部有一个offsetTable
记录了队列的消费进度,此时只是在内存中更新。
MQClientInstance
启动的时候会调用MQClientInstance#startScheduledTask
方法,开启一堆的定时任务,
默认每隔5s会将消费者的消费进度持久化到broker,具体方法在MQClientInstance.persistAllConsumerOffset
。
最终调用的是RemoteBrokerOffsetStore#persistAll
方法。
保证同一个队列里面的消息能够按顺序进行消费。
拉取回来的消息提交给ConsumeMessageOrderlyService
,会生成一个ConsumeRequest
请求提交到线程池中。
public class ConsumeMessageOrderlyService implements ConsumeMessageService {// 408行 找到内部类 ConsumeRequestclass ConsumeRequest implements Runnable {private final ProcessQueue processQueue;private final MessageQueue messageQueue;public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {this.processQueue = processQueue;this.messageQueue = messageQueue;}@Overridepublic void run() {// 省略....../*** 432行* 拿到当前队列的加锁对象, 具体逻辑见里面的方法*/final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);// 加锁, synchronized (objLock) {// 省略....../*** 466行* 从processQueue中按顺序取出一批消息*/List msgs = this.processQueue.takeMessages(consumeBatchSize);/*** 省略......*/}}}
}
顺序消费方式,消费前都要对队列加锁,保证一个队列同一时间只能有一个消费线程消费。如果消费失败,则延迟一定时候后将
ConsumeRequest
提交给ConsumeMessageOrderlyService
。
如果消费失败,当前队列停止消费,延迟一段时间再构造ConsumeRequest
提交,
当消费次数达到DefaultMQPushConsumer.maxReconsumeTimes
时才会投递到重试队列,
对消费状态的处理在这个方法里面ConsumeMessageOrderlyService.processConsumeResult
消息消费的流程如下所示
MQClientInstance
实例。会调用PullMessageService#start
方法开启拉取消息的任务。PullMessageService
不断地从队列pullRequestQueue
中获取PullRequest
。PullRequest
就是一个拉取消息的任务。PullRequest
是通过RebalanceService#run
方法触发生成的,MQClientInstance
实例启动的时候会触发RebalanceService#start
方法,具体实现后面再分析。PullMessageService
拿到PullRequest
后,通过consumerGroup找到对应的DefaultMQPushConsumerImpl
并调用它的pullMessage(final PullRequest pullRequest)
方法进行处理。PullRequest
中的MessageQueue[topic brokerName queueId]
和nextOffset
去broker拉取消息PullRequest.processQueue
中的msgTreeMap
PullRequest.processQueue
中的msgTreeMap
移除。PullRequest
再次放到PullMessageService.pullRequestQueue
队列中。