RabbitMQ延迟队列怎么利用Python实现

  介绍

本篇文章为大家展示了RabbitMQ延迟队列怎么利用Python实现,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

<强>延迟队列的基础原理生存时间(TTL)

RabbitMQ可以针对队列设置x-expires或者针对消息设置x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为死信(死信)
RabbitMQ消息的过期时间有两种方法设置。

通过队列(队列)的属性设置,队列中所有的消息都有相同的过期时间。(本次延迟队列采用的方案)对消息单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为死信(死信)

<强>死信交流(DLX)

RabbitMQ的队列可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了一纸空文,则按照这两个参数重新路由转发到指定的队列。

<李>

x-dead-letter-exchange:出现死信(死信)之后将死信重新发送到指定交换

<李>

x-dead-letter-routing-key:出现死信(死信)之后将死信重新按照指定的routing-key发送

队列中出现死信(死信)的情况有:

<李>

消息或者队列的TTL过期。(延迟队列利用的特性)

<李>

队列达到最大长度

<李>

消息被消费端拒绝(基本。拒绝或基本。nack)并且requeue=false

综合上面两个特性,将队列设置TTL规则,队列TTL过期后消息会变成死信,然后利用DLX特性将其转发到另外的交换机和队列就可以被重新消费,达到延迟消费效果。

 RabbitMQ延迟队列怎么利用Python实现

延迟队列设计及实现(Python)

从上面描述,延迟队列的实现大致分为两步:

产生死信,有两种方式单消息TTL和队列TTL,因为我的需求中是所有的消息延迟处理时间相同,所以本实现中采用队列TTL设置队列的TTL,如果需要将队列中的消息设置不同的延迟处理时间,则设置的消息TTL(官方文档)

设置死信的转发规则,死信交易所设置方法(官方文档)

完整代码如下:

“““   Created 提醒Fri  Aug  3, 17:00:44  2018年      @author:知母   “““   import 鼠兔、json、日志记录   class  RabbitMQClient:   def 才能;__init__(自我,,conn_str=& # 39; amqp://用户:pwd@host:港口/% 2 f # 39;):   ,,,self.exchange_type =,“direct"   ,,,self.connection_string =conn_str   ,,,self.connection =, pika.BlockingConnection (pika.URLParameters (self.connection_string))   ,,,self.channel =, self.connection.channel ()   ,,,self._declare_retry_queue (), # RetryQueue 以及RetryExchange   ,,,logging.debug (“connection  established")   def 才能close_connection(自我):   ,,,self.connection.close ()   ,,,logging.debug (“connection  closed")   def 才能declare_exchange(自我,,交流):   ,,,self.channel.exchange_declare(交换=交换,   ,,,,,,,,,,,,,,,,,,exchange_type=self.exchange_type,   ,,,,,,,,,,,,,,,,,,持久=True)   def 才能declare_queue(自我,,队列):   ,,,self.channel.queue_declare(队列=队列,   ,,,,,,,,,,,,,,,,,耐用=True,)   def 才能;declare_delay_queue(自我,,队列,DLX=& # 39; RetryExchange& # 39;, TTL=60000):   ,,,,,,   ,,,创建延迟队列   ,,,:param  TTL:, TTL的单位是我们,TTL=60000,表示,60年代   ,,,:param 队列:   ,,,:param  DLX:死信转发的交换   ,,,:返回:   ,,,,,,   ,,,参数={}   ,,,if  DLX:   ,,,,,#设置死信转发的交换   ,,,,,参数[,& # 39;x-dead-letter-exchange& # 39;]=DLX   ,,,if  TTL:   ,,,,,参数[& # 39;x-message-ttl& # 39;]=TTL   ,,,print(参数)   ,,,self.channel.queue_declare(队列=队列,   ,,,,,,,,,,,,,,,,,耐用=True,   ,,,,,,,,,,,,,,,,,参数=参数)   def 才能_declare_retry_queue(自我):   ,,,,,,   ,,,创建异常交换器和队列,用于存放没有正常处理的消息。   ,,,:返回:   ,,,,,,   ,,,self.channel.exchange_declare(交换=& # 39;RetryExchange& # 39;,   ,,,,,,,,,,,,,,,,,,exchange_type=& # 39;展开# 39;   ,,,,,,,,,,,,,,,,,,持久=True)   ,,,self.channel.queue_declare(队列=& # 39;RetryQueue& # 39;,   ,,,,,,,,,,,,,,,,,持久=True)   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

RabbitMQ延迟队列怎么利用Python实现