Java 延迟队列 DelayQueue 的原理

一、什么是延迟队列(DelayQueue)?

  DelayQueue 是 Java 并发包 java.util.concurrent 下的一个 Class,其官方定义如下所示。

		/**
         * An unbounded {@linkplain BlockingQueue blocking queue} of
         * {@code Delayed} elements, in which an element can only be taken
         * when its delay has expired.  The <em>head</em> of the queue is that
         * {@code Delayed} element whose delay expired furthest in the
         * past.  If no delay has expired there is no head and {@code poll}
         * will return {@code null}. Expiration occurs when an element's
         * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
         * than or equal to zero.  Even though unexpired elements cannot be
         * removed using {@code take} or {@code poll}, they are otherwise
         * treated as normal elements. For example, the {@code size} method
         * returns the count of both expired and unexpired elements.
         * This queue does not permit null elements.
         * /

  由定义可知,DelayQueue 是一个无界阻塞队列,队列中的元素只有在延迟期满后才能被取出。队列的头部存储的是最先到期的元素。添加进该队列的元素必须实现 Delayed 接口,指定延迟时间,元素过期的判断是根据 getDelay(TimeUnit unit) 方法的返回值,返回值小于等于 0,则认为元素过期。队列不允许存储空元素。

二、DelayQueue 的使用场景

  DelayQueue 被用于需要延迟处理任务的场景,例如,网民在网上商城下单后,如果超时未支付,订单会被后台系统关闭。这种需要延时处理的场景就可以采用 DelayQueue 实现。

三、原理解析(源码)
3.1 Class 定义
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
// ...
}

  DelayQueue 类继承了AbstractQueue,并实现了 BlockingQueue 接口,DelayQueue 的泛型参数(即队列中的元素)要实现 Delayed 接口。Delayed 接口定义如下。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

  Delayed 接口继承了 Comparable 接口,Comparable 接口定义如下。

public interface Comparable<T> {
	public int compareTo(T o);
}

  所以,延迟队列中的元素要实现 getDelay(TimeUnit unit) 和 compareTo(T o) 两个方法。

  • compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
  • getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。
3.2 延迟队列的属性

  DelayQueue 中的重要属性如下所示。

// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();

  DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式,具体见本文第四节示例。

3.3 DelayQueue 的主要方法
3.3.1 offer 添加元素
public boolean offer(E e) {
        // 获取全局独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向优先队列中插入元素
            q.offer(e);
            // 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程 
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            // 释放全局独占锁
            lock.unlock();
        }
    }
  • leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
  • 如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
  • 如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。

  DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。

3.3.2 take 取出元素

  take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。

public E take() throws InterruptedException {
        // 获取全局独占锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 获取队头元素,peek 方法不会删除元素
                E first = q.peek();
                if (first == null)
                    // 若队头为空,则阻塞当前线程
                    available.await();
                else {
                    // 否则获取队头元素的超时时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 已超时,直接出队
                    if (delay <= 0)
                        return q.poll();
                    // 释放 first 的引用,避免内存泄漏
                    first = null; // don't retain ref while waiting
                    // leader != null 表明有其他线程在操作,阻塞当前线程
                    if (leader != null)
                        available.await();
                    else {
                        // leader 指向当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 超时阻塞
                            available.awaitNanos(delay);
                        } finally {
                            // 释放 leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列
            if (leader == null && q.peek() != null)
                available.signal();
            // 释放全局独占锁    
            lock.unlock();
        }
    }
3.3.3 poll 取出元素

  取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
四、什么是 Leader-Follower 模式?

  DelayQueue 采用了 Leader-Follower 模式,那什么是 Leader-Follower 模式呢?举个 “饭店运作模式” 的例子来对照理解。

4.1 单 Reactor 多线程模式(Reactor 模式称为反应器模式或应答者模式)

  饭店的员工一般都是分角色的,比如接待员、服务员、厨师等等。
  假如有一个叫做 A 的人,固定他作为饭店接待员,来客人了就分给客人一个座位号,然后交给其他服务员,比如 B 进行后续处理。B 会根据座位号为客人引路,为客人点菜等等。如果把 A、B 比作两个线程,客人比作任务,任务由 A 处理,到交接给 B 处理,有一次线程上下文切换。

4.2 Leader-Follower 模式

  这次饭店不分角色了,每个人都是接待员和服务员,统称为员工。
  每次只能有一个员工在门口等待,比如 A 先在门口等待,其他员工在屋里歇着。来客人了的话,A 会叫一个其他员工,比如 B 来门口接替自己。然后 A 开始为客人服务,比如分配座位号,引路,点菜等全流程服务。拿线程来说的话,就是接受任务,处理任务都是由线程 A 负责,没有线程上下文切换。

4.3 DelayQueue 的 Leader-Follower 模式

  这次饭店也不分角色,都是员工,但是改变了经营策略,每个客人必须预约吃饭时间,预约采用 APP 预约。因为加入了延时,逻辑变得复杂了一些。
  每次还是只能有一个员工在门口等待,比如 A 先在门口等待,A 看了眼预约登记表,发现离预约最早到店的时间还有 30 分钟,A 就什么都不干了,先休息 30 分钟。
  其他员工还是先在屋里歇着,但是因为采用 APP 预约,客人约几点都有可能,如果此时有客人约的是 10 分钟后到店,因为 A 要 30 分钟后才能醒来干活,所以如果这位客人来了,门口就没有人接待了。
  对于这个问题,饭店的软件系统在监听到最早到店时间变了的话,会再叫一个员工来门口等待,此员工可能是新员工 B,也可能是叫醒了之前在门口休息的员工 A。我们叫这位新员工 X。 如果新员工 X 发现最早到店时间是现在,或者客人已经来了,就会叫一个员工 C 来门口接替自己,并立即开始为客人提供全流程服务。如果新员工 X 发现最早到店时间是 10 分钟后,新员工 X 就像 A 之前一样,什么都不干了,先休息 10 分钟。
  如果最早到店时间没有变化,还是 30 分钟后,软件系统不会叫人,其他员工看到 A 在门口等待,自己可以安心的在屋里歇着,等着 A 叫人替换他。
  员工 A 在 30 分钟后醒来,客人也到了,A 会叫一个同事(比如 B)接替自己,而 A 为客人提供全流程服务。

五、图解 DelayQueue 的生产/消费过程

  DelayQueue 是 Leader-Follower 模式的变种,以下通过队列及消费者线程状态变化大致说明一下 DelayQueue 的运行过程。

初始状态
在这里插入图片描述
  因为队列是没有边界的,向队列中添加元素的线程不会阻塞,添加操作相对简单,所以此图不考虑向队列添加元素的生产者线程。假设现在共有三个消费者线程。
  队列中的元素按到期时间排序,队列头部的元素 2s 以后到期。消费者线程1查看了头部元素以后,发现还需要 2s 才到期,于是它进入等待状态,2s 以后醒来,等待头部元素到期的线程称为 Leader 线程。
  消费者线程 2 与消费者线程 3 处于待命状态,它们不等待队列中的非头部元素。当消费者线程1拿到对象 5 以后,会向它们发送 signal。这个时候两个线程中的一个会结束待命状态而进入等待状态。

2S 以后
在这里插入图片描述
  消费者线程1已经拿到了对象5,从等待状态进入处理状态,处理它取到的对象5,同时向消费者线程 2 与消费者线程 3 发送 signal。
  消费者线程 2 与消费者线程 3 会争抢领导权,这里是消费者线程 2 进入等待状态,成为 Leader 线程,等待 2s 以后对象 4 到期。而消费者线程 3 则继续处于待命状态。
  此时队列中加入了一个新元素对象6,它 10s 后到期,排在队尾。

又 2S 以后
在这里插入图片描述
  先看线程1,如果它已经结束了对象 5 的处理,则进入待命状态。如果还没有结束,则它继续处理对象 5。
  消费线程2取到对象 4 以后,也进入处理状态,同时给处于待命状态的消费线程3发送信号,消费线程3进入等待状态,成为新的 Leader。现在头部元素是新插入的对象 7,因为它 1s 以后就过期,要早于其它所有元素,所以排到了队列头部。

又 1S 后

  一种不好的结果:
在这里插入图片描述
  消费线程3一定正在处理对象 7。消费线程1与消费线程2还没有处理完它们各自取得的对象,无法进入待命状态,也更加进入不了等待状态。此时对象 3 马上要到期,那么如果它到期时没有消费者线程空下来,则它的处理一定会延期。
  可以想见,如果元素进入队列的速度很快,元素之间的到期时间相对集中,而处理每个到期元素的速度又比较慢的话,则队列会越来越大,队列后边的元素延期处理的时间会越来越长。

  另外一种好的结果:
在这里插入图片描述
  消费线程1与消费线程2很快完成对取出对象的处理,及时返回重新等待队列中的到期元素。一个处于等待状态(Leader),对象 3 一到期就立刻处理。另一个则处于待命状态。这样,每一个对象都能在到期时被及时处理,不会发生明显的延期。
  所以,消费者线程的数量要够,处理任务的速度要快。否则,队列中的到期元素无法被及时取出并处理,造成任务延期、队列元素堆积等情况。
  如果消费者线程数过少,则来不及处理到期的任务。如果消费者线程数过多,在线程调度、同步上花更多的时间,无益改善性能。

文章参考
  • 9
    点赞
  • 44
    收藏
    觉得还不错? 一键收藏
  • 1
    评论
你可以通过以下步骤创建 Java 延迟队列: 1. 导入延迟队列所需的类: ```java import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; ``` 2. 创建 Delayed 接口的实现类,该类的对象将被存储在延迟队列中。例如: ```java class DelayedTask implements Delayed { private String name; private long executeTime; public DelayedTask(String name, long delay) { this.name = name; this.executeTime = System.currentTimeMillis() + delay; } @Override public long getDelay(TimeUnit unit) { long diff = executeTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return Long.compare(diff, 0); } @Override public String toString() { return "DelayedTask{" + "name='" + name + '\'' + ", executeTime=" + executeTime + '}'; } } ``` 3. 创建 DelayQueue 实例,将 Delayed 对象添加到队列中: ```java DelayQueue<DelayedTask> delayQueue = new DelayQueue<>(); delayQueue.put(new DelayedTask("task1", 1000)); // 添加延迟 1s 的任务 delayQueue.put(new DelayedTask("task2", 2000)); // 添加延迟 2s 的任务 ``` 4. 使用 take() 方法获取延迟时间到的 Delayed 对象: ```java while (true) { DelayedTask task = delayQueue.take(); System.out.println(task); } ``` 以上就是创建 Java 延迟队列的基本步骤。你可以根据自己的需求进行修改和扩展。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值