概念
生产者
初始化
生产者通过SDK创建实例,生产者客户端的标识,用于区分不同的生产者。集群内全局唯一。,客户端ID由Apache RocketMQ 的SDK自动生成,主要用于日志查看、问题定位等运维场景,不支持修改。
还需要接入点信息,连接服务端的接入地址,NameServer地址,用于识别服务端集群。
可选的配置是可设置生产者组名,用于标识一组具有相同角色或职责的生产者。消息发送超时时间、重试次数等。
功能
- 消息构造:构建消息对象,包括设置消息的主题(Topic)、标签(Tag)、键值(Key)以及消息体(Body)。标签可以用来对消息进行二次分类,键值可用于唯一标识一条消息。
- 消息发送策略:决定是采用同步发送、异步发送还是单向发送。同步发送适用于要求高可靠性且允许一定延迟的场景;异步发送适合于追求高性能但能接受一定程度的消息丢失风险的情况;单向发送则用于对性能要求极高且不需要确认的场合。
- 消息发送失败处理:实现合理的重试机制,以应对可能发生的网络问题或其他异常情况导致的消息发送失败。
- 事务消息支持:如果业务场景需要,还可以使用RocketMQ提供的事务消息功能,确保消息发送与本地事务一致性的实现。
消息
消息是系统的核心元素之一,它不仅包含了用户发送的实际数据(即消息体),还包含了许多元数据用于支持消息的传递、存储和消费。
- 消息头(Message Header)
- Topic:消息所属的主题。
- Tags:用于对消息进行分类的标签,便于消费者过滤消息。
- Keys:消息的关键字,可以用于查询特定的消息。
- Unique Key (msgId):唯一标识一条消息的ID,由Broker生成。
- User Properties:用户自定义的属性,可以用来携带额外的信息。
- System Properties:系统级别的属性,如消息的生产者组名、消息的创建时间等。
- 消息体(Body)
- 这是用户实际要发送的数据内容,通常是一个字节数组(byte[]),可以是序列化后的对象、文本信息等任何形式的数据。
- 其他重要属性
- Delay Time Level:延迟级别,如果设置了该属性,则消息将按照指定的时间延迟后被消费。
- Transaction ID:事务消息特有的属性,用于实现分布式事务的支持。
- Reconsume Times:重试次数,记录了该消息被重新消费了多少次。
- Born Timestamp:消息产生的时间戳。
- Store Timestamp:消息被Broker持久化的时间戳。
- Queue Id:消息所在的队列ID。
- Commit Log Offset:消息在CommitLog中的物理偏移量。
RocketMQ 支持多种类型的消息,每种消息类型都有其特定的用途和应用场景。以下是 RocketMQ 中主要的消息类型及其特点:
1. 普通消息(Normal Message)
这是最基础的消息类型,用于一般的异步通信场景。
- 特点:
- 没有特殊要求或限制。
- 生产者发送消息到 Broker,消费者从 Broker 订阅并消费这些消息。
- 支持高吞吐量的消息传递。
2. 顺序消息(Orderly Message)
顺序消息确保消息在某个队列内按照发送顺序进行存储和消费。
- 特点:
- 全局顺序:整个 Topic 的所有消息都按顺序消费,适用于对顺序要求极高的场景,但性能较低。
- 分区顺序:每个队列内的消息保持顺序,适用于大部分需要保证顺序的场景,性能较好。
- 使用
MessageQueueSelector来指定消息发送到哪个队列以保证顺序。
3. 事务消息(Transactional Message)
事务消息支持分布式事务的一致性,允许生产者发送消息后,根据业务逻辑的结果决定提交或回滚消息。
- 特点:
- 半消息(Half Message):首先将消息作为“半消息”发送到 Broker,并标记为待确认状态。
- 本地事务执行:生产者执行本地事务逻辑。
- 提交或回滚:根据本地事务的执行结果,生产者通知 Broker 提交或回滚该消息。
- 补偿机制:如果 Broker 在一段时间内没有收到提交或回滚的通知,它会回调生产者的事务状态检查接口来确定消息的状态。
4. 定时/延迟消息(Delayed/Delay Message)
定时/延迟消息允许消息在指定的时间之后才被消费。
- 特点:
- 使用
delayTimeLevel参数设置延迟级别,默认提供多个预设的延迟时间选项(如 1s, 5s, 10s 等)。 - 可以通过配置自定义延迟时间级别。
- 消息不会立即对消费者可见,而是经过设定的时间间隔后才进入可消费状态。
- 使用
1. 普通消息(Normal Message)
处理方式:
- 存储:普通消息会被顺序写入 CommitLog 文件中,并根据其所属的 Topic 和 Queue ID 更新相应的 ConsumeQueue 文件。
- 传递:消费者从 Broker 订阅消息,根据订阅关系拉取消息。Broker 根据消费者的请求从 ConsumeQueue 中读取消息的偏移量信息,然后根据这些信息从 CommitLog 中读取具体的消息内容。
- 消费:消费者成功处理完消息后,提交消费进度(offset),标记这条消息已经被成功消费。如果处理失败,可以根据配置进行重试或放入死信队列。
2. 顺序消息(Orderly Message)
处理方式:
- 存储:与普通消息相同,顺序消息也会被写入 CommitLog 文件,并更新相应的 ConsumeQueue 文件。但是为了保证顺序性,消息必须发送到同一个 Message Queue 中。
- 传递:为了确保顺序消费,消费者需要对每个 Message Queue 进行独占锁定(即一个 Queue 同一时间只能由一个消费者实例处理)。这意味着即使有多个消费者实例并行运行,同一 Queue 的消息也只能由其中一个消费者按顺序处理。
- 消费:顺序消息的消费过程要求严格遵循 FIFO(先进先出)原则。如果某条消息消费失败,后续消息将无法继续消费,直到该消息被成功处理或采取其他措施(如手动跳过或重试)。
3. 事务消息(Transactional Message)
处理方式:
- 半消息存储:首先,生产者发送的是“半消息”(Half Message),这种消息在 Broker 上处于待确认状态。它不会立即对消费者可见。
- 本地事务执行:生产者在发送半消息之后,会执行本地事务逻辑。这可能涉及数据库操作或其他业务逻辑。
- 提交或回滚:根据本地事务的结果,生产者会通知 Broker 提交或回滚该半消息。如果提交,则消息变为可消费状态;如果回滚,则消息将被删除。
- 补偿机制:如果 Broker 在一段时间内没有收到提交或回滚的通知,它会回调生产者的事务状态检查接口来确定消息的状态。这一机制确保了即使在网络故障等情况下也能保持事务的一致性。
4. 定时/延迟消息(Delayed/Delay Message)
处理方式:
- 特殊存储:定时/延迟消息在 Broker 端被视为一种特殊的 Delay Message。它们首先被写入 CommitLog 文件,但不会立即对消费者可见。
- 延迟级别:RocketMQ 支持多种预设的延迟级别,默认提供的延迟时间包括 1s, 5s, 10s 等。用户可以通过设置
delayTimeLevel参数选择合适的延迟级别。 - 延迟调度:Broker 内部有一个调度器负责定期扫描这些延迟消息。当消息达到其设定的延迟时间后,调度器会将其转换为普通消息,并使其对消费者可见。
- 消费:一旦延迟时间到达,消息将按照正常的流程被消费。
NameServer
NameServer在RocketMQ中扮演着服务发现和路由信息管理的重要角色。启动一个NameServer实例的过程相对直接,主要是通过执行其对应的启动脚本或直接运行其主类来完成的。
在RocketMQ的二进制发布包中,提供了用于启动NameServer的脚本。对于Linux或Unix系统,可以使用nohup命令配合提供的runserver.sh脚本来后台运行NameServer。对于Windows系统,可以通过运行相应的批处理文件来启动NameServer .\bin\mqnamesrv.cmd
Broker
Broker 是RocketMQ的核心组件之一,负责消息的接收、存储、转发等操作。每个Broker可以视为一个独立的消息服务器实例,它处理来自Producer的消息,并将这些消息提供给Consumer进行消费。
一个Broker实际上是一个运行着特定服务的进程或一组进程,通常部署在一个单独的服务器上。这个服务器可能是一台物理机,也可能是在云环境中的一台虚拟机。
存储机制:Broker使用一种高效的文件存储机制来持久化消息数据。所有接收到的消息首先被顺序写入到名为CommitLog的文件中。为了提高读取效率,RocketMQ还引入了ConsumeQueue和IndexFile两种辅助存储结构:
- CommitLog:这是消息的实际存储位置,所有的消息都以追加的方式写入到CommitLog文件中。这保证了高吞吐量的数据写入能力。
- ConsumeQueue:每个主题下的每个队列都有对应的ConsumeQueue文件,它存储的是指向CommitLog中实际消息位置的索引信息(包括偏移量和大小),便于快速定位消息。
- IndexFile(可选):用于基于关键字快速检索消息,适用于需要根据某些条件查询消息的场景。
一个 Broker 实例只有一个 CommitLog 文件(或一组文件),所有主题(Topic)的所有消息都顺序写入这同一个 CommitLog 中。
CommitLog 是 RocketMQ 消息的物理存储文件。
所有从生产者发送来的消息,无论属于哪个主题(Topic),都会被顺序追加写入到同一个 CommitLog 文件中。
这种设计极大提升了磁盘的写入性能,因为顺序写避免了随机IO,充分利用了磁盘的高吞吐能力。
每个 Topic + Queue 对应一个 ConsumeQueue 文件。
ConsumeQueue 不存储消息内容,而是存储
它不存储消息体,只存储三条信息:
物理偏移量(Physical Offset):消息在 CommitLog 文件中的起始位置。
消息大小(Size):这条消息占了多少字节。
Tag 哈希值(Tag Hash Code):用于消息过滤(比如消费者只订阅某个 Tag)。
当消费者要消费某个队列的消息时,会先读取对应的 ConsumeQueue,再根据索引去 CommitLog 中读取消息内容。
每个Broker启动后会定期(默认每30秒)向所有配置的NameServer发送心跳包。这个心跳包包含了Broker的基本信息,如Broker名称、IP地址、端口号以及该Broker上存储的所有Topic信息。
当NameServer接收到Broker的心跳包时,它会更新内部维护的路由表,记录或更新Broker的信息。如果某个Broker超过一定时间(默认为120秒)没有发送心跳包,NameServer会认为该Broker已下线,并从路由表中移除其信息。
NameServer之间不会主动同步路由信息。每个NameServer独立工作,只维护自己接收到的Broker信息。因此,Producer和Consumer需要查询所有可用的NameServer以获得完整的路由信息。
Producer和Consumer并不直接向NameServer发送心跳。相反,它们会在需要发送或消费消息之前,主动查询NameServer获取最新的路由信息(即哪些Broker活跃并服务于特定的Topic)。这一过程通常被称为“服务发现”。
队列
队列 或者称为 Message Queue,是RocketMQ中消息存储和分发的基本单位。每个主题(Topic)可以包含多个队列,这些队列分布在不同的Broker上。
队列主要体现在ConsumeQueue文件中。每个队列对应一组ConsumeQueue文件,这些文件记录了该队列中所有消息在CommitLog中的位置信息。因此,当消费者请求拉取消息时,实际上是通过访问相应的ConsumeQueue文件找到消息在CommitLog中的具体位置,然后读取所需的消息内容。
每个队列是相对独立的。这意味着每个队列负责存储特定主题下的消息,并且消息的发送和消费操作都是针对单个队列进行的。不同队列之间不会直接共享消息,即使是同一主题下的不同队列也是如此。
同一主题(Topic)下的每一条消息只会被存储在其中一个队列(Message Queue)中,而不是在所有队列中都存储一份。
当生产者(Producer)发送一条消息到某个主题时,RocketMQ会根据一定的策略选择该主题下的一个具体队列来存放这条消息。常见的选择策略包括:
- 轮询(Round Robin):默认策略,依次将消息均匀地分发到主题下的各个队列中,实现负载均衡。
- 哈希(Hash):根据消息的某些属性(如Key)进行哈希计算,然后映射到特定队列。这种方式可以保证相同Key的消息被发送到同一个队列,从而保证消息的顺序性。
- 自定义选择器:开发者也可以实现
MessageQueueSelector接口,自定义消息分配到哪个队列。
无论使用哪种策略,最终结果是每条消息只被发送并存储到一个队列中。
所有消息(无论属于哪个队列)首先被顺序写入Broker的 CommitLog 文件(只追加的日志文件)。
然后,每个队列对应一个 ConsumeQueue 文件,它存储的是指向CommitLog中消息位置的索引(偏移量、大小、Tag Hash等)。
因此,消息本体只写入一次(在CommitLog中),而不同队列通过各自的ConsumeQueue索引不同的消息。
在 RocketMQ 中,存在多种“队列”类型:
- 普通消息队列(Message Queue):属于某个 Topic,用于正常消息的存储和消费。
- 重试队列(Retry Queue):主题为
%RETRY%<ConsumerGroup>,用于存放消费失败后需要重试的消息。 - 死信队列(DLQ, Dead-Letter Queue):主题为
%DLQ%<ConsumerGroup>,用于存放重试次数超过上限仍失败的消息。
普通队列是在 Topic 被首次使用时动态创建的,也可以在 Broker 启动时通过配置预先创建。
当 Broker 启动时,会加载配置文件中定义的 Topic 及其队列数量。
如果没有显式配置,RocketMQ 支持自动创建 Topic
(默认开启):
- 当生产者首次向一个不存在的 Topic 发送消息时,Broker 会自动创建该 Topic。
队列数量由 Broker 配置决定,如
defaultTopicQueueNums=4(默认 4 个队列)。
重试队列是在消费者首次启动并注册时,由 Broker 自动创建的。
- 当消费者(属于某个 Consumer Group)第一次启动,并订阅了一个 Topic 时,RocketMQ 会:
- 检查该消费者组是否已经有对应的重试主题
%RETRY%<ConsumerGroup>。 - 如果没有,Broker 会自动创建这个重试主题,并为其分配一定数量的队列(通常与原始 Topic 的队列数相同或默认 1~4 个)。
- 重试主题是以消费者组为单位创建的。
- 每个 Consumer Group 有且仅有一个重试主题:
%RETRY%<GroupName>。
- 检查该消费者组是否已经有对应的重试主题
死信队列是在第一条消息真正需要进入 DLQ 时才被创建的。
死信主题也是以消费者组为单位。
只有当真正有消息需要进入 DLQ 时才会创建,平时不会预先创建。
属于“故障兜底”机制,创建频率较低。
当一条消息在
%RETRY%...主题中重试了maxReconsumeTimes次(默认 16 次)后仍然失败- RocketMQ 会尝试将该消息投递到
%DLQ%<ConsumerGroup>主题。 - 如果该主题还不存在,Broker 会自动创建它,并为其创建若干队列(通常 1~4 个)。
- RocketMQ 会尝试将该消息投递到
普通队列:在 Topic 首次使用时创建(自动或手动)。
重试队列:在消费者组首次启动时自动创建。
死信队列:在第一条消息真正需要进入 DLQ 时才创建(最懒加载)。
主题
主题是对消息的第一级分类,所有相同类型的消息都会被发送到同一个主题下。
可以根据业务需求创建多个不同的主题。每个主题可以理解为一种消息类型的集合。
每个主题至少包含一个队列,但为了实现负载均衡和提高并发处理能力,通常一个主题会被划分为多个队列。这些队列可以分布在集群中的一个或多个Broker上。
消费者(组)
消费者组(Consumer Group):是一个逻辑概念,代表一组具有相同角色或功能的消费者的集合。属于同一个消费者组的所有消费者实例共享相同的消费进度(即消费位点),这意味着他们共同负责处理某个主题下的消息。这样设计的好处是可以实现负载均衡和故障转移。
消费者(Consumer):是实际执行消息拉取和处理工作的实体。一个消费者组可以包含多个消费者实例,这些实例可以在不同的服务器上运行,以提高消息处理的并行度和系统的容错能力。
创建
消费者分组名称
- 定义:消费者分组的名称,用于区分不同的消费者分组。集群内全局唯一。
- 取值:消费者分组由用户设置并创建。具体命名规范
投递顺序性
定义:消费者消费消息时,Apache RocketMQ 向消费者客户端投递消息的顺序。
根据不同的消费场景,Apache RocketMQ 提供顺序投递和并发投递两种方式。
默认投递方式为并发投递。
消费重试策略
- 定义:消费者消费消息失败时,系统的重试策略。消费者消费消息失败时,系统会按照重试策略,将指定消息投递给消费者重新消费。
- 重试策略包括:
- 最大重试次数:表示消息可以重新被投递的最大次数,超过最大重试次数还没被成功消费,消息将被投递至死信队列或丢弃。
- 重试间隔:Apache RocketMQ 服务端重新投递消息的间隔时间。
- 重试间隔仅在PushConsumer消费类型下有效。
订阅关系
- 定义:当前消费者分组关联的订阅关系集合。包括消费者订阅的主题,以及消息的过滤规则等。订阅关系由消费者动态注册到消费者分组中,Apache RocketMQ 服务端会持久化订阅关系并匹配消息的消费进度。
消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。
客户端ID
- 定义:消费者客户端的标识,用于区分不同的消费者。集群内全局唯一。
- 取值:客户端ID由Apache RocketMQ 的SDK自动生成,主要用于日志查看、问题定位等运维场景,不支持修改。
通信参数
- 接入点信息 (必选) :连接服务端的接入地址,用于识别服务端集群。
预绑定订阅关系列表
定义:指定消费者的订阅关系列表。 Apache RocketMQ 服务端可在消费者初始化阶段,根据预绑定的订阅关系列表对目标主题进行权限及合法性校验,无需等到应用启动后才能校验。
取值:建议在消费者初始化阶段明确订阅关系即要订阅的主题列表,若未设置,或订阅的主题动态变更,Apache RocketMQ 会对目标主题进行动态补充校验。
消费监听器
- 定义:Apache RocketMQ 服务端将消息推送给消费者后,消费者调用消息消费逻辑的监听器。
- 取值:由消费者客户端本地配置。
- 约束:使用PushConsumer类型的消费者消费消息时,消费者客户端必须设置消费监听器。
消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括PushConsumer类型、SimpleConsumer类型、PullConsumer类型(仅推荐流处理场景使用)等。
消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。
功能
消费者如何与消息队列交互
- 订阅主题:首先,消费者需要指定要订阅的主题(Topic)及其过滤条件(如标签Tag)。这决定了它将从哪些消息队列中拉取消息。
- 分配队列:RocketMQ会根据一定的策略(例如平均分配、一致性哈希等)为每个消费者组内的消费者实例分配该主题下的消息队列。这种分配是动态调整的,当有新的消费者加入或现有消费者退出时,RocketMQ会重新平衡队列分配。
- 拉取消息:消费者按照分配到的消息队列列表,主动向Broker发起请求来拉取消息。每次拉取操作都会基于当前的消费进度(存储在Broker或NameServer中),只拉取尚未被处理的消息。
- 消息过滤:如果设置了基于标签的过滤规则,那么在拉取消息之前,Broker会对消息进行过滤,仅返回符合条件的消息给消费者。
- 并发处理:由于一个主题可能包含多个队列,并且这些队列可以分布在不同的Broker上,因此可以通过增加消费者实例的数量来提高消息处理的并行度。
- 更新消费进度:成功处理完一批消息后,消费者需要及时更新自己的消费进度(offset),以便下次能够从中断的地方继续消费。这个信息通常会被持久化保存,以防止因崩溃导致的数据丢失。
消费进度实际上是保存在Broker端的。具体来说,每个消费者组针对每个主题下的每个队列都有一个对应的消费进度记录。当消费者提交了最新的消费进度后,Broker会负责持久化这个信息到本地磁盘上。
特性
消息发送重试
触发重试的场景
以下情况可能触发消息发送重试:
- 网络异常:如连接超时、读写超时。
- Broker 不可用:目标 Broker 节点宕机或未及时响应。
- Broker 返回错误码:如系统繁忙(
SYSTEM_BUSY)、服务不可用(SERVICE_NOT_AVAILABLE)、流控(FLOW_CONTROL)等非致命错误。 - 同步/异步发送失败(单向发送不支持重试)。
⚠️ 注意:单向发送(Oneway)不会触发重试,因为其设计初衷就是“发完即走”,不关心结果。
重试策略分类
RocketMQ 的重试行为因发送方式不同而有所差异:
1. 同步发送(Sync Send)
- 默认重试次数:2 次(即总共尝试 3 次:1 次原始 + 2 次重试)。
- 可配置:通过
setRetryTimesWhenSendFailed(int times)设置。 - 重试逻辑:
- 若当前 Broker 发送失败,会从 Topic 的路由信息中选择下一个可用的 Broker(轮询其他队列所在的 Broker)进行重试。
- 重试期间会跳过已失败的 Broker(避免反复失败)。
- 阻塞等待:每次重试都会阻塞当前线程,直到成功或达到最大重试次数。
2. 异步发送(Async Send)
- 默认重试次数:2 次(同样总共 3 次)。
- 可配置:通过
setRetryTimesWhenSendAsyncFailed(int times)设置。 - 重试逻辑:
- 与同步发送类似,也会尝试其他 Broker。
- 但重试是在回调线程池中异步执行,不影响主业务线程。
- 注意:异步重试依赖于回调机制,若回调未正确处理,可能导致重试逻辑被忽略。
3. 单向发送(Oneway Send)
- 不支持重试。
- 适用于日志上报、监控数据等对可靠性要求不高的场景。
重试过程中的关键行为
1. 队列选择策略
- 默认采用轮询(Round Robin) 选择队列。
- 重试时,RocketMQ 不会重复选择同一个 MessageQueue(除非队列数量为 1)。
- 如果 Topic 有多个队列分布在多个 Broker 上,重试会尝试其他 Broker,提升容错能力。
2. 超时控制
- 每次发送(包括重试)都受 sendMsgTimeout 控制(默认 3000ms)。
- 超时后立即进入下一次重试(如果还有剩余次数)。
3. 异常分类处理
RocketMQ 对异常做了区分:
| 异常类型 | 是否重试 |
|---|---|
RemotingException(网络异常) | ✅ 重试 |
MQClientException(客户端错误,如 Topic 不存在) | ❌ 不重试 |
MQBrokerException(Broker 返回错误) | ✅ 部分错误码会重试(如 SYSTEM_ERROR, BROKER_NOT_AVAILABLE) |
InterruptedException | ❌ 不重试 |
消费重试
消费重试(Consumer Retry) 是指当消费者处理某条消息失败时,RocketMQ 会将该消息重新投递给消费者再次尝试处理的机制。这是保障消息最终被成功消费的重要容错手段。
为什么需要消费重试?
- 消费者可能因 临时故障(如数据库连接抖动、依赖服务超时、内存不足等)导致消息处理失败。
- 若直接丢弃失败消息,会造成业务数据丢失。
- 通过重试机制,给予消费者多次机会恢复并成功处理消息,提升系统鲁棒性。
⚠️ 注意:消费重试 ≠ 消息重复,但重试确实可能导致同一条消息被多次投递,因此消费者必须实现幂等性。
消费重试的触发条件
当消费者在处理消息时:
- 抛出异常(未被捕获)
- 返回
RECONSUME_LATER状态(在 PushConsumer 的监听器中)
则 RocketMQ 认为该消息消费失败,触发重试流程。
✅ 正常消费应返回
CONSUME_SUCCESS。
消费重试的类型(按消费者模型区分)
RocketMQ 支持多种消费者类型,重试行为略有不同:
1. PushConsumer(默认推荐)
- 由 Broker 主动推送消息给消费者。
- 支持自动重试,且可配置重试策略。
- 重试消息会被发送到特殊的 重试主题(Retry Topic):
- 格式:
%RETRY%<ConsumerGroupName> - 例如:消费者组名为
OrderServiceGroup,则重试主题为%RETRY%OrderServiceGroup
- 格式:
重试流程(PushConsumer):
- 消费失败 → 返回
RECONSUME_LATER或抛异常。 - Broker 将该消息写入
%RETRY%...主题。 - 后续由同一个消费者组从重试主题中拉取该消息进行重试。
- 每次重试后,
ReconsumeTimes属性 +1。 - 若重试次数超过上限(默认 16 次),则消息进入 死信队列(DLQ):
- 主题格式:
%DLQ%<ConsumerGroupName>
- 主题格式:
重试间隔(仅 PushConsumer 有效):
RocketMQ 为重试消息设置了指数退避延迟,避免频繁重试压垮系统:
| 重试次数 | 延迟时间(秒) |
|---|---|
| 1 | 10s |
| 2 | 30s |
| 3 | 1min |
| 4 | 2min |
| 5 | 3min |
| 6 | 4min |
| 7 | 5min |
| 8~16 | 10min |
📌 这个延迟策略是内置不可配置的(除非修改源码或使用自定义重试逻辑)。
2. PullConsumer / SimpleConsumer(手动拉取)
- 消费者主动拉取消息,完全由应用控制消费逻辑。
- 不支持自动重试机制。
- 若消费失败,需开发者自行决定是否重新投递(如将消息重新发回原 Topic 或记录到外部重试队列)。
- 因此,这类消费者需自行实现重试、延迟、死信等逻辑。
💡 推荐:除非有特殊需求(如流批一体、精确控制拉取节奏),否则优先使用 PushConsumer。
关键配置参数
最大重试次数
- 集群消费模式(Clustering):默认 16 次
- 广播消费模式(Broadcasting):不支持重试(因为每个消费者独立消费,无法统一管理重试队列)
重试消息的存储与路由
- 重试主题
%RETRY%...本质是一个普通 Topic,由 Broker 自动创建。 - 其队列数量通常与原始 Topic 相同,或默认为 1~4 个。
- 消费者组在启动时会自动订阅自己的重试主题,无需显式声明。
- 重试消息的
ReconsumeTimes字段记录了已重试次数,可用于日志分析或跳过策略。
死信队列(DLQ)
- 当消息重试超过
maxReconsumeTimes仍未成功,Broker 会将其投递到死信队列%DLQ%<ConsumerGroup>。 - DLQ 中的消息不会再被自动消费,需人工介入:
- 查看日志定位问题
- 修复后手动重放
- 或归档丢弃
🔒 DLQ 权限:只有对应消费者组可以向其 DLQ 写入消息,其他组无法读写。
消息清理
消息清理 是指系统自动删除已过期或不再需要的消息数据,以释放磁盘空间、维持系统性能和稳定性的机制。由于 RocketMQ 采用顺序写 + 文件存储的架构,消息不会被立即物理删除,而是通过定时清理策略进行批量回收。
为什么需要消息清理?
- 磁盘容量有限:所有消息持久化到 CommitLog 文件中,若不清理会无限增长。
- 避免资源浪费:已被成功消费且无需保留的历史消息应被回收。
- 保障系统稳定性:防止因磁盘写满导致 Broker 崩溃或拒绝服务。
📌 注意:RocketMQ 的消息清理是基于时间或文件大小的,不是基于消费状态(即即使消息未被消费,只要过期也会被删除)。
消息存储结构回顾
RocketMQ 的消息主要存储在以下三类文件中:
| 文件类型 | 作用 |
|---|---|
| CommitLog | 所有消息的物理存储文件(只追加写),按固定大小分段(默认 1GB/文件) |
| ConsumeQueue | 每个 Topic-Queue 对应的逻辑队列索引文件,指向 CommitLog 中的位置 |
| IndexFile | 可选,用于按 Key 或时间范围快速检索消息 |
消息清理的核心对象是 CommitLog 文件,而 ConsumeQueue 和 IndexFile 会随之联动清理。
消息清理策略
RocketMQ 默认采用 “定时删除过期文件” 策略,由 Broker 后台线程周期性执行。
1. 触发条件
清理操作由以下两个参数共同决定(在 broker.conf 中配置):
| 参数名 | 默认值 | 说明 |
|---|---|---|
fileReservedTime | 72(小时) | 消息文件保留时间,默认 72 小时(3 天) |
deleteWhen | 04(凌晨 4 点) | 每天执行清理的时间点(24 小时制) |
✅ 实际清理时机 = 每天
deleteWhen时刻 + 文件最后修改时间距今超过fileReservedTime。
2. 清理单位:MappedFile(CommitLog 分段文件)
- CommitLog 被划分为多个固定大小的文件(默认 1GB),称为 MappedFile。
- 清理时以整个 MappedFile 为单位删除,而不是单条消息。
- 即使一个文件中还有部分未消费或未过期的消息,只要该文件整体满足“过期”条件,整体会被删除。
⚠️ 这意味着:消费者必须在 fileReservedTime 内完成消费,否则可能丢失消息!
3. 清理流程
- Broker 在每天
deleteWhen时间点启动清理任务。 - 遍历所有 CommitLog 文件,检查其最后修改时间。
- 若当前时间 - 最后修改时间 ≥
fileReservedTime× 3600 秒,则标记为可删除。 - 但有一个关键前提:该文件不能是当前正在写的活跃文件。
- 同时检查 ConsumeQueue 和 IndexFile,删除对应过期部分。
- 物理删除文件(unlink),释放磁盘空间。
4. 安全保护机制
为防止误删正在被消费的消息,RocketMQ 引入了 “最小保留时间”保护:
- 即使文件已过期,如果仍有消费者组的消费位点(offset)落在该文件范围内,则不会删除。
- 具体判断依据:Broker 会计算所有消费者组对每个 Topic-Queue 的最小消费 offset,反推出对应的 CommitLog 起始位置。
- 只有当文件的最大 offset < 所有消费者的最小消费 offset 时,才允许删除。
🔒 这确保了:只要还有消费者未消费完该文件中的消息,就不会被清理。
特殊消息类型的清理行为
1. 普通消息 & 顺序消息
- 遵循上述通用清理策略。
- 依赖
fileReservedTime控制生命周期。
2. 延迟消息
- 延迟消息在到达延迟时间前存储在特殊的 SCHEDULE_TOPIC_XXXX 主题中。
- 其清理时间 = 实际投递时间 + fileReservedTime。
- 即:延迟消息在变为普通消息后,才开始计算保留时间。
3. 事务消息(半消息)
- 半消息在未提交/回滚前也受清理策略影响。
- 如果本地事务长时间未响应,半消息可能在过期后被删除,导致事务失败。
4. 死信队列(DLQ)消息
- DLQ 消息同样受
fileReservedTime控制。 - 建议监控 DLQ 并及时处理,避免重要消息被自动清理。
磁盘空间紧急清理机制
当磁盘使用率过高时,RocketMQ 会启动强制清理:
- 参数:
diskMaxUsedSpaceRatio(默认 75%) - 当磁盘使用率 > 该值时:
- Broker 会拒绝新消息写入(返回
SYSTEM_BUSY) - 同时加速清理过期文件,即使未到
deleteWhen时间点
- Broker 会拒绝新消息写入(返回
- 若使用率 > 90%,可能直接进入只读模式
消息分类和过滤
消息分类:多级结构
RocketMQ 采用 两级分类模型:
1. 主题(Topic)—— 第一级分类
- 作用:对消息进行粗粒度划分,代表一类业务语义。
- 示例:
OrderTopic:订单相关消息PaymentTopic:支付事件LogTopic:系统日志
- 特点:
- 全局唯一,集群内共享。
- 每个 Topic 可配置队列数量(默认 4),影响并发能力。
- 是消息路由和权限控制的基本单位。
✅ 最佳实践:按业务域或功能模块划分 Topic,避免“一个 Topic 打天下”。
2. 标签(Tag)—— 第二级分类
- 作用:在同一个 Topic 内对消息进行细粒度分类。
- 类型:字符串(如
"CREATE","UPDATE","PAID")。 - 使用方式:java
Message msg = new Message("OrderTopic", "CREATE", "order_123", body); - 特点:
- 同一 Topic 下可有多个 Tag。
- Tag 在 Broker 端以 哈希值(hashCode) 形式存储于 ConsumeQueue,用于快速过滤。
- 支持消费者端基于 Tag 的订阅过滤。
⚠️ 注意:Tag 不是万能的!过度细分会导致管理复杂,建议每个 Topic 的 Tag 数量 ≤ 10 个。
3. 用户属性(User Properties)—— 自定义分类(高级)
作用:携带任意键值对元数据,用于更复杂的业务分类或上下文传递。
示例:
javamsg.putUserProperty("region", "cn-hangzhou"); msg.putUserProperty("priority", "high");特点:
- 不参与默认过滤,但可用于 SQL 表达式过滤(见下文)。
- 存储在消息头中,会增加网络和存储开销,应精简使用。
消息过滤:三种主流方式
RocketMQ 提供 Broker 端过滤 和 Consumer 端过滤 两种模式,优先推荐 Broker 端过滤 以减少无效网络传输。
方式 1:基于 Tag 的过滤(最常用)
原理
- 生产者发送消息时指定 Tag。
- 消费者订阅时声明感兴趣的 Tag。
- Broker 在拉取阶段根据 ConsumeQueue 中的 Tag Hash Code 进行匹配,只返回符合条件的消息。
特点
- ✅ 高效:过滤在 Broker 端完成,减少网络流量。
- ✅ 简单:API 直观,适合大多数场景。
- ❌ 局限:仅支持等值或 OR 条件,不支持复杂逻辑(如
region=hangzhou AND priority=high)。
📌 注意:
"*"表示订阅所有 Tag,但不会订阅无 Tag 的消息(即 Tag 为 null 或空字符串)。若需兼容,生产者应统一设置默认 Tag。
方式 2:基于 SQL 表达式的属性过滤(高级)
适用场景
- 需要根据多个 User Properties 进行动态组合过滤。
- 如:
region = 'cn-shanghai' AND delay < 5000
前提条件
- Broker 需启用 SQL 过滤支持(默认开启)。
- 消费者使用
MessageSelector.bySql()。
使用示例
// 生产者
Message msg = new Message("EventTopic", "*", body);
msg.putUserProperty("region", "cn-shanghai");
msg.putUserProperty("delay", "3000");
// 消费者
consumer.subscribe("EventTopic",
MessageSelector.bySql("region = 'cn-shanghai' AND delay BETWEEN 1000 AND 5000"));支持的 SQL 语法
- 比较操作:
=,<>,<,>,<=,>= - 逻辑操作:
AND,OR,NOT - 范围查询:
BETWEEN - 字符串匹配:
IN,LIKE(有限支持) - 函数:
IS NULL,IS NOT NULL
特点
- ✅ 灵活:支持复杂条件组合。
- ⚠️ 性能开销:Broker 需解析 SQL 并逐条评估消息属性,吞吐量低于 Tag 过滤。
- ⚠️ 属性必须为字符串类型(数值需转为字符串存储)。
💡 建议:仅在 Tag 无法满足需求时使用 SQL 过滤,并避免高频复杂表达式。
方式 3:Consumer 端本地过滤(兜底方案)
原理
- 消费者订阅全部消息(如
*),在消费监听器中自行判断是否处理。 - 示例:java
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String region = msg.getUserProperty("region"); if ("cn-beijing".equals(region)) { // 处理逻辑 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 仍需返回成功,否则触发重试 });
特点
- ✅ 完全灵活:可实现任意过滤逻辑。
- ❌ 低效:所有消息都传输到客户端,浪费带宽和 CPU。
- ❌ 易出错:若忘记返回
SUCCESS,会导致不必要的重试。
🚫 不推荐作为主要过滤手段,仅用于临时调试或极端定制场景。
特殊消息类型的过滤行为
1. 顺序消息
- 过滤逻辑与普通消息一致。
- 但需注意:同一业务 Key 的消息必须发往同一 Queue,否则顺序性失效。
- 过滤不影响顺序保证,只要生产者正确使用
MessageQueueSelector。
2. 延迟消息
- 延迟期间存储在
SCHEDULE_TOPIC_XXXX,消费者无法直接订阅。 - 到达延迟时间后,消息被转为普通消息投递到原 Topic。
- 过滤规则在原始 Topic 上生效,延迟阶段不参与过滤。
3. 事务消息
- 半消息(Half Message)对消费者不可见,不参与任何过滤。
- 提交后变为普通消息,按原 Topic/Tag 规则过滤。
4. 重试消息 & DLQ
- 重试消息的主题为
%RETRY%<Group>,自动继承原始消息的 Tag 和属性。 - 消费者组会自动订阅自己的重试 Topic,过滤规则同样适用。
- DLQ 消息同理,可用于分析失败原因。
消费进度管理
存储位置
消费进度实际上是保存在 Broker 端的。具体来说,每个消费者组针对每个主题下的每个队列都有一个对应的消费进度记录。当消费者提交了最新的消费进度(offset)后,Broker 会负责持久化这个信息到本地磁盘上。
更新时机
- 成功处理完一批消息后:消费者需要及时更新自己的消费进度(offset),以便下次能够从中断的地方继续消费。
- 这个信息通常会被持久化保存,以防止因崩溃导致的数据丢失。
消费进度提交方式
自动提交:这种方式下,RocketMQ 会在后台定期自动提交消费者的消费进度。这简化了开发者的工作,但可能带来一定的延迟或不精确性,尤其是在消费者处理速度不稳定的情况下。
手动提交:开发者可以选择手动控制何时提交消费进度。这种方式提供了更高的灵活性和准确性,允许根据业务逻辑决定最佳的提交点。例如,在处理一系列相关消息时,可以等到所有消息都处理成功后再统一提交,从而避免部分消息被重复处理。
并发与负载均衡
由于一个主题可能包含多个队列,并且这些队列可以分布在不同的 Broker 上,因此可以通过增加消费者实例的数量来提高消息处理的并行度和系统的容错能力。RocketMQ 提供了多种策略(如平均分配、一致性哈希等)来为每个消费者组内的消费者实例分配该主题下的消息队列,这种分配是动态调整的,当有新的消费者加入或现有消费者退出时,RocketMQ 会重新平衡队列分配。
死信队列与重试机制
当消息在 %RETRY%... 主题中重试了 maxReconsumeTimes 次(默认 16 次)后仍然失败,RocketMQ 会尝试将该消息投递到 %DLQ%<ConsumerGroup> 主题。如果该主题还不存在,Broker 会自动创建它,并为其创建若干队列(通常 1~4 个)。这一机制保证了即使某些消息无法正常消费,它们也不会丢失,而是被转移到专门的死信队列中等待后续处理。
