延迟队列DelayQueue原理

前言

什么是DelayQueue(延时队列)

DelayQueue 是一个通过PriorityBlockingQueue实现延迟获取元素的无界队列无界阻塞队列,其中添加进该队列的元素必须实现Delayed接口(指定延迟时间),而且只有在延迟期满后才能从中提取元素。

什么是PriorityBlockingQueue(优先队列)

PriorityBlockingQueue是一个支持优先级的无界阻塞队列,队列的元素默认情况下元素采用自然顺序升序排列,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。

DelayQueue使用场景

DelayQueue可以运用在以下应用场景:

  1. 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  2. 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

DelayQueue原理

DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。

Leader/Followers模式:

  1. 有若干个线程(一般组成线程池)用来处理大量的事件
  2. 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠。
  3. 假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件。
  4. 唤醒的追随者作为新的领导者等待事件的发生。
  5. 处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者。
  6. 假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理。

所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。

DelayQueue源码解析

DelayQueue属性

//可重入同步锁
private final transient ReentrantLock lock = new ReentrantLock();

//DelayQueue的实现依赖于PriorityQueue(优先队列)
private final PriorityQueue<E> q = new PriorityQueue<E>();

//第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个leader不为null,那么就直接wait
//主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;

//条件队列,用于wait线程
private final Condition available = lock.newCondition();

DelayQueue构造方法

    //从上面属性就可以看出,DelayQueue采用了饿汉模式,调用构造方法即创建了队列实例
    public DelayQueue() {}

    /**
     * 创建一个DelayQueue,最初包含给定的Collection实例集合。
     * @param c 最初包含的元素集合
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

DelayQueue主要方法

offer添加元素
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //调用优先队列
        q.offer(e);
        //检验元素是否为队首,是则设置 leader 为null, 并唤醒一个消费线程  
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
take获取元素
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            //从优先队列中获取第一个元素,peek方法不会删除元素
            E first = q.peek();
            //如果获取不到数据,则调用available.await()进入阻塞状态
            if (first == null)
                available.await();
            else {
                //获取当前延时对象是否到期
                long delay = first.getDelay(NANOSECONDS);
                //到期那么返回这个延时对象
                if (delay <= 0)
                    return q.poll();
                first = null; // 
                //leader不为空,表明已经有其他线程在等待这个延时对象了
                //为什么不available.awaitNanos(delay)呢?这将会导致大量的线程在同一时间点被唤醒,然后去竞争
                //这个到期的延时任务,影响性能,还不如直接将他们无时间限制的wait,leader线程或者其他新进来的线程获取到延时对象后,去唤醒
                //让他们去竞争下一个延时对象
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //指定纳秒级别线程阻塞时间,当前wait住的线程被唤醒后有可能与其他线程竞争失败,就会进入了同步队列阻塞,那个抢到锁的线程就会取走这个延时对象
                        available.awaitNanos(delay);
                    } finally {
                        //leader线程被唤醒并获取到锁之后会将leader设置为空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //leader为空并且队列不为空,那么唤醒正在等待的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();  //释放锁
    }
}

从优先队列中取值,如果取到的延时节点已经已经到期,那么直接返回,如果还没有到期并且已经有其他线程在执行delay时间等待了(也就是leader线程),那么挂起自己(避免延时 相同时间造成大量线程同时唤醒), leader线程在指定delay时间后主动唤醒,然后取竞争锁,如果竞争成功,那么很大概率可以获取到延时节点,如果竞争失败,将被阻塞。

remove删除元素
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.remove(o);
    } finally {
        lock.unlock();
    }
}

Delayed接口

使用DelayQueue的话,放入该队列的对象必须实现Delayed接口,实现的接口中有两个参数:延迟时间单位,优先级规则,take方法会根据规则按照优先级执行

Delayed接口源码:

public interface Delayed extends Comparable<Delayed> {

    /**
     * 返回与此对象关联的剩余延迟(给定的时间单位)。
     * @param unit 时间单位
     * @返回剩余延迟;零值或负值表示 延迟已过期
     */
    long getDelay(TimeUnit unit);
}

因为Delayed继承了Comparable,所以还需要实现compareTo方法,具体实现如下:

class MyDelay implements Delayed {


    long delayTime; // 延迟时间
    long expire; // 过期时间

    public MyDelay(long delayTime, Thread t) {
        this.delayTime = delayTime;
        // 过期时间 = 当前时间 + 延迟时间
        this.expire = System.currentTimeMillis() + delayTime;
    }

    /**
     * 剩余时间 = 到期时间 - 当前时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 优先级规则:两个任务比较,时间短的优先执行
     */
    @Override
    public int compareTo(Delayed o) {
        long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) f;
    }
}

使用示例

实现Delayed接口:

class MyDelay<T> implements Delayed {


    long delayTime; // 延迟时间
    long expire; // 过期时间
    T data;

    public MyDelay(long delayTime, T t) {
        this.delayTime = delayTime;
        // 过期时间 = 当前时间 + 延迟时间
        this.expire = System.currentTimeMillis() + delayTime;
        data = t;
    }

    /**
     * 剩余时间 = 到期时间 - 当前时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 优先级规则:两个任务比较,时间短的优先执行
     */
    @Override
    public int compareTo(Delayed o) {
        long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) f;
    }


    @Override
    public String toString() {
        return "delayTime=" + delayTime +
                ", expire=" + expire +
                ", data=" + data;
    }
}

测试用例如下:

public class DelayQueueDemo {


    static BlockingQueue<Delayed> queue = new DelayQueue();

    public static void main(String[] args) throws InterruptedException {
        queue.add(new MyDelay(8, "第一次添加任务"));
        queue.add(new MyDelay(3, "第二次添加任务"));
        queue.add(new MyDelay(5, "第三次添加任务"));

        while (!queue.isEmpty()) {
            Delayed delayed = queue.take();
            System.out.println(delayed);
        }
    }

}

输出如下:

delayTime=3, expire=1625902338874, data=第二次添加任务
delayTime=5, expire=1625902338876, data=第三次添加任务
delayTime=8, expire=1625902338879, data=第一次添加任务

总结

DelayQueue其实采用了装饰器模式,在对PriorityQueue进行包装下增加了延时时间获取元素的功能,其主要特点归纳如下:

  1. DelayQueue是一个无界阻塞队列,队列内部使用PriorityQueue来实现。

  2. 进入队列的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从中提取元素;

  3. 该队列头部是延迟期满后保存时间最长的Delayed元素;

  4. 如果没有延迟未过期元素,且队列没有头部,并且poll将返回null;

  5. 当一个元素的getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于0的值时,表示该元素已过期;

  6. 无法使用poll或take移除未到期的元素,也不会将这些元素作为正常元素对待;例如:size方法返回到期和未到期元素的计数之和。

  7. 此队列不允许使用null元素。

  • 24
    点赞
  • 69
    收藏
    觉得还不错? 一键收藏
  • 14
    评论
DelayQueue是一个通过PriorityBlockingQueue实现延迟获取元素的无界阻塞队列。在DelayQueue中添加的元素必须实现Delayed接口,并且只有在延迟期满后才能从队列中提取元素。\[2\]DelayQueue常用于需要延迟处理任务的场景,比如在网上商城下单后,如果超时未支付,订单会被后台系统关闭,这种需要延时处理的场景可以使用DelayQueue来实现。\[3\]DelayQueue的实现原理是基于PriorityBlockingQueue,它是一个无界阻塞队列,可以按照元素的优先级进行排序。\[2\]DelayQueue的源码定义如下:public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { // ... }\[3\] #### 引用[.reference_title] - *1* *2* [延迟队列DelayQueue原理](https://blog.csdn.net/c15158032319/article/details/118636233)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^control_2,239^v3^insert_chatgpt"}} ] [.reference_item] - *3* [Java 延迟队列 DelayQueue原理](https://blog.csdn.net/piaoranyuji/article/details/124042408)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^control_2,239^v3^insert_chatgpt"}} ] [.reference_item] [ .reference_list ]

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 14
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值