摘要:當接收到消息后,會在方法中調(diào)用方法,將的信息塞進緩存中,并且會定時發(fā)送心跳將發(fā)送給進行注冊。這也說明了當用集群模式去創(chuàng)建時,集群里面每個的的數(shù)量相同,當用單個模式去創(chuàng)建時,每個的數(shù)量可以不一致。
微信公眾號「后端進階」,專注后端技術(shù)分享:Java、Golang、WEB框架、分布式中間件、服務(wù)治理等等。
老司機傾囊相授,帶你一路進階,來不及解釋了快上車!
我還記得第一次使用rocketmq的時候,需要去控制臺預(yù)先創(chuàng)建topic,我當時就想為什么要這么設(shè)計,于是我決定擼一波源碼,帶大家從根源上吃透rocketmq topic的創(chuàng)建機制。
topic在rocketmq的設(shè)計思想里,是作為同一個業(yè)務(wù)邏輯消息的組織形式,它僅僅是一個邏輯上的概念,而在一個topic下又包含若干個邏輯隊列,即消息隊列,消息內(nèi)容實際是存放在隊列中,而隊列又存儲在broker中,下面我用一張圖來說明topic的存儲模型:
其實rocketmq中存在兩種不同的topic創(chuàng)建方式,一種是我剛剛說的預(yù)先創(chuàng)建,另一種是自動創(chuàng)建,下面我開車帶大家從源碼的角度來詳細地解讀這兩種創(chuàng)建機制。
自動創(chuàng)建默認情況下,topic不用手動創(chuàng)建,當producer進行消息發(fā)送時,會從nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么會默認拉取broker啟動時默認創(chuàng)建好名為“TBW102”的Topic:
org.apache.rocketmq.common.MixAll:
// Will be created at broker when isAutoCreateTopicEnable public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
自動創(chuàng)建的開關(guān)配置在BrokerConfig中,通過autoCreateTopicEnable字段進行控制,
org.apache.rocketmq.common.BrokerConfig:
@ImportantField private boolean autoCreateTopicEnable = true;
在broker啟動時,會調(diào)用TopicConfigManager的構(gòu)造方法,autoCreateTopicEnable打開后,會將“TBW102”保存到topicConfigTable中:
org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); }
broker會通過發(fā)送心跳包將topicConfigTable的topic信息發(fā)送給nameserver,nameserver將topic信息注冊到RouteInfoManager中。
繼續(xù)看消息發(fā)送時是如何從nameserver獲取topic的路由信息:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 生產(chǎn)者第一次發(fā)送消息,topic在nameserver中并不存在 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 第二次請求會將isDefault=true,開啟默認“TBW102”從namerserver獲取路由信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
如上方法,topic首次發(fā)送消息,此時并不能從namserver獲取topic的路由信息,那么接下來會進行第二次請求namserver,這時會將isDefault=true,開啟默認“TBW102”從namerserver獲取路由信息,此時的“TBW102”topic已經(jīng)被broker默認注冊到nameserver了:
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
if (isDefault && defaultMQProducer != null) { // 使用默認的“TBW102”topic獲取路由信息 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } }
如果isDefault=true并且defaultMQProducer不為空,從nameserver中獲取默認路由信息,此時會獲取所有已開啟自動創(chuàng)建開關(guān)的broker的默認“TBW102”topic路由信息,并保存默認的topic消息隊列數(shù)量。
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); }
從本地緩存中取出topic的路由信息,由于topic是第一次發(fā)送消息,這時本地并沒有該topic的路由信息,所以對比該topic路由信息對比“TBW102”時changed為true,即有變化,進入以下邏輯:
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
// Update sub info { SetsubscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator > it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } }
將“TBW102”topic路由信息構(gòu)建TopicPublishInfo,并將用topic為key,TopicPublishInfo為value更新本地緩存,到這里就明白了,原來broker們千辛萬苦創(chuàng)建“TBW102”topic并將其路由信息注冊到nameserver,被新來的topic獲取后立即用“TBW102”topic的路由信息構(gòu)建出一個TopicPublishInfo并且據(jù)為己有,由于TopicPublishInfo的路由信息時默認“TBW102”topic,因此真正要發(fā)送消息的topic也會被負載發(fā)送到“TBW102”topic所在的broker中,這里我們可以將其稱之為偷梁換柱的做法。
當broker接收到消息后,會在msgCheck方法中調(diào)用createTopicInSendMessageMethod方法,將topic的信息塞進topicConfigTable緩存中,并且broker會定時發(fā)送心跳將topicConfigTable發(fā)送給nameserver進行注冊。
自動創(chuàng)建與消息發(fā)送時獲取topic信息的時序圖:
預(yù)先創(chuàng)建其實這個叫預(yù)先創(chuàng)建似乎更加適合,即預(yù)先在broker中創(chuàng)建好topic的相關(guān)信息并注冊到nameserver中,然后client端發(fā)送消息時直接從nameserver中獲取topic的路由信息,但是手動創(chuàng)建從動作上來將更加形象通俗易懂,直接告訴你,你的topic信息需要在控制臺上自己手動創(chuàng)建。
預(yù)先創(chuàng)建需要通過mqadmin提供的topic相關(guān)命令進行創(chuàng)建,執(zhí)行:
./mqadmin updateTopic
官方給出的各項參數(shù)如下:
usage: mqadmin updateTopic [-b] [-c ] [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ] -b,--brokerAddr create topic to which broker -c,--clusterName create topic to which cluster -h,--help Print help -n,--namesrvAddr Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 -o,--order set topic"s order(true|false -p,--perm set topic"s permission(2|4|6), intro[2:W 4:R; 6:RW] -r,--readQueueNums set read queue nums -s,--hasUnitSub has unit sub (true|false -t,--topic topic name -u,--unit is unit topic (true|false -w,--writeQueueNums set write queue nums
我們直接定位到其實現(xiàn)類執(zhí)行命令的方法:
通過broker模式創(chuàng)建:
org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:
// -b,--brokerAddrcreate topic to which broker if (commandLine.hasOption("b")) { String addr = commandLine.getOptionValue("b").trim(); defaultMQAdminExt.start(); defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); return; }
從commandLine命令行工具獲取運行時-b參數(shù)重的broker的地址,defaultMQAdminExt是默認的rocketmq控制臺執(zhí)行的API,此時調(diào)用start方法,該方法創(chuàng)建了一個mqClientInstance,它封裝了netty通信的細節(jié),接著就是最重要的一步,調(diào)用createAndUpdateTopicConfig將topic配置信息發(fā)送到指定的broker上,完成topic的創(chuàng)建。
通過集群模式創(chuàng)建:
org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:
// -c,--clusterNamecreate topic to which cluster else if (commandLine.hasOption("c")) { String clusterName = commandLine.getOptionValue("c").trim(); defaultMQAdminExt.start(); Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); System.out.printf("create topic to %s success.%n", addr); } return; }
通過集群模式創(chuàng)建與通過broker模式創(chuàng)建的邏輯大致相同,多了根據(jù)集群從nameserver獲取集群下所有broker的master地址這個步驟,然后在循環(huán)發(fā)送topic信息到集群中的每個broker中,這個邏輯跟指定單個broker是一致的。
這也說明了當用集群模式去創(chuàng)建topic時,集群里面每個broker的queue的數(shù)量相同,當用單個broker模式去創(chuàng)建topic時,每個broker的queue數(shù)量可以不一致。
預(yù)先創(chuàng)建時序圖:
何時需要預(yù)先創(chuàng)建Topic?建議線下開啟,線上關(guān)閉,不是我說的,是官方給出的建議:
rocketmq為什么要這么設(shè)計呢?經(jīng)過一波源碼深度解析后,我得到了我想要的答案:
根據(jù)上面的源碼分析,我們得出,rocketmq在發(fā)送消息時,會先去獲取topic的路由信息,如果topic是第一次發(fā)送消息,由于nameserver沒有topic的路由信息,所以會再次以“TBW102”這個默認topic獲取路由信息,假設(shè)broker都開啟了自動創(chuàng)建開關(guān),那么此時會獲取所有broker的路由信息,消息的發(fā)送會根據(jù)負載算法選擇其中一臺Broker發(fā)送消息,消息到達broker后,發(fā)現(xiàn)本地沒有該topic,會在創(chuàng)建該topic的信息塞進本地緩存中,同時會將topic路由信息注冊到nameserver中,那么這樣就會造成一個后果:以后所有該topic的消息,都將發(fā)送到這臺broker上,如果該topic消息量非常大,會造成某個broker上負載過大,這樣消息的存儲就達不到負載均衡的目的了。
掃面下方二維碼,關(guān)注「Java科代表」,開車帶你臨摹各種源碼,來不及解釋了快上車!?
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/74017.html
摘要:微信公眾號后端進階,專注后端技術(shù)分享框架分布式中間件服務(wù)治理等等。 微信公眾號「后端進階」,專注后端技術(shù)分享:Java、Golang、WEB框架、分布式中間件、服務(wù)治理等等。 前段時間有個朋友向我提了一個問題,他說在搭建 RocketMQ 集群過程中遇到了關(guān)于消費訂閱的問題,具體問題如下: showImg(https://segmentfault.com/img/remote/1460...
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復(fù)造輪子嗎?本文我們就帶大家來詳...
摘要:所以基于目前的設(shè)計,建議關(guān)閉自動創(chuàng)建的功能,然后根據(jù)消息量的大小,手動創(chuàng)建。如果發(fā)送消息,返回結(jié)果超時,這種超時不會進行重試了如果是方法本身耗時超過,還未來得及調(diào)用發(fā)送消息,此時的超時也不會重試。 先來看下producer核心的類設(shè)計,如下圖: showImg(http://pbdqyl9hh.bkt.clouddn.com/rocketmq/producer%E7%B1%BB%E5%...
摘要:消息生產(chǎn)者,負責(zé)發(fā)消息到。消息消費者,負責(zé)從上拉取消息進行消費,消費完進行。集群部署端完全消費正常后在進行手動確認。消息發(fā)送成功后,服務(wù)器返回確認消息給生產(chǎn)者。根據(jù)本地事務(wù)執(zhí)行的結(jié)果向發(fā)送提交或回滾消息。 RabbitMQerlang開發(fā),對消息堆積的支持并不好,當大量消息積壓的時候,會導(dǎo)致RabbitMQ的性能急劇下降。...
閱讀 3560·2023-04-26 00:39
閱讀 4786·2021-09-22 10:02
閱讀 2611·2021-08-09 13:46
閱讀 1175·2019-08-29 18:40
閱讀 1498·2019-08-29 18:33
閱讀 827·2019-08-29 17:14
閱讀 1569·2019-08-29 12:40
閱讀 3091·2019-08-28 18:07