RocketMQ进阶
如何保证消息的可用性/可靠性/不丢失
生产阶段
生产阶段,主要通过请求确认机制,来保证消息的可靠传递
- 如果生产者是同步发送,注意返回的响应结果,如果为 ok,则成功发送到了 broker
- 异步发送的时候,在回调方法里检查,如果发送失败或者异常,进行重试
- 如果发生超时的情况,也可以通过查询日志的 API,来检查是否在 Broker 存储成功
存储阶段
存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。
- 消息只要持久化到 commitLog 中,即使 broker 宕机,未消费的消息也能重新恢复再消费
- broker 的刷盘机制有同步刷盘和异步刷盘,都可以保证消息存储在 pagecache 中,同步刷盘更为可靠, Producer 发送消息后等数据持久化到磁盘之后再返回响应给 Producer
- Broker 通过主从模式来保证高可用,Broker 支持 Master 和 Slave 同步复制、Master 和 Slave 异步复制模式,生产者的消息都是发送给 Master,但是消费既可以从 Master 消费,也可以从 Slave 消费。同步复制模式可以保证即使 Master 宕机,消息肯定在 Slave 中有备份,保证了消息不会丢失
消费阶段
Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认
消息队列维护着消费的位置,业务逻辑执行失败后,如果没有确认,再从队列去拉取消息,就还是原来的那一条
如果再接收后就立即执行确认,若失败,拉取消息就不是原来的那一条了
如何处理消息重复的消息
RocketMQ 可以保证消息一定投递,且不丢失,但无法保证消息不重复消费
比如在网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款信息,造成用户扣款了两次,这样是不符合实际需求的
所以可以在业务逻辑中加入检查逻辑,保证消息的幂等性
业务端可以通过一个专门的表来记录已经消费过的消息 ID,每次消费消息之前,先查询一下这个表,如果已经存在,就不再消费
如何保证消息幂等性
- 消息必须携带业务唯一标识,可以通过雪花算法生成全局唯一 ID
- 在消费者接收到消息后,判断 Redis 中是否存在该业务主键的标志位,若存在标志位,则认为消费成功,否则执行业务逻辑,执行完成后,在缓存中添加标志位
- 利用数据库的唯一索引来防止业务的重复插入
- 数据库表中使用版本号,通过乐观锁机制来保证幂等性。每次更新操作时检查版本号是否一致,只有一致时才执行更新并递增版本号。如果版本号不一致,则说明操作已被执行过,拒绝重复操作
- 悲观锁机制,通过数据库的锁机制来保证幂等性
怎么处理消息积压
如果当前 topic 的消息队列数量大于消费者数量,可以对消费者进行扩容,增加消费者来提高消费能力
如果当前 topic 的消息队列数量小于消费者数量,就得考虑扩容消息队列,可以新建一个临时的 topic,临时的 topic 多设置一些消息队列,然后消费者把消费的数据丢到临时 topic,用扩容的消费者去消费新的 topic 的数据,消费完之后恢复原状
顺序消息如何实现
顺序消息分为全局顺序和局部顺序
全局顺序为整个 Topic 的所有消息都严格按照发送顺序消费,这种方式性能较低
局部顺序为将需要保证消费顺序的消息发送到同一个队列
每个 MessageQueue 在 Broker 中对应一个 ConsumeQueue,消息按照到达 Broker 的顺序依次写入
当消费者开始消费某一个 MessageQueue 时,会在 broker 段对该队列进行枷锁,其他消费者就无法同时消费这个队列
同一时间只有一个消费者处理队列消息,保证了消费顺序
延时消息的原理
broker 收到延时休息后,发送给主题的相应消息队列中,通过一个定时任务轮询队列,等到期后,把消息投递到目标的 topic 中,然后就可以正确处理消息了
怎么实现分布式消息事务
实现分布式消息事务,主要依赖半消息,关键在于二次确认以及消息回查
半消息简单来说就是暂时不能被消费者消费的消息,生产者成功发送到 broker 端的消息,此消息被标记为暂不可投递状态
只有等生产者端执行完本地事务后,经过二次确认,消费者才能消费这条消息
大概流程为
- producer 向 broker 发送半消息
- 此时消息发送成功后,消息是半消息,标记为不可投递,consumer 消费不了
- producer 执行本地事务
- 完成后,向 broker 发送 commit/rollback,如果是 commit,broker 将半消息标记为可投递,可以被消费者消费,如果是 rollack,则丢弃消息
- 异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况
- Producer 端查询本地事务的状态
- 根据事务的状态提交 commit/rollback 到 broker 端
- 消费者段消费到消息之后,执行本地事务
死信队列
死信队列用来存储那些无法被正常处理的消息,这些消息就被称为死信
产生的死信的原因是消费者在处理消息时发生异常,且达到了最大的重试次数
当消费失败的原因排查并解决后,可以重新发送死信,重新消费
如果暂时无法处理,避免到期后被删除,可以先把死信消息导出保存