利用复述,实现延时处理的方法实例

  


  

  

在开发中,往往会遇到一些关于延时任务的需求,例如

  

& # 8226;生成订单30分钟未支付,则自动取消

  

& # 8226;生成订单60秒后,给用户发短信

  

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。
  

  

最近需要做一个延时处理的功能,主要是从卡夫卡中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。
  

  

  

说到java中的定时功能,首先想到的计时器和ScheduledThreadPoolExecutor,但是相比之下定时器可以排除,主要原因有以下几点:

  
      <李>定时器使用的是绝对时间,系统时间的改变会对定时器产生一定的影响,而ScheduledThreadPoolExecutor使用的是相对时间,所以不会有这个问题。   <李>定时器使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而ScheduledThreadPoolExecutor可以自定义线程数量。   <李>计时器没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个计时器崩溃,而ScheduledThreadPoolExecutor对运行时异常做了捕获(可以在afterExecute()回调方法中进行处理),所以更加安全。   
  

1, ScheduledThreadPoolExecutor决定了用ScheduledThreadPoolExecutor来进行实现,接下来就是代码编写啦(大体流程代码)。
  

  

主要的延时实现如下:

        ScheduledExecutorService executorService=new ScheduledThreadPoolExecutor(10、新NamedThreadFactory (“scheduleThreadPool”)   ThreadPoolExecutor.AbortPolicy ());//从消息中取出延迟时间及相关信息的代码略   int时延=0;   executorService。scheduleWithFixedDelay(新Runnable () {   @Override   公共空间run () {//具体操作逻辑   }},0,时延,TimeUnit.SECONDS);      

其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。
  

  

然后测试了一下,满足目标需求的功能,可以做到延迟指定时间后执行,至此似乎功能就被完成了。
  

  

大家可能疑问,这也太简单了有什么好说的,但是这种方式实现简单是简单但是存在一个潜在的问题,问题在哪呢,让我们看一下ScheduledThreadPoolExecutor的源码:
  

        公共ScheduledThreadPoolExecutor (int corePoolSize ThreadFactory ThreadFactory) {   超级(corePoolSize整数。MAX_VALUE 0   TimeUnit。纳秒,新DelayedWorkQueue (), threadFactory);}      

ScheduledThreadPoolExecutor由于它自身的延时和周期的特性,默认使用了DelayWorkQueue,而并不像我们平时使用的SingleThreadExecutor等构造是可以使用自己定义的LinkedBlockingQueue并且设置队列大小,问题就出在这里。

  

DelayWrokQueue是一个无界队列,而我们的目标数据源是卡夫卡,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而导致伯父,在使用多线程时我们是肯定要考虑到伯父的可能性的,因为伯父带来的后果往往比较严重,系统伯父临时的解决办法一般只能是重启,可能会导致用户数据丢失等不可能挽回的问题,所以从编码设计阶段要采用尽可能稳妥的手段来避免这些问题。

  

2,采用复述和线程结合

  

这一次换了思路,采用复述来帮助我们做缓冲,从而避免消息过多伯父的问题。
  

  

相关复述zset api:
  

     //添加元素   ZADD关键得分成员[[评分员][评分员]…]//根据分值及限制数量查询   ZRANGEBYSCORE关键分钟马克斯[WITHSCORES][限制抵消数)//从zset中删除指定成员   ZREM关键成员(成员…)      

我们采用复述,基础数据结构的zset结构,采用分数来存储我们目标发送时间的数值,整体处理流程如下:

  
      <李>第一步数据存储:9:10分从卡夫卡接收了一条一个的订单消息,要求30分钟后进行发货通知,那我们就将当前时间加30分上钟然后转为时间戳作为一的得分,主要为的订单号存入复述中。代码如下:李   
        公共空间>   公共空间run () {//获取批量大小   int orderNum=Integer.parseInt (PropertyUtil.get (Constant.ORDER_NUM, " 100 "));   尝试{//批量获取离发送时间最近的orderNum条数据   日历日历=Calendar.getInstance ();   现在长=calendar.getTimeInMillis ();//获取无限早到现在的事件关键(防止上次批量数量小于放入数量,存在历史数据未消费情况)   Set

利用复述,实现延时处理的方法实例