RabbitMQ如何保证消息不丢失和不被重复消费

RabbitMQ如何保证消息不丢失和不被重复消费

彼方 1026 2022-04-24

RabbitMQ如何保证不丢失、不重复消费

使用消息队列的原因

  • 解耦
    业务系统调用了系统B和系统C,未来如果需要再接入系统D,则需要修改系统A的代码,太麻烦。
  • 异步
    将消息写入消息队列,非必要的业务逻辑以异步方式运行,不影响主业务响应速度。
  • 削峰
    并发量过大的时候,将请求保存在队列中,逐步处理,减轻数据库压力。

消息队列的缺点

  • 系统可用性降低
    系统中加入消息队列的话,如果消息队列服务挂了,那主业务系统可能也会收到影响,综合来看 系统的可用性会降低。
  • 系统复杂度增加
    加入消息队列之后,就要考虑很多附带问题,例如怎么保证消息不被重复消费,怎么保证消费传输的可靠性,因为为解决这类问题,系统的复杂度会逐步增加。

如何保证RabbitMQ高可用

直接上集群

如果保证消息不被重复消费

业务场景下的解决方式:

  1. 获取消息做insert操作,那可以直接给消息一个唯一主键,那么就算出现重复消费的情况,也会导致主键冲突,避免了数据库出现脏数据。
  2. 获取消息做redis的set操作,由于set的特性决定了它就不会重复,,无论set几次结果都一样。
  3. 准备一个第三方介质作为消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,就在redis中存储该消息,将<id,message>以K-V形式写入redis,消费者开始消费前,先去redis中查有咩有这个消息的消费记录即可。

如何解决消息丢失的问题

  • 生产者丢数据
    生产者的消息没有投递到消息队列中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢失消息。
    transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit()),缺点是吞吐量下降了。
    实际生产中,一般使用confirm模式居多,一旦channel进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使生产者已经知道消息正确到达队列了。如果RabbitMQ没有处理该消息,则会发送一个Nack消息给你,你可以重试操作。
  • 消息队列丢数据
    处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以配合使用,你可以在消息持久化磁盘后,再给生产者发送一个ACK信号,这样如果消息持久化磁盘之前,RabbitMQ挂了,那么生产者收不到ACK信号,生产者就会自动重发。
    数据持久化的方式:
  1. 将queue的持久化标识durable设置为true,则代表这是一个持久队列。
  2. 发送消息的时候将deliveryMode = 2
    这样设置以后,rabbitMQ就算挂了,重启也能恢复数据。
  • 消费者丢数据
    启用手动确认模式可以解决这个问题
  1. 自动确认模式:消费者挂掉,待ACK的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完的消息会重回队列,但是异常会不断让消息重试。
  2. 手动确认模式:如果消费者来不及处理就死掉时,没有响应ACK时会重复发送一条信息给其他消费者,如果监听程序处理异常了,且未对异常进行捕捉,会一直重复接受消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ACK,也会一直重复发送消息。
    3.不确认模式:acknowledge=“none”不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
  • 如何保证消息的顺序性
    通过算法,将需要保持先后顺序的消息放到同一个消息队列中。然后只用一个消费者去消费该队列。同一个queue里的消息不一定是顺序消息。即生产者保证入队有序即可,出队以后的顺序交给消费者去保证。