-
<李>背景李>
<李> <>强confirmCallback 强>确认模式李>
<李> <>强returnCallback 强>未投递到队列<强> 强>退回模式李>
<李> <>强shovel-plugin 强>跨机房可靠投递李>
背景
RabbitMQ在使用<强> >强的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景RabbitMQ。<强> >强为我们提供了两个选项用来控制消息的投递可靠性模式。
rabbitmq <强> 强>整个消息投递的路径为:
<强> 生产者→rabbitmq代理集群→→交换队列→消费者 强>
<强>消息强>从<强> 强>生产商到<强> rabbitmq代理集群>强则会返回一个<强> confirmCallback 强>。
<强>消息强>从<强>交换→队列>强投递失败则会返回一个<强> returnCallback >强。我们将利用这两个<强>调>强控制消息的最终一致性和部分纠错能力。
confirmCallback确认模式
在创建<强> connectionFactory >强的时候设置<强> PublisherConfirms(真正的)强>选项,开启<强> confirmcallback 强>。
<代码> CachingConnectionFactory工厂=new CachingConnectionFactory (); factory.setPublisherConfirms(真正);//开启确认模式代码>
<代码> RabbitTemplate RabbitTemplate=new RabbitTemplate(工厂); rabbitTemplate。setConfirmCallback((数据、ack引起)→{ 如果(ack) { log.error(“消息发送失败!”+原因+ data.toString ()); 其他}{ log.info(“消息发送成功,消息ID:”+(数据!=零?data.getId (): null)); } });代码>
我们来看下<强> ConfirmCallback 强>接口。
<代码>公共接口ConfirmCallback {/* * *确认回调。 * @param correlationData关联数据的回调。 * @param ack真ack,假纳 * @param导致一个可选的原因,纳,当可用,否则无效。 */无效的确认(CorrelationData CorrelationData,布尔ack,字符串原因); }代码>
重点是<强> CorrelationData 强>对象,每个发送的消息都需要配备一个<强> CorrelationData 强>相关数据对象,<强> CorrelationData 强>对象内部只有一个<强> id 强>属性,用来表示当前消息唯一性。
发送的时候创建一个<强> CorrelationData 强>对象。
<代码>用户用户=新用户(); user.setID l (1010101); user.setUserName(“全”); rabbitTemplate。convertAndSend(交换、路由、用户 信息→{ message.getMessageProperties () .setDeliveryMode (MessageDeliveryMode.NON_PERSISTENT); 返回消息; }, .toString新CorrelationData (user.getID()())); 代码>
这里将<强>用户ID 强>设置为当前消息<强> CorrelationData ID >强。当然这里是纯粹<强>演示>强,真实场景是需要做业务无关消息<强> ID 强>生成,同时要记录下这个<强> ID 强>用来纠错和对账。
消息只要被<强> rabbitmq代理强>接收到就会执行<强> confirmCallback >强,如果是集群<强> 强>模式,需要所有<强>代理强>接收到才会调用<强> confirmCallback 强>。
被<>强代理强>接收到只能表示消息<强> 强>已经到达服务器,并不能保证消息一定会被投递到目标<强>队列强>里。所以需要用到接下来的<强> returnCallback 强>。
returnCallback未投递到队列退回模式
<强> confrim 强>模式只能保证消息到达<强>代理>强,不能保证消息准确投递到目标<强>队列强>里。在有些业务场景下,我们需要保证消息一定要投递到目标<强>队列强>里,此时就需要用到<强> 强>退返回回模式。
同样创建<强> ConnectionFactory >强到时候需要设置<强> PublisherReturns(真正的)强>选项。
<代码> CachingConnectionFactory工厂=new CachingConnectionFactory (); factory.setPublisherReturns(真正);//开启返回模式代码>
<代码> rabbitTemplate.setMandatory(真正);//开启强制委托模式 rabbitTemplate。setReturnCallback(消息、replyCode replyText, 交换,routingKey)→ log.info (MessageFormat.format(“消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}”,消息,replyCode, replyText,交换,routingKey))); 代码>
这样如果未能投递到目标队列<强> 强>里将调用<强> returnCallback >强,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
shovel-plugin跨机房可靠投递
RabbitMQ<强> >强在跨机房集成提供了一个不错的插件<强>铲>强。使用<强> shovel-plugin