使用Rabbitmq延迟队列怎么实现定时任务

  介绍

这篇文章给大家介绍使用Rabbitmq延迟队列怎么实现定时任务,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

<强> Rabbitmq延迟队列

Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(交换)和消息的存活时间TTL(生存时间)

<强>死信交换机

一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。

<李>

一个消息被消费者拒收了,并且拒绝方法的参数里requeue是假的。也就是说不会被再次放在队列里,被其他消费者使用。

<李>

上面的消息的TTL到了,消息过期了。

<李>

队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

<强>消息TTL(消息存活时间)

消息的TTL就是消息的存活时间.RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的,所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

byte [], messageBodyBytes =,,你好,,世界!“.getBytes (),,   AMQP.BasicProperties  properties =, new  AMQP.BasicProperties (),,   properties.setExpiration (“60000“),   channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串: 当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去

处理流程图

使用Rabbitmq延迟队列怎么实现定时任务

创建交换机(Exchanges)和队列(Queues)

创建死信交换机

使用Rabbitmq延迟队列怎么实现定时任务 

如图所示,就是创建一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay

创建自动过期消息队列

这个队列的主要作用是让消息定时过期的,比如我们需要2小时候关闭订单,我们就需要把消息放进这个队列里面,把消息过期时间设置为2小时

使用Rabbitmq延迟队列怎么实现定时任务 

创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置,另外两个参数x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列

创建消息处理队列

这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理

使用Rabbitmq延迟队列怎么实现定时任务 

消息队列的名字为delay_queue2

消息队列绑定到交换机

进入交换机详情页面,将创建的2个队列(delay queue1和delay queue2)绑定到交换机上面

使用Rabbitmq延迟队列怎么实现定时任务 

自动过期消息队列的routing key 设置为delay

绑定delay

使用Rabbitmq延迟队列怎么实现定时任务