卡夫卡是如何处理延时任务的

  

本篇内容介绍了“KAFKA是如何处理延时任务的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、kafka服务端大概有哪些延时任务?

首先,我们需要了解一下kafka中大概有哪些需要延时的任务,该怎么查看呢?
很简单,kafka的设计都是基于接口的,那么我们只需要找到延时任务的顶层接口,然后看一下该接口有哪些实现类就知道有哪些延时任务了。
顶层抽象类接口是:DelayedOperation
对应的子类:

KAFKA是如何处理延时任务的

DelayedHeartbeat:就是用于做消费者心跳超时检测的;
DelayedProduce:就是做生产者设置ack=-1时需要等待所有副本确认写入成功的;
DelayedFetch:就是在消费的时候该分区没有数据,需要去做延时等待;
DelayedJoin:就是去做消费者加组的时候,在JOIN阶段需要延时等待。


二、kafka里面的延时任务是如何实现的呢?

这个答案已经在标题中就已经回答了,就是时间轮。
那么时间轮在kafka中是如何实现的呢?

kafka中的时间轮本体是一个20长度数组,不过内部持有上层数组的一个引用,数组中每个元素都是一个List,存放处于这个时间段的所有任务。
最后将这些有任务的List引用,放入DelayQueue来实现时间的流动,每次从DelayQueue中取出到期的List进行对应的操作。
翻译一下:
就是原本把所有延时任务都一股脑全部放入DelayQueue中,实在是太多了,由于DelayQueue底层数据结构是小顶堆,插入和删除的时间复杂度都是 O(nlog(n)),
n代表的具体任务的数量,当n值非常大时,对应的性能就很差,不能满足一个高性能中间件的要求。于是就想了个办法减小n的个数,
就是把原来的一个个延时任务,通过时间区间来封装成一个List,把List作为一个基本单位存入到DelayQueue中,那么这一样一来,就能把插入和删除的时间复杂度
从O(nlog(n))降低到接近O(1)[这里为什么是近似O(1)呢?你可以理解为时间轮是一个类hash表的结构],除此之外,最重要的就是大大减小了DelayQueue中元素的个数n,
因为一层时间轮就20个List,10层也就才200个,所以对于这么小数量的元素个数,DelayQueue是完全能hold的住的。
总结一下:
其实时间轮的设计思想就是批处理的思想,把一批任务根据时间区间封装成一个List,最后把List放到DelayQueue中去实现轮转的效果。
优化点主要是两个,一个是插入/删除的时间复杂度由O(nlog(n))降低到了近似O(1),第二个是大大减小了DelayQueue元素的个数。

了解设计思想,我们再看看实现原理:
 

1、核心函数:加入Task到时间轮中
分为三步:

  1. 如果任务已经超期就返回false

  2. 如果任务在自己的时间跨度内,就计算应该放入哪个桶中(在哪个时间区间);如果桶没在DelayQueue中则加入到DelayQueue中去。

  3. 如果任务的超时时间超过了自己的时间跨度,就往上层时间传,直到找到一个满足时间跨度的时间轮。

def add(timerTaskEntry: TimerTaskEntry): Boolean = {  val expiration = timerTaskEntry.expirationMs  if (timerTaskEntry.cancelled) { // 被取消    // Cancelled    false  } else if (expiration < currentTime + tickMs) { // 已经过期    // Already expired    false  } else if (expiration < currentTime + interval) { // 在有效期内    // Put in its own bucket    val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)// Set the bucket expiration time    // 设置超时时间,如果该桶已经设置了超时时间则说明已经存在于DelayQueue中了    // 如果不存在超时时间,则需要将当前桶加入DelayQueue中    if (bucket.setExpiration(virtualId * tickMs)) {  // The bucket needs to be enqueued because it was an expired bucket      // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced      // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle      // will pass in the same value and hence return false, thus the bucket with the same expiration will not      // be enqueued multiple times.      queue.offer(bucket)    }true  } else { // 超过了当前层时间轮的时间跨度 需要向上层时间轮传递,如果上层不存在则新建    // Out of the interval. Put it into the parent timer    if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)
  ,,}}

卡夫卡是如何处理延时任务的