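51. Source Code Walkthrough: RocketMQ Message Re-consumption
1. Introduction
RocketMQ supports re-consuming a message after a consumption attempt fails. The consumer code looks like this: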
/*
 * Register callback to execute on arrival of messages fetched from brokers.
 */
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        System.out.printf(Thread.currentThread().getName() + "%n");
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});
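In other words, the listener has to return ConsumeConcurrentlyStatus.RECONSUME_LATER. So how is this implemented?
2. Code Flow
We take push-mode asynchronous messages as the example. For the message consumption process itself, see https://blog.51cto.com/483181/2056301
We start from the point where the client has successfully fetched a message, that is, DefaultMQPushConsumerImpl.pullMessage.
2.1. Pull message callback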
public void pullMessage(final PullRequest pullRequest) {
    ...
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                    pullRequest.getMessageQueue(), pullResult, subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        ...
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispathToConsume);
                        ....
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            ...
        }
    };
    ...
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
    }
}
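After pullKernelImpl fetches the messages, an asynchronous request results in a call back into pullCallback. We assume the messages were fetched successfully, which is the FOUND branch.
That branch submits a consume request: consumeMessageService.submitConsumeRequest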
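To make the callback flow easier to follow, here is a minimal sketch that uses only the JDK. PullCallbackSketch, its nested PullResult, PullStatus and PullCallback types, and the pull and newCallback helpers are hypothetical stand-ins for illustration, not the RocketMQ classes. The callback is also invoked inline here, whereas the real client invokes it from another thread once the broker responds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not RocketMQ classes.
class PullCallbackSketch {
    enum PullStatus { FOUND, NO_NEW_MSG }

    // Simplified pull result: a status plus the messages that were found.
    static final class PullResult {
        final PullStatus status;
        final List<String> msgs;

        PullResult(PullStatus status, List<String> msgs) {
            this.status = status;
            this.msgs = msgs;
        }
    }

    interface PullCallback {
        void onSuccess(PullResult result);
        void onException(Throwable e);
    }

    // Stand-in for the asynchronous pull: the result reaches the caller only via the callback.
    static void pull(PullResult simulated, PullCallback callback) {
        try {
            callback.onSuccess(simulated);
        } catch (Throwable t) {
            callback.onException(t);
        }
    }

    // Builds a callback that forwards FOUND messages to the sink,
    // which plays the role of submitConsumeRequest.
    static PullCallback newCallback(final List<String> sink) {
        return new PullCallback() {
            @Override
            public void onSuccess(PullResult result) {
                if (result == null) {
                    return;
                }
                switch (result.status) {
                    case FOUND:
                        sink.addAll(result.msgs);
                        break;
                    default:
                        break; // nothing to consume
                }
            }

            @Override
            public void onException(Throwable e) {
                // the sketch ignores errors
            }
        };
    }

    public static void main(String[] args) {
        List<String> submitted = new ArrayList<>();
        PullCallback callback = newCallback(submitted);
        pull(new PullResult(PullStatus.FOUND, Arrays.asList("m1", "m2")), callback);
        pull(new PullResult(PullStatus.NO_NEW_MSG, Collections.<String>emptyList()), callback);
        System.out.println(submitted); // prints [m1, m2]
    }
}
```

Only the FOUND result reaches the sink; every other status falls through without submitting anything.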
2.2 Submitting the consume request
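In the push consumer, submitConsumeRequest has two implementations: ConsumeMessageConcurrentlyService for concurrent consumption and ConsumeMessageOrderlyService for orderly consumption.
Our listener is a MessageListenerConcurrently, so we follow the concurrent implementation.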
2.3 Submitting the consume request in push mode
ConsumeMessageConcurrentlyService.java
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    ...
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
            }
        }
    }
}
This creates a ConsumeRequest, which is a Runnable, and submits it to the consumeExecutor thread pool. So next we look at ConsumeRequest.
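The excerpt above elides the part between reading consumeBatchSize and building each ConsumeRequest. As a rough illustration of the idea, the sketch below assumes that the elided code splits the pulled messages into chunks of at most consumeBatchSize and submits one task per chunk. BatchSubmitSketch and its split helper are hypothetical names, and plain integers stand in for messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one Runnable per batch, submitted to a thread pool.
class BatchSubmitSketch {

    // Splits msgs into consecutive chunks of at most batchSize elements, preserving order.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) throws InterruptedException {
        // Plays the role of consumeExecutor.
        ExecutorService consumeExecutor = Executors.newFixedThreadPool(2);

        List<Integer> msgs = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            msgs.add(i);
        }

        // With a batch size of 2, five messages become three tasks.
        for (final List<Integer> batch : split(msgs, 2)) {
            consumeExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " consumes " + batch);
                }
            });
        }

        consumeExecutor.shutdown();
        consumeExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The order in which the batches are printed can vary between runs, because the pool threads run independently.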
2.4 ConsumeRequest
class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    @Override
    public void run() {
        ...
        try {
            ...
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            ....
        }
        ....
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}",
                messageQueue, msgs);
        }
    }
}
Look inside the try-catch first: this is where the client's listener is called back to consume the messages.
listener.consumeMessage(Collections.unmodifiableList(msgs), context);
That is the method we overrode in our own consumer:
/*
 * Register callback to execute on arrival of messages fetched from brokers.
 */
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        System.out.printf(Thread.currentThread().getName() + "%n");
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});
Then processConsumeResult is called with the returned status to handle the result.
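The listener call can be sketched in isolation. The version below assumes that the elided code treats a thrown exception or a null return value the same way as RECONSUME_LATER. That matches my reading of the RocketMQ source, but check it against the version you use. ConsumeRequestSketch, Status, Listener and invoke are hypothetical names, and strings stand in for MessageExt.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the listener callback inside ConsumeRequest.run().
class ConsumeRequestSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    interface Listener {
        Status consumeMessage(List<String> msgs);
    }

    // Calls the user listener and normalizes failures.
    // Assumption: an exception or a null status is handled as RECONSUME_LATER.
    static Status invoke(Listener listener, List<String> msgs) {
        Status status = null;
        try {
            // The listener gets a read-only view, as in the excerpt above.
            status = listener.consumeMessage(Collections.unmodifiableList(msgs));
        } catch (Throwable e) {
            // Swallowed on purpose: status stays null and is mapped below.
        }
        return status != null ? status : Status.RECONSUME_LATER;
    }

    public static void main(String[] args) {
        List<String> msgs = Arrays.asList("m1", "m2");

        Listener failing = new Listener() {
            @Override
            public Status consumeMessage(List<String> batch) {
                throw new IllegalStateException("business logic failed");
            }
        };
        Listener succeeding = new Listener() {
            @Override
            public Status consumeMessage(List<String> batch) {
                return Status.CONSUME_SUCCESS;
            }
        };

        System.out.println(invoke(failing, msgs));    // prints RECONSUME_LATER
        System.out.println(invoke(succeeding, msgs)); // prints CONSUME_SUCCESS
    }
}
```

Under this assumption, a listener that throws ends up on the same retry path as one that returns RECONSUME_LATER explicitly.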
2.5 processConsumeResult
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    ...
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            log.info("CLUSTERING...");
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            log.info("msgBackFailed.isEmpty() [{}]", msgBackFailed.isEmpty());
            if (!msgBackFailed.isEmpty()) {
                log.info("msgBackFailed [{}]", msgBackFailed);
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    ...
}
2.5.1
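The method first checks whether the message model is clustering or broadcasting. In broadcasting mode, a failed message is only logged and then dropped.
In clustering mode, sendMessageBack is called to send the message back to the broker, and the broker later redelivers it for another consumption attempt.
If sending the message back fails, the client increments the message's reconsume count itself and re-submits the failed messages locally through submitConsumeRequestLater, so they are consumed again on this consumer shortly afterwards without a round trip to the broker.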
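The clustering branch can be sketched with plain Java. The excerpt elides how ackIndex is computed, so the sketch assumes that a fully successful batch sets it to the last index (nothing is sent back) and that RECONSUME_LATER sets it to -1 (the whole batch is sent back). ProcessResultSketch, Msg, BrokerSender and handleClustering are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the CLUSTERING branch of processConsumeResult.
class ProcessResultSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Minimal stand-in for MessageExt.
    static final class Msg {
        final String id;
        int reconsumeTimes;

        Msg(String id) {
            this.id = id;
        }
    }

    // Stand-in for sendMessageBack: returns false when the broker could not be reached.
    interface BrokerSender {
        boolean sendMessageBack(Msg msg);
    }

    // Returns the messages whose send-back failed and that therefore need a local retry.
    static List<Msg> handleClustering(Status status, List<Msg> msgs, BrokerSender sender) {
        // Assumption: success acknowledges the whole batch, RECONSUME_LATER acknowledges nothing.
        int ackIndex = (status == Status.CONSUME_SUCCESS) ? msgs.size() - 1 : -1;

        List<Msg> msgBackFailed = new ArrayList<>();
        for (int i = ackIndex + 1; i < msgs.size(); i++) {
            Msg msg = msgs.get(i);
            if (!sender.sendMessageBack(msg)) {
                msg.reconsumeTimes++; // the client counts the retry itself
                msgBackFailed.add(msg);
            }
        }
        return msgBackFailed;
    }

    public static void main(String[] args) {
        List<Msg> msgs = Arrays.asList(new Msg("a"), new Msg("b"));

        // Simulate a broker that accepts "a" but fails for "b".
        BrokerSender flaky = new BrokerSender() {
            @Override
            public boolean sendMessageBack(Msg msg) {
                return !"b".equals(msg.id);
            }
        };

        List<Msg> retryLocally = handleClustering(Status.RECONSUME_LATER, msgs, flaky);
        for (Msg msg : retryLocally) {
            System.out.println(msg.id + " reconsumeTimes=" + msg.reconsumeTimes); // b reconsumeTimes=1
        }
    }
}
```

In the real method the returned list corresponds to msgBackFailed, which is removed from the request and handed to submitConsumeRequestLater.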
2.6 Sending the message back to the broker
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();

    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel,
            context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        ...
    }

    return false;
}

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        String brokerAddr = (null != brokerName)
            ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        ....
    }
}

public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

    requestHeader.setGroup(consumerGroup);
    requestHeader.setOriginTopic(msg.getTopic());
    requestHeader.setOffset(msg.getCommitLogOffset());
    requestHeader.setDelayLevel(delayLevel);
    requestHeader.setOriginMsgId(msg.getMsgId());
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

    log.info("addr [{}] request [{}] timeoutMillis [{}]", addr, request, timeoutMillis);
    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
This simply calls the client API to send the request to the broker over Netty. The request code is RequestCode.CONSUMER_SEND_MSG_BACK:
public static final int CONSUMER_SEND_MSG_BACK = 36;
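The delayLevel carried in this request decides how long the broker waits before redelivery. The sketch below is only an approximation built on three assumptions about stock RocketMQ: the broker's default delay table is "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; a delayLevel of 0 lets the broker pick level 3 + reconsumeTimes; and a negative level skips retries and sends the message to the dead-letter queue. DelayLevelSketch and its methods are hypothetical names, and the table is configurable on the broker, so verify the values against your own setup.

```java
// Hypothetical sketch: maps a delayLevel to a redelivery delay under the assumptions above.
class DelayLevelSketch {
    // Assumed default broker delay table; the broker configuration can override it.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Converts a token such as "10s", "5m" or "2h" to milliseconds.
    static long toMillis(String token) {
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's':
                return value * 1000L;
            case 'm':
                return value * 60_000L;
            case 'h':
                return value * 3_600_000L;
            default:
                throw new IllegalArgumentException("unknown unit in " + token);
        }
    }

    // Returns the delay in milliseconds, or -1 when the message would not be retried.
    static long retryDelayMillis(int delayLevel, int reconsumeTimes) {
        if (delayLevel < 0) {
            return -1; // assumption: goes straight to the dead-letter queue
        }
        String[] levels = DEFAULT_LEVELS.split(" ");
        // Assumption: level 0 means the broker decides, starting at level 3.
        int level = (delayLevel == 0) ? 3 + reconsumeTimes : delayLevel;
        level = Math.min(level, levels.length); // cap at the highest configured level
        return toMillis(levels[level - 1]);
    }

    public static void main(String[] args) {
        System.out.println(retryDelayMillis(0, 0));  // prints 10000
        System.out.println(retryDelayMillis(0, 1));  // prints 30000
        System.out.println(retryDelayMillis(2, 5));  // prints 5000
        System.out.println(retryDelayMillis(-1, 0)); // prints -1
    }
}
```

Under these assumptions, a listener that leaves the delay level at 0 sees the first redelivery after about 10 seconds, with the interval growing on each further attempt.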
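The broker stores the message to file again and, of course, increments its reconsume count by 1.
For details, see the SendMessageProcessor.processRequest method, which will be covered in a later post.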