怎么在芬兰湾的科特林中使用RocketMQ实现一个延时消息

  介绍

这期内容当中小编将会给大家带来有关怎么在芬兰湾的科特林中使用RocketMQ实现一个延时消息,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

<强>一。延时消息

延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

使用延时消息的典型场景,例如:

<李>

在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。

<李>

在电商系统中,用户七天内没有评价商品,则默认好评。

这些场景对应的解决方案,包括:

<李>

轮询遍历数据库记录

<李> JDK的DelayQueue

<李>

ScheduledExecutorService

<李>

基于石英的定时任务

<李>

基于复述的zset实现延时队列。

除此之外,还可以使用消息队列来实现延时消息,例如RocketMQ。

<强>二。RocketMQ

RocketMQ是一个分布式消息和流数据平台,具有低延迟,高性能,高可靠性,万亿级容量和灵活的可扩展性.RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件。

怎么在芬兰湾的科特林中使用RocketMQ实现一个延时消息

<强>三。RocketMQ实现延时消息 <强>

<强> 3.1业务背景

我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功,当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。
当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。
例如:1小时后尝试推送,3小时后尝试推送,1天后尝试推送,3天后尝试推送等等,因此,考虑使用延时消息实现该功能。

<强> 3.2生产者(生产者)

生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

首先,定义一个支持延时发送的AbstractProducer。

abstract  class  AbstractProducer : ProducerBean (), {   var 才能producerId:字符串?=,空   ,,var 主题:字符串?=,空   var 才能;标签:字符串?=零   var 才能timeoutMillis: Int ?=,空   var 才能delaySendTimeMills:长时间?,=零      val 才能;log =, LogFactory.getLog (this.javaClass)      open 才能;fun  sendMessage (messageBody:,,,标签:,字符串),{   ,,,val  msgBody =, JSON.toJSONString (messageBody)   ,,,val  message =,消息(话题,,标签,msgBody.toByteArray ())      ,,,if  (delaySendTimeMills  !=, null), {   ,,,,,val  startDeliverTime =, System.currentTimeMillis (), +, delaySendTimeMills ! !   ,,,,,message.startDeliverTime =startDeliverTime   ,,,,,log.info (,“send  delay  message  producer  startDeliverTime: $ {startDeliverTime} currentTime : $ {System.currentTimeMillis ()}“)   ,,,}   ,,,val  logMessageId =, buildLogMessageId(消息)   ,,,try  {   ,,,,,val  sendResult =,发送(消息)   ,,,,,log.info(时间+ logMessageId “producer 消息id:,“, +, sendResult.getMessageId (), +,“\ n", +,“messageBody:,“, +, msgBody)   ,,,},catch  (e:例外),{   ,,,,,log.error(时间+ logMessageId “messageBody:,“, +, msgBody  +,“\ n", +,“,错误:,“,+,e.message,, e)   ,,,}      ,,}      fun 才能buildLogMessageId(消息:消息):,String  {   ,,,return “主题:,“,+,message.topic  +,“\ n" +   ,,,,,,,“制作人:,“,+,producerId  +,“\ n" +   ,,,,,,,“标签:,“,+,message.tag  +,“\ n" +   ,,,,,,,“关键:,“,+,message.key  +,“\ n"   ,,}   }

根据业务需要,增加一个支持重试机制的制片人

@ component   @ConfigurationProperties (“mqs.ons.producers.xxx-producer")   @ configuration   @ data   class  CleanReportPushEventProducer : AbstractProducer (), {      lateinit 才能;var  delaySecondList: List      fun 才能sendMessage (messageBody: CleanReportPushEventMessage) {   ,,,//重试超过次数之后不再发事件   ,,,if  (delaySecondList !=null), {      ,,,,,如果(messageBody.times>=delaySecondList.size) {   ,,,,,,,回来   ,,,,,}   ,,,,,val  msgBody =, JSON.toJSONString (messageBody)   ,,,,,val  message =,消息(话题,,标签,msgBody.toByteArray ())   ,,,,,val  delayTimeMills =, delaySecondList [messageBody.times] * 1000 l   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

怎么在芬兰湾的科特林中使用RocketMQ实现一个延时消息