admin管理员组

文章数量:1122910

RocketMQ

本文基于RocketMQ 4.7.1版本

本文将从整体上分析broker如何启动。

文章目录

  • 一、broker启动
    • 1、createBrokerController()
    • 2、start()

一、broker启动

broker的启动类是BrokerStartup,其main方法如下:

	public static void main(String[] args) {start(createBrokerController(args));}

createBrokerController()用于创建BrokerController控制器,start()用于启动BrokerController。
BrokerController是broker启动的核心类,它会创建一些处理器和管理器,并且持有四大配置对象。它创建的管理器有生产者管理器、消费者管理器、broker状态管理器、主题配置管理器等,这些管理器在以后使用到的时候在介绍。
下面分别来看这两个方法的逻辑。

1、createBrokerController()

下图是createBrokerController方法的处理流程:

下面是对上图内容的一些解析。
broker对外提供服务的端口号不是可配置的,是代码中写死的,必须是10911。
因为broker的配置非常多,所以rocketmq将这些配置做了一下归类,分散到四个不同的配置类中:BrokerConfig属于broker的基本配置,NettyServerConfig和NettyClientConfig用于设置网络相关的配置,比如对外发布服务的端口号就是在NettyServerConfig中设置,MessageStoreConfig用于设置存储相关的配置,比如消息存储路径、日志路径。
如果当前的broker是master,那么其broker id必须是0。
rocketmq在目录/store/config目录下维护了几个非常重要的json文件:

  1. 主题配置文件是在个人目录下的/store/config目录中,文件名为topics.json,该文件中保存主题名、读队列个数、写队列个数、是否只读。文件解析完后,将数据存储到主题配置管理器TopicConfigManager的topicConfigTable属性中。
  2. 消费者位移文件(consumerOffset.json)记录主题在每个消费组下每个读队列的消费位移,比如:

    上图显示了消费组consumer-A消费了主题topicTest11的四个读队列的消息,value里面显示了四个读队列的位移。该文件由ConsumerOffsetManager处理,解析后的数据保存在该对象的offsetTable属性中。
  3. 订阅组文件(subscriptionGroup.json),也叫作消费组文件,里面保存了消费组的配置信息,比如消费组名、消费组是否可以消费消息、重试队列个数、最大重试次数等。该文件由SubscriptionGroupManager处理,解析后数据保存在该对象的subscriptionGroupTable属性中。
  4. 消费者过滤器配置文件(ConsumerFilterManager),记录了每个主题下消费组配置的消息过滤条件,解析后数据保存在ConsumerFilterManager的filterDataByTopic属性中。

消息存储器插件可以在配置文件里面由参数messageStorePlugIn设置,值使用全限定类名,多个插件之间使用“,”分隔,插件类需要继承抽闲类AbstractPluginMessageStore,该类实现了MessageStore接口,DefaultMessageStore也实现了MessageStore接口,当存储消息时,rocketmq将多个插件按照配置依次调用,最后调用DefaultMessageStore将消息存储到文件,因此插件提供了一种在消息存储前后修改数据的功能。
上图中的数据恢复并不是系统宕机后的恢复,这里的恢复是指读取文件来恢复内存数据,使内存数据尽可能恢复到停机前的现场,比如在停机前内存会记录消息文件的下一个写入位置,重启后这个数据就会丢失,那么需要重新读取消息文件,找到消息最后的写入位置,在读取期间,还会处理消息索引,处理读队列的数据。

2、start()

本方法主要是调用BrokerController.start()方法来启动控制器,BrokerController.start()又会启动一系列的组件,下面来看一下BrokerController.start()的代码:

	public void start() throws Exception {if (this.messageStore != null) {//启动DefaultMessageStore,该类会对/store/lock文件加锁,//确保在broker运行期间只有一个broker实例操作/store目录this.messageStore.start();}//启动Netty监听10911端口,可以对外提供服务if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {//监听10909端口,作用未知this.fastRemotingServer.start();}if (this.fileWatchService != null) {//fileWatchService与TLS有关,本文暂不对TLS解析this.fileWatchService.start();}if (this.brokerOuterAPI != null) {//启动客户端Netty,broke使用该对象对外发送数据,比如向nameserver注册主题信息this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {//作用未知this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {//作用未知this.clientHousekeepingService.start();}if (this.filterServerManager != null) {//作用未知this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {//处理HAstartProcessorByHa(messageStoreConfig.getBrokerRole());//启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());//向所有的nameserver发送本机所有的主题数据,//包括主题名、读队列个数、写队列个数、队列权限、是否有序等this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {//定时任务,每过一段时间向nameserver注册一次主题信息@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();//空实现}//启动定时任务,用于对收到的请求做流控,//如果发现broker收到的请求在指定的时间内无法处理完成,默认是5s,//那么会向请求方返回错误信息,告知broker正忙,请稍后重试//涉及到的请求有:生产者发送的消息、消费者拉去消息的请求、心跳请求、事务结束请求if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

BrokerController.start()方法主要是启动一些组件,其主要作用是:

  1. 监听10911端口,接收生产者和消费者请求;
  2. 向nameserver发送本机所有的主题数据,并启动定时任务。

本文标签: RocketMQ