MQ常见问题梳理
约 1727 字大约 6 分钟
2025-09-24
MQ怎么保证消息不丢失
消息的丢失分为下面几个场景
生产者发送消息
处理方法:在发送消息后,由Broker返回一个确认的状态,生产者接收到确认状态后才会完成消息发送流程,否则就是发送失败,可以进行重试等操作。
服务端接收消息存盘
如果想彻底解决服务端消息丢失问题,就需要一条消息写一次磁盘,但是这样在海量消息场景下操作系统是支撑不住的。
而使用定时刷盘,每隔10毫秒,进行一次刷盘操作,则在操作系统出现问题时,会存在丢失10毫秒数据的情况。
并不能彻底解决服务端消息存盘丢失消息问题。
需要和性能进行一个平衡取舍。
主从集群之间消息同步
RocketMQ有两种集群
方案一:指定Master节点,如果Master节点宕机,Slave从节点不会主动切换为Master主节点(需要手动操作)。但是如果Master节点消息还没有同步给Slave节点就宕机不可用,或磁盘损坏了,那么还没同步给Slave节点的消息就丢了。
方案二:Dledger高可用集群,自行选举Leader(通过Raft协议实现的一种机制)。有了Leader之后,系统的所有更改都需要通过领导者。更改数据时,Leader先将操作日志(SET 值命令)复制给其跟随节点,然后Leader等待,直到大多数follower跟随节点都记录了该操作日志。随后Leader就会Commit提交,Leader节点的值就变成了SET 的具体值,然后领导者通知跟随者该条目已提交,随后跟随节点接收到消息也都跟着提交。
在RocketMQ中使用Dledger集群的话,数据主从同步这个过程,数据安全性还是比较高的。基本可以认为不会造成消息丢失。(极端场景下,比如出现网络分区情况时,也会丢失一些未经过集群内确认的消息)
消费者消费失败或没有响应Broker
消费者处理完消息后,需要给Broker服务端一个响应,如果Broker端没有收到响应,就会将这个消息进行重新投递,进行重试操作。所以消费者消费失败,或者网络出现问题,导致Broker端没有收到响应,并不会造成消息的丢失。反而会造成消息幂等性问题,幂等也可以通过消息唯一标识判断进行解决,已经消费处理的消息,不再重复处理。
需要注意,如果消息处理时,在业务上失败了,但是最后捕获了异常,并响应Broker成功,这就会导致消息丢失。
如果使用的异步消费,可能存在消息丢失。
consumer.registerMessageListener(new MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,ConsumeConcurrentlyContext context) {
new Thread() {
public void run() {
//处理业务逻辑
}
};
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});MQ服务都挂了
整个MQ服务都挂了,怎么保证业务能够继续稳定进行,且业务数据不丢失。
可以设计一个降级缓存。Producer往MQ发消息失败了,就往降级缓存中写,将所有的业务消息先缓存到redis,然后,依然正常去进行后续的业务。再启动一个线程或定时任务,不断尝试将降级缓存中的数据往MQ中发送。
当MQ服务恢复过来后,这些消息可以尽快进入到MQ中,继续往下游Conusmer推送,而不至于造成消息丢失。
总结
消息零丢失方案,其实是没有最优解的。因为如果有最优解,那么这些MQ产品,就不需要保留各种各样的设计了。需要根据业务场景去进行一个取舍。
| 场景 | 解决方案 | 存在问题 |
|---|---|---|
| 生产者发送消息到MQ | 同步发送(需要等待确认结果)+多次尝试 | 降低吞吐 |
| 生产者发送消息到MQ | 事务消息机制 | 一个消息存在多次网络请求 |
| Broker收到消息后消息不丢失 | 设置同步刷盘 | I/O磁盘压力大 |
| Broker收到消息后消息不丢失 | 搭建Dledger集群 | 网络压力大,时时刻刻在进行节点间互相请求 |
| 消费者消费消息不丢失 | 同步处理消息,再提交offset | 无法通过异步提高吞吐 |
| 整个MQ集群挂了,怎么不丢失消息 | 增加临时的降级存储 |
MQ怎么保证消息的幂等性
给每条消息分配一个唯一的ID。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendPaySuccessMessage(String orderNo, String userId) {
// 构建消息内容
String messageBody = "订单支付成功,订单号: " + orderNo + ", 用户: " + userId;
// 构建消息,并设置 Keys 和其他属性
Message<String> message = MessageBuilder.withPayload(messageBody)
.setHeader(RocketMQHeaders.KEYS, orderNo) // !!!核心:设置业务唯一键
.setHeader(RocketMQHeaders.TAGS, "PAY_SUCCESS") // 设置Tag用于过滤
.build();
// 发送消息
SendResult sendResult = rocketMQTemplate.syncSend("ORDER_TOPIC", message);
// 打印结果,其中包含系统生成的MsgId和我们设置的Keys
System.out.println("消息发送成功!");
System.out.println("MsgId: " + sendResult.getMsgId());
System.out.println("Keys: " + orderNo); // 可以通过 sendResult.getMessageQueue() 等获取更多信息
}
}MQ怎么保证消息的顺序性
强调的是局部有序,而不是全局有序。全局有序是没有意义的。
就像微信发送消息,在每个群聊里的消息是有序的即可,而不用所有的群聊所有的消息是有序的。
RocketMQ需要将需要进行顺序消费的所有消息,发送到同一个队列里(通过设置 org.apache.rocketmq.client.producer.MessageQueueSelector),然后消费时通过设置MessageListenerOrderly,确保一个MessageQueue在同一时间只被一个消费线程处理 。
RabbitMQ只需要保证一个队列只有一个消费者,就可以保证顺序消费。
MQ怎么快速处理积压的消息
如果消息积压问题一直得不到解决,RocketMQ和Kafka在日志文件过期后,就会直接删除过期的日志文件。而这些日志文件上未消费的消息,就会直接丢失。
方案一:增加消费端
但是RocketMQ需要注意,消费端的数量不能超过MessageQueue的数量,超过的都不工作是无效的。
方案二:增加Topic
将原有队列的消息,全部转发到新的Topic里(转发速度很快,因为不涉及到业务),然后增加消费端消费这个Topic,相当于外挂,额外扩展了消费端数量。(仅临时处理,恢复正常后,需要移除这个转发)
