rabbitmq延迟队列之php实现

  

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付的钱,则系统自动取消订单。


<李>

定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务

<李>

pcntl_alarm为进程设置一个闹钟信号

<李>

swoole的异步高精度定时器swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)

<李>

rabbitmq延迟任务

以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现


消息的TTL(生存时间)

消息的TTL就是消息的存活时间.RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的,所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的过期字段或者队列x-message-ttl属性来设置时间,两者是一样的效果。下面例子是通过队列的TTL实现死信

queue 美元;=,new  AMQPQueue(美元通道);   队列→美元setName (params [& # 39; queueName& # 39;) ?: & # 39; & # 39;);   队列→美元setFlags (AMQP_DURABLE);   队列→美元setArguments(数组(   ,,,,,,,& # 39;x-dead-letter-exchange& # 39;,=祝辞,& # 39;delay_exchange& # 39;   ,,,,,,,& # 39;x-dead-letter-routing-key& # 39;,=祝辞,& # 39;delay_route& # 39;   ,,,,,,,& # 39;x-message-ttl& # 39;,=祝辞,60000年,   ));   队列→美元declareQueue ();

当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死”掉的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠死信交换。

死信交流交流的概念在这里就不在赘述,可以从这里进行了解。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1。一个消息被消费者拒收了,并且拒绝方法的参数里requeue是假的。也就是说不会被再次放在队列里,被其他消费者使用。

2。上面的消息的TTL到了,消息过期了。

3。队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

死信交换其实就是一种普通的交换,和创建其他交换没有两只样。是在某一个设置死信交换的队列中有消息过期了,会自动触发消息的转发,发送到死信交换中去。



<强>

& lt; php ?      头(& # 39;- type: text/html; charset=use utf8; & # 39;);      $ params =,阵列(   ,,,& # 39;exchangeName& # 39;,=祝辞,& # 39;test_cache_exchange& # 39;   ,,,& # 39;queueName& # 39;,=祝辞,& # 39;test_cache_queue& # 39;   ,,,& # 39;routeKey& # 39;,=祝辞,& # 39;test_cache_route& # 39;   );      $ connectConfig =,阵列(   ,,,& # 39;主机# 39;,=祝辞,& # 39;localhost # 39;   ,,,& # 39;港口# 39;,=祝辞,5672年,   ,,,& # 39;登录# 39;,=祝辞,& # 39;rabbitmq # 39;   ,,,& # 39;密码# 39;,=祝辞,& # 39;rabbitmq # 39;   ,,,& # 39;vhost # 39;,=祝辞,& # 39;/& # 39;   );//var_dump (extension_loaded (& # 39; amqp # 39;));,判断是否加载amqp扩展//退出();   try  {   ,,,conn 美元;=,new  AMQPConnection ($ connectConfig);   ,,,康涅狄格州→美元connect ();   ,,,if (! $康涅狄格州→与()),{   ,,,,,,,//死(& # 39;Conexiune  esuata& # 39;);   ,,,,,,,//TODO 记录日志   ,,,,,,,echo  & # 39; rabbit-mq 连接错误:& # 39;,json_encode ($ connectConfig);   ,,,,,,,退出();   ,,,}   ,,,channel 美元;=,new  AMQPChannel(康涅狄格州美元);   ,,,if (! $通道→与()),{   ,,,,,,,//,死(& # 39;Connection  through  channel 失败# 39;);   ,,,,,,,//TODO 记录日志   ,,,,,,,echo  & # 39; rabbit-mq  Connection  through  channel 失败:& # 39;,json_encode ($ connectConfig);   ,,,,,,,退出();   ,,,}   ,,,exchange 美元;=,new  AMQPExchange(美元通道);   ,,,美元交换→setFlags (AMQP_DURABLE);//持久化   ,,,美元交换→setName(美元params [& # 39; exchangeName& # 39;) ?: & # 39; & # 39;);   ,,,美元交换→setType (AMQP_EX_TYPE_DIRECT);,//直接类型   ,,,美元交换→declareExchange ();      ,,,//美元通道→startTransaction ();      ,,,queue 美元;=,new  AMQPQueue(美元通道);   ,,,队列→美元setName(美元params [& # 39; queueName& # 39;) ?: & # 39; & # 39;);   ,,,队列→美元setFlags (AMQP_DURABLE);   ,,,队列→美元setArguments(数组(   ,,,,,,,& # 39;x-dead-letter-exchange& # 39;,=祝辞,& # 39;delay_exchange& # 39;   ,,,,,,,& # 39;x-dead-letter-routing-key& # 39;,=祝辞,& # 39;delay_route& # 39;   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

rabbitmq延迟队列之php实现