RocketMQ源码解读
约 1428 字大约 5 分钟
2025-09-24
读源码的方式
- 带着问题去读源码(先了解这个组件有哪几个核心功能,然后自己思考其中一个核心功能大概会怎么实现,再带着自己的问题去源码中找答案)
- 一个组件拆分为一个个小功能点,多读几次,不要一直纠结不懂的地方
- 分布总结,结合自己的理解总结一个个功能点的实现。对其中一些扩展功能点尝试验证,写测试代码。
nameserver启动流程梳理
- 参数解析:读取启动命令中的参数(如
-p 9876、-c /path/to/config)来覆盖默认配置。 - 网络服务初始化:创建
remotingServer实例,用作服务端,供客户端(生产者、消费者)获取Broker信息。创建remotingClient实例作为客户端,在集群中请求其他节点服务。 - 路由表初始化:创建
RouteInfoManager对象,这是 NameServer 的核心内存数据结构,用于存储在线的 Broker 信息。它包含以下几个核心 Map:topicQueueTable: key: topic名称, value: 该topic的队列信息列表(QueueData)brokerAddrTable: key: broker名称(clusterName),value: Broker数据(BrokerData),包含所属集群及主从地址映射clusterAddrTable: key: 集群名称, value: 该集群下所有broker名称的集合brokerLiveTable: key: broker地址, value: Broker实时信息(BrokerLiveInfo),包括上次心跳时间等filterServerTable: 过滤服务器信息
- 定时任务:启动一个每 10 分钟执行一次的定时任务,检查
brokerLiveTable。如果某个 Broker 的最后心跳时间与当前时间差超过 2 分钟,则认为其已下线,将其从路由表中移除。 - 就绪服务:启动 Netty 服务,开始监听端口,等待客户端连接。
特点总结:
- 轻量级:几乎不持久化任何数据(所有数据在内存中),重启后数据丢失,依赖 Broker 重新上报。
- 最终一致性:Broker 的路由信息通过心跳机制最终同步到所有 NameServer。
broker启动过程梳理
核心步骤详解:
BrokerController:所有核心模块资源都在这里,负责协调模块间的工作。在启动类
BrokerStartup的第一个步骤就是创建BrokerController,创建好后就是start启动 BrokerController。初始化核心模块:
创建BrokerController时,在
org.apache.rocketmq.broker.BrokerStartup#buildBrokerController里可以看到核心的几个配置模块 BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig、AuthConfig- BrokerConfig :Broker服务的配置信息
- NettyServerConfig :netty服务端配置,使Broker作为服务端,用于接收其他服务的请求
- NettyClientConfig :netty客户端配置,使Broker作为客户端端,用于请求其他服务,以及向NameServer注册发送心跳。
- AuthConfig :权限相关的配置
在
org.apache.rocketmq.broker.BrokerController#initialize会进行Metadata元数据,也就是topic、queue、consumerOffset的初始化,以及MessageStore的初始化和org.apache.rocketmq.broker.BrokerController#initializeRemotingServer- 消息存储模块(MessageStore):最复杂的模块,通常使用
DefaultMessageStore。 - 远程模块(RemotingServer):基于 Netty 实现,负责与 Producer、Consumer 及其他 Broker 通信。同时会注册一系列处理器(Processor) 来处理不同类型的请求(如发送消息、拉取消息、查询消息等)。
- 客户端管理器(ClientManager):管理连接的 Producer 和 Consumer 客户端。
向 NameServer 注册:
- Broker 启动后,会立即向配置的所有 NameServer 地址发送心跳包(循环发送)。
- 心跳包中包含:BrokerName、ClusterName、BrokerId(0表示Master,>0表示Slave)、Broker地址(IP:Port)以及 Topic 配置信息。
- 此后,会定期发送一次心跳,以维持自己在 NameServer 中的“在线”状态。
启动定时任务:
- 定时持久化消费进度(Offset)。
- 定时删除过期的文件。
- 如果是从节点(Slave),定时从 Master 同步消息。
就绪服务:所有模块启动成功后,Broker 开始对外提供服务。
Broker 与 NameServer 的交互
- 注册:Broker 启动时向所有 NameServer 发送注册请求(心跳),NameServer 将其信息更新到
brokerAddrTable,brokerLiveTable等路由表中。 - 心跳维持:Broker 每隔 30 秒向所有 NameServer 发送一次心跳,NameServer 收到后更新
BrokerLiveInfo中的lastUpdateTimestamp。 - 下线剔除:NameServer 的定时任务发现某个 Broker 超过 2 分钟未上报心跳,则判定其下线,并将其从路由表中删除。
- 客户端发现:Producer/Consumer 定时从 NameServer 拉取最新的路由信息,从而知道哪些 Broker 是在线的,Topic 分布在哪些 Broker 上。
Producer发送消息
producer的消息发送分两种情况
普通消息发送,消息发送完就可以结束。
事务消息发送,消息发送之后,还需要在事务消息过程中提供事务状态确认的服务,所以需要消息发送者在完成这个事务消息的确认机制后,才可以退出。
消息的发送主要分为两个步骤:
- start启动Producer,
org.apache.rocketmq.client.producer.DefaultMQProducer#start- 实例化客户端服务,
org.apache.rocketmq.client.impl.factory.MQClientInstance#start,MQClientAPIImpl负责请求和响应 - 初始化
Topic,从NameServer服务获取到Topic信息,Topic对应的MessageQueue信息,以及存在Queue的Broker信息。org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#initTopicRoute
- 实例化客户端服务,
- send发送消息,
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl- 根据要发送消息的Topic,先从本地获取队列信息,如果没有则去NameServer获取,再存到本地。
- 选择一个发送队列,负载均衡的去选择,上一次发送过的、上次发送失败的都不会选择。
- 消息发送, 根据Queue可以确定要发送给哪个
Broker服务org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
就算 NameServer 挂了一小段时间,Producer依旧可以发送消息,因为Producer会缓存信息到本地,只要能找到Broker,还是可以正常发送消息到Broker的
