RabbitMQ可靠投递

  
      <李>背景李   <李> <>强confirmCallback 确认模式李   <李> <>强returnCallback 未投递到队列<强> 退回模式李   <李> <>强shovel-plugin 跨机房可靠投递李   
  

背景

  RabbitMQ

在使用<强> 强的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景RabbitMQ。<强> 强为我们提供了两个选项用来控制消息的投递可靠性模式。

  rabbitmq

<强> 整个消息投递的路径为:
<强> 生产者→rabbitmq代理集群→→交换队列→消费者

  

<强>消息从<强> 生产商到<强> rabbitmq代理集群强则会返回一个<强> confirmCallback
<强>消息从<强>交换→队列强投递失败则会返回一个<强> returnCallback 强。我们将利用这两个<强>调强控制消息的最终一致性和部分纠错能力。

  

confirmCallback确认模式

  

在创建<强> connectionFactory 强的时候设置<强> PublisherConfirms(真正的)选项,开启<强> confirmcallback

  
 <代码> CachingConnectionFactory工厂=new CachingConnectionFactory ();
  factory.setPublisherConfirms(真正);//开启确认模式 
  
 <代码> RabbitTemplate RabbitTemplate=new RabbitTemplate(工厂);
  rabbitTemplate。setConfirmCallback((数据、ack引起)→{
  如果(ack) {
  log.error(“消息发送失败!”+原因+ data.toString ());
  其他}{
  log.info(“消息发送成功,消息ID:”+(数据!=零?data.getId (): null));
  }
  }); 
  

我们来看下<强> ConfirmCallback 接口。

  
 <代码>公共接口ConfirmCallback {/* *
  *确认回调。
  * @param correlationData关联数据的回调。
  * @param ack真ack,假纳
  * @param导致一个可选的原因,纳,当可用,否则无效。
  */无效的确认(CorrelationData CorrelationData,布尔ack,字符串原因);
  
  } 
  

重点是<强> CorrelationData 对象,每个发送的消息都需要配备一个<强> CorrelationData 相关数据对象,<强> CorrelationData 对象内部只有一个<强> id 属性,用来表示当前消息唯一性。

  

发送的时候创建一个<强> CorrelationData 对象。

  
 <代码>用户用户=新用户();
  user.setID l (1010101);
  user.setUserName(“全”);
  
  rabbitTemplate。convertAndSend(交换、路由、用户
  信息→{
  message.getMessageProperties () .setDeliveryMode (MessageDeliveryMode.NON_PERSISTENT);
  返回消息;
  },
  .toString新CorrelationData (user.getID()()));  
  

这里将<强>用户ID 设置为当前消息<强> CorrelationData ID 强。当然这里是纯粹<强>演示强,真实场景是需要做业务无关消息<强> ID 生成,同时要记录下这个<强> ID 用来纠错和对账。

  

消息只要被<强> rabbitmq代理接收到就会执行<强> confirmCallback 强,如果是集群<强> 模式,需要所有<强>代理接收到才会调用<强> confirmCallback

  

被<>强代理接收到只能表示消息<强> 已经到达服务器,并不能保证消息一定会被投递到目标<强>队列里。所以需要用到接下来的<强> returnCallback

  

returnCallback未投递到队列退回模式

  

<强> confrim 模式只能保证消息到达<强>代理强,不能保证消息准确投递到目标<强>队列里。在有些业务场景下,我们需要保证消息一定要投递到目标<强>队列里,此时就需要用到<强> 退返回回模式。

  

同样创建<强> ConnectionFactory 强到时候需要设置<强> PublisherReturns(真正的)选项。

  
 <代码> CachingConnectionFactory工厂=new CachingConnectionFactory ();
  factory.setPublisherReturns(真正);//开启返回模式 
  
 <代码> rabbitTemplate.setMandatory(真正);//开启强制委托模式
  
  rabbitTemplate。setReturnCallback(消息、replyCode replyText,
  交换,routingKey)→
  log.info (MessageFormat.format(“消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}”,消息,replyCode, replyText,交换,routingKey)));  
  

这样如果未能投递到目标队列<强> 里将调用<强> returnCallback 强,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

  

shovel-plugin跨机房可靠投递

  RabbitMQ

<强> 强在跨机房集成提供了一个不错的插件<强>铲强。使用<强> shovel-plugin

RabbitMQ可靠投递