RocketMQ5.0 NameServer啟動(dòng)流程
1. NameServer 啟動(dòng)
org.apache.rocketmq.namesrv.NamesrvStartup 的Main函數(shù)是啟動(dòng)的入口。
啟動(dòng)分成了兩塊:
- NameServer啟動(dòng)
- Controller啟動(dòng)(5.0為自動(dòng)自主切換新增的一個(gè)模塊,內(nèi)嵌NameServer的時(shí)候會(huì)啟動(dòng))
本篇文章只分析NameServer的啟動(dòng),Controller的啟動(dòng)在后續(xù)的文章中進(jìn)行分析
1.1 命令行參數(shù)解析
NameServer啟動(dòng)之前需要先對(duì)命令行參數(shù)進(jìn)行解析,將命令行參數(shù)解析為NameServer啟動(dòng)需要的參數(shù)配置。主要的命令行參數(shù)有兩個(gè)
命令 | 說明 |
-c | 設(shè)置配置文件文件位置 |
-p | 打印配置的參數(shù) |
-c命令行參數(shù)設(shè)置配置文件位置,然后將配置文件中的參數(shù)值解析設(shè)置為配置類的屬性值,涉及到的配置有如下幾個(gè):
- NamesrvConfig
- NettyServerConfig
- NettyClientConfig
- ControllerConfig(只有當(dāng)Controller內(nèi)嵌NameServer的時(shí)候才起作用)
namesrvConfig = new NamesrvConfig(); nettyServerConfig = new NettyServerConfig(); nettyClientConfig = new NettyClientConfig(); nettyServerConfig.setListenPort(9876); controllerConfig = new ControllerConfig(); if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file))); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); MixAll.properties2Object(properties, controllerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } 復(fù)制代碼
更多的參數(shù)設(shè)置修改可以參照源碼中NamesrvConfig、NettyServerConfig、NettyClientConfig、ControllerConfig中的類屬性。
1.2 創(chuàng)建NamesrvController
根據(jù)NamesrvController的構(gòu)造函數(shù)創(chuàng)建了三個(gè)重要的管理類實(shí)例:
- KVConfigManager
- BrokerHousekeepingService
- RouteInfoManager
KVConfigManager KV的持久化、序列化和反序列化處理 BrokerHousekeepingService 處理客戶端和NameServer的連接邏輯,這里的客戶端包括:生產(chǎn)者、消費(fèi)者,以及Broker RouteInfoManager 路由管理,主要管理Broker的元數(shù)據(jù),Topic的元數(shù)據(jù)信息
1.3 初始化NamesrvController
首先調(diào)用NamesrvController#initialize進(jìn)行初始化,我們看一下初始化做了什么事情。
public boolean initialize() { loadConfig(); initiateNetworkComponents(); initiateThreadExecutors(); registerProcessor(); startScheduleService(); initiateSslContext(); initiateRpcHooks(); return true; } 復(fù)制代碼
1.3.1 loadConfig
1.3.2 initiateNetworkComponents
private void initiateNetworkComponents() { this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingClient = new NettyRemotingClient(this.nettyClientConfig); } 復(fù)制代碼
創(chuàng)建NameServer的網(wǎng)絡(luò)服務(wù),以及NameServer的客戶端。
1.3.3 initiateThreadExecutors
這里初始化了兩個(gè)線程池:
- clientRequestExecutor線程池處理客戶端(生產(chǎn)者和消費(fèi)者)獲取Topic的路由信息(RequestCode.GET_ROUTEINFO_BY_TOPIC)
- defaultExecutor線程池處理除了RequestCode.GET_ROUTEINFO_BY_TOPIC以外的請(qǐng)求。
:::tip 在5.0版本后多了一個(gè)clientRequestExecutor線程池,主要是因?yàn)樵黾覰ameServer的可用性,即使defaultExecutor不能正常工作出現(xiàn)宕機(jī)的情況,客戶端仍然可以獲取Topic的路由信息而進(jìn)行的線程池的隔離。 具體可以參照[RIP-29] :::
1.3.4 registerProcessor
將線程池和處理器綁定。 Rocketmq5.0版本對(duì)處理器進(jìn)行了線程池隔離,將獲取路由相關(guān)的處理和其他的處理例如Broker的注冊(cè)進(jìn)行線程池的隔離。
1.3.5 startScheduleService
啟動(dòng)三個(gè)定時(shí)任務(wù),兩個(gè)是打印的的定時(shí)任務(wù)沒有業(yè)務(wù)邏輯,只有scanNotActiveBroker定時(shí)任務(wù)的作用:默認(rèn)每5秒掃描一次Broker是否過期。
1.3.5 initiateSslContext
初始化SsL
1.3.6 initiateRpcHooks
private void initiateRpcHooks() { this.remotingServer.registerRPCHook(new ZoneRouteRPCHook()); } 復(fù)制代碼
目前只注冊(cè)了一個(gè)ZoneRouteRPCHook,主要用于區(qū)域路由。
1.4 啟動(dòng)NamesrvController
public void start() throws Exception { this.remotingServer.start(); // In test scenarios where it is up to OS to pick up an available port, set the listening port back to config if (0 == nettyServerConfig.getListenPort()) { nettyServerConfig.setListenPort(this.remotingServer.localListenPort()); } this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress() + ":" + nettyServerConfig.getListenPort())); this.remotingClient.start(); if (this.fileWatchService != null) { this.fileWatchService.start(); } this.routeInfoManager.start(); } 復(fù)制代碼
啟動(dòng)NameServer的Netty對(duì)外的服務(wù)和客戶端服務(wù),在文件監(jiān)控服務(wù)不為空的情況下啟動(dòng)服務(wù)。 路由管理服務(wù)啟動(dòng): 主要是啟動(dòng)了批量注銷服務(wù)。到這里整個(gè)服務(wù)就已經(jīng)啟動(dòng)完成。
2. 總結(jié)
- 啟動(dòng)參數(shù)解析:NameServer 啟動(dòng)時(shí)需要指定一些參數(shù),例如監(jiān)聽端口、RocketMQ 集群的名稱等等。NameServer 會(huì)先解析這些參數(shù),并根據(jù)這些參數(shù)進(jìn)行初始化。
- 加載配置文件:NameServer 還會(huì)加載配置文件,包括 broker 配置、路由配置、Topic 配置等等,這些配置文件可以指定在啟動(dòng)參數(shù)中,也可以在啟動(dòng)后進(jìn)行修改。
- 創(chuàng)建 MBeanServer:NameServer 還會(huì)創(chuàng)建一個(gè) MBeanServer,用于對(duì) NameServer 進(jìn)行監(jiān)控和管理。
- 啟動(dòng) Netty 服務(wù)端:NameServer 的主要功能是接收 Broker 節(jié)點(diǎn)的注冊(cè)請(qǐng)求和心跳信息,并維護(hù) Broker 節(jié)點(diǎn)的狀態(tài)。為此,NameServer 會(huì)啟動(dòng)一個(gè) Netty 服務(wù)端,用于接收和處理這些請(qǐng)求。
- 注冊(cè) ShutdownHook:NameServer 還會(huì)注冊(cè)一個(gè) ShutdownHook,用于在 NameServer 關(guān)閉時(shí)執(zhí)行一些清理工作,例如關(guān)閉 Netty 服務(wù)端、保存路由信息等等。
- 初始化定時(shí)任務(wù):NameServer 還會(huì)初始化一些定時(shí)任務(wù),例如定時(shí)刷新路由信息、定時(shí)清理過期的 Broker 節(jié)點(diǎn)等等。這些定時(shí)任務(wù)是通過 Java 自帶的 ScheduledExecutorService 實(shí)現(xiàn)的。
- 啟動(dòng)完成:最后,NameServer 啟動(dòng)完成,并等待 Broker 節(jié)點(diǎn)的注冊(cè)和心跳信息。
以上就是 RocketMQ NameServer 的啟動(dòng)流程。需要注意的是,RocketMQ 集群中至少需要一個(gè) NameServer 節(jié)點(diǎn),多個(gè) NameServer 節(jié)點(diǎn)可以提高系統(tǒng)的可用性和容錯(cuò)性。