消费端如何保证消息队列MQ的有序消费

  

消息无序产生的原因

  

消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(生产者),但是往往在生产环境中有多个消息的消费端(消费者),尽管消费端在拉取消息时是有序的,但各个消息由于网络等方面原因无法保证在各个消费端中处理时有序。

  

消费端如何保证消息队列MQ的有序消费

  

场景分析

  
  

先后两次修改了商品信息,消息一个和消息B先后同步写入MySQL,接着异步写入消息队列中发送消息,此时消息队列生产端(生产者)按时序先后发出了A和B两条消息(消息一个先发的出,消息B后发出)。按业务逻辑,商品信息的最终状态需要以消息一个和消息B综合为准。

     

看似一个比较常见的同步写数据库,异步发送消息的场景,但实际上需要保证消息的有序消费。

  
      <李>假设1:消息一只包含修改的<>强商品名称强,消息B只包含修改的<>强商品重量强,此时消息队列的消费端实际上<强>不需要关注消息时序强,消息队列消费端(消费者)只管消费即可。   <李>假设2:消息一个包含修改的<>强商品名称,重量强,消息B包含修改的<>强商品名称强,此时消费端首先接收到消息B,后接收到消息,那么消息B的修改就会被覆盖。此时消息队列的消费端实际上又<强>需要关注消息时序   
  

可见,你无法保证消息中包含什么信息,此时必须保证消息的有序消费。

  

业务角度如何保证消息有序消费

  
      <李> <强>生产端在发送消息时,始终保证消息是全量信息。   <李> <>强消费端在接收消息时,通过缓存时间戳的方式,消费消息时判断消息产生的时间是否最新,如果不是则丢弃,如果是则执行下一步。   
  

下面通过伪代码的方式描述:

  
  

<强>生产端伪代码

  

insertWare(制品);#插入数据到数据库,通常在插入数据库时我们只会更新修改的字段,而不会全量插入

  

器皿=selectWareById (ware.getId);#获取商品的全量信息(此时是最新的),用于将它放入到消息队列中

  

syncMq(制品);#异步发送mq消息一个

  

<>强消费端伪代码

  

器皿=fetchWare ();#获取消息

  

如果(isLasted(制品))#通过商品的修改时间戳判断是否是最新的修改

  

?TODO #执行下一步业务逻辑

  其他

  

?返回#丢弃该消息

     

重点在于消费端如何判断该消息是否是最新的修改也就是<代码> isLasted 方法。

  
  

<强> isLasted方法

  

长修改=getCacheById (ware.getId);#获取缓存中该条商品的最新修改时间

  

如果器皿。getModified祝辞修改){#如果消息中商品修改时间大于缓存中的时间,说明是最新操作

  

?setCacheById(制品);#将该条消息的商品修改时间戳写入到缓存中

  

?其他返回true;
} #如果消息中的商品修改时间<强>小于缓存中的时间,说明该条消息属于“历史操作”,不对其更新

  

?返回false;

     

以上就是通过伪代码的方式,描述如何通过业务手段保证消息有序消费,重点在于<强>全量发送信息和缓存时间戳强。在其中还有一些技术实现细节。

  

例如:消费端消费消息B,执行到<强>获取时间戳缓存之后,并在重新设置新的缓存之前强,此时另一个消费端恰好也正在消费B它也正执行到<强>获取时间戳缓存强,由于消息一此时并没有更新缓存,消息一个拿的到的缓存仍然是旧的缓存,这时就会存在两个消费端都认为自己所消费的消息时最新的,造成该丢弃的消息没丢。

  

消费端如何保证消息队列MQ的有序消费

  

显然,这是<>强分布式线程安全问题,分布式锁通常使用复述,或者饲养员,加锁后的执行时序如下图所示。

  

消费端如何保证消息队列MQ的有序消费

  

这是从业务角度保证消息在消费端有序消费。通过在消息发送端全量发送消息以及在消息消费端缓存时间戳就可以保证消息的有序消费。

消费端如何保证消息队列MQ的有序消费