RocketMQ 怎样解决为了 实时拉取消息 而不得不一直轮询的问题

时间:2020-06-16 16:50:01   收藏:0   阅读:188

启动一个consumer的时候,我用的是DefaultMQPushConsumer。根据之前的   博文  ,push其实还是一次次的pullrequest,也就是拉,这里有个问题:如果需要实时性很高,也就是说broker收到一条消息,订阅的consumer就要马上收到,那么consumer就需要不停的轮询,一次pullrequest收不到消息,马上进行下一次请求,这样就非常的耗费资源。

rocketMQ是这样解决的:

1、broker这边,请求过来,如果有新消息返回,在consumer这边,异步请求的回调函数pullCallback中,判断pullResult不为null,那么把消息存到processQueue中之后,马上发起下一个请求。

2、如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。

原文:https://www.cnblogs.com/chuliang/p/13141279.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!