set input delay,DelayQueue1.8源碼

 2023-10-18 阅读 24 评论 0

摘要:DelayQueue 內部通過組合PriorityQueue 來實現存儲和維護元素順序的,通過compareTo方法,排序存儲,小的在前面。每次插入元素,都可能移動元素 DelayQueue 存儲元素必須實現Delayed 接口,通過實現Delayed 接口,可以獲取到元素延遲時間

DelayQueue 內部通過組合PriorityQueue 來實現存儲和維護元素順序的,通過compareTo方法,排序存儲,小的在前面。每次插入元素,都可能移動元素

DelayQueue 存儲元素必須實現Delayed 接口,通過實現Delayed 接口,可以獲取到元素延遲時間,以及可以比較元素大小(Delayed 繼承Comparable)
DelayQueue 通過一個可重入鎖來控制元素的入隊出隊行為
DelayQueue 中leader 標識 用于減少線程的競爭,表示當前有其它線程正在獲取隊頭元素。
PriorityQueue 只是負責存儲數據以及維護元素的順序,對于延遲時間取數據則是在DelayQueue 中進行判斷控制的。
DelayQueue 沒有實現序列化接口

和時間輪算法區別

之所以要用到PriorityQueue,主要是需要排序。也許后插入的消息需要比隊列中的其他消息提前觸發,那么這個后插入的消息就需要最先被消費者獲取,這就需要排序功能。PriorityQueue內部使用最小堆來實現排序隊列。隊首的,最先被消費者拿到的就是最小的那個。使用最小堆讓隊列在數據量較大的時候比較有優勢。使用最小堆來實現優先級隊列主要是因為最小堆在插入和獲取時,時間復雜度相對都比較好,都是O(logN)

時間輪是一種非常驚艷的數據結構。其在Linux內核中使用廣泛,是Linux內核定時器的實現方法和基礎之一。按使用場景,大致可以分為兩種時間輪:原始時間輪和分層時間輪。分層時間輪是原始時間輪的升級版本,來應對時間“槽”數量比較大的情況,對內存和精度都有很高要求的情況。我們延遲任務的場景一般只需要用到原始時間輪就可以了。
原始時間輪:如下圖一個輪子,有8個“槽”,可以代表未來的一個時間。如果以秒為單位,中間的指針每隔一秒鐘轉動到新的“槽”上面,就好像手表一樣。如果當前指針指在1上面,我有一個任務需要4秒以后執行,那么這個執行的線程回調或者消息將會被放在5上。那如果需要在20秒之后執行怎么辦,由于這個環形結構槽數只到8,如果要20秒,指針需要多轉2圈。位置是在2圈之后的5上面(20 % 8 + 1)。這個圈數需要記錄在槽中的數據結構里面。這個數據結構最重要的是兩個指針,一個是觸發任務的函數指針,另外一個是觸發的總第幾圈數。時間輪可以用簡單的數組或者是環形鏈表來實現。

在這里插入圖片描述

相比DelayQueue的數據結構,時間輪在算法復雜度上有一定優勢。DelayQueue由于涉及到排序,需要調堆,插入和移除的復雜度是O(lgn),而時間輪在插入和移除的復雜度都是O(1)。

使用

消息消費時間不一定是制定的時間,例如下面

@Data
public class Message implements Delayed {private int id;private String body; // 消息內容private long excuteTime;// 延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。public Message(int id, String body, long delayTime) {this.id = id;this.body = body;this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) {Message msg = (Message) o;return this.id > msg.id ? 1: this.id < msg.id ? -1 : 0;}public static void main(String[] args) throws InterruptedException {// 創建延時隊列DelayQueue<Message> queue = new DelayQueue<>();// 添加延時消息,m1 延時3sMessage m1 = new Message(3, "world", 2000);// 添加延時消息,m2 延時10sMessage m2 = new Message(2, "hello", 4000);//將延時消息放到延時隊列中queue.offer(m2);queue.offer(m1);System.out.println("消息入隊 " + new Date());System.out.println("消費消息:" + queue.take() + " " + new Date());System.out.println("消費消息:" + queue.take() + " " + new Date());}}

消息入隊 Tue Apr 30 16:07:52 CST 2019
消費消息:Message(id=2, body=hello, excuteTime=5580123695174) Tue Apr 30 16:07:56 CST 2019
消費消息:Message(id=3, body=world, excuteTime=5578123690265) Tue Apr 30 16:07:56 CST 2019

消息實現Delay接口

public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);
}public interface Comparable<T> {public int compareTo(T o);
}

DelayQueue構造器

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();/*** Thread designated to wait for the element at the head of* the queue.  This variant of the Leader-Follower pattern* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to* minimize unnecessary timed waiting.  When a thread becomes* the leader, it waits only for the next delay to elapse, but* other threads await indefinitely.  The leader thread must* signal some other thread before returning from take() or* poll(...), unless some other thread becomes leader in the* interim.  Whenever the head of the queue is replaced with* an element with an earlier expiration time, the leader* field is invalidated by being reset to null, and some* waiting thread, but not necessarily the current leader, is* signalled.  So waiting threads must be prepared to acquire* and lose leadership while waiting.*/private Thread leader = null;/*** Condition signalled when a newer element becomes available* at the head of the queue or a new thread may need to* become leader.*/private final Condition available = lock.newCondition();/*** Creates a new {@code DelayQueue} that is initially empty.*/public DelayQueue() {}

使用PriorityQueue存儲,默認容量11

    private static final int DEFAULT_INITIAL_CAPACITY = 11;public PriorityQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}

存儲元素

    /*** Inserts the specified element into this delay queue.** @param e the element to add* @return {@code true}* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}

PriorityQueue添加元素,不能為null

    /*** Inserts the specified element into this priority queue.** @return {@code true} (as specified by {@link Queue#offer})* @throws ClassCastException if the specified element cannot be*         compared with elements currently in this priority queue*         according to the priority queue's ordering* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {if (e == null)throw new NullPointerException();modCount++;int i = size;if (i >= queue.length)grow(i + 1);size = i + 1;if (i == 0)queue[0] = e;elsesiftUp(i, e);return true;}

第一個元素,queue[0] = e;結束
第二個元素,siftUp(i, e);

/*** Inserts item x at position k, maintaining heap invariant by* promoting x up the tree until it is greater than or equal to* its parent, or is the root.** To simplify and speed up coercions and comparisons. the* Comparable and Comparator versions are separated into different* methods that are otherwise identical. (Similarly for siftDown.)** @param k the position to fill* @param x the item to insert*/private void siftUp(int k, E x) {if (comparator != null)siftUpUsingComparator(k, x);elsesiftUpComparable(k, x);}

因為消息實現Comparable接口,comparator =null

@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;//二叉樹,孩子索引求父親索引Object e = queue[parent];if (key.compareTo((E) e) >= 0)break;queue[k] = e;k = parent;}queue[k] = key;}

因為第二個元素排序大,所以break;退出循環,直接后面添加元素
如果第二個元素排序小,就會和第一個互換位置

PriorityQueue獲取頭元素,如果沒有,返回null

    @SuppressWarnings("unchecked")public E peek() {return (size == 0) ? null : (E) queue[0];}

取出元素

/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();//如果頭結點為null,掛起線程else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

PriorityQueue-poll獲取并刪除第一個元素

@SuppressWarnings("unchecked")public E poll() {if (size == 0)return null;int s = --size;modCount++;E result = (E) queue[0];E x = (E) queue[s];queue[s] = null;if (s != 0)siftDown(0, x);return result;}

如果原來有兩個以上元素,需要移動元素,把第二個元素移到第一個

/*** Inserts item x at position k, maintaining heap invariant by* demoting x down the tree repeatedly until it is less than or* equal to its children or is a leaf.** @param k the position to fill* @param x the item to insert*/private void siftDown(int k, E x) {if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>)x;int half = size >>> 1;        // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = queue[child];int right = child + 1;if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)c = queue[child = right];if (key.compareTo((E) c) <= 0)break;queue[k] = c;k = child;}queue[k] = key;}

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/1/148894.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息